]> gitweb.factorcode.org Git - factor.git/blob - basis/concurrency/messaging/messaging.factor
03d130452717e34eac12206d085027c3e3d5ad8f
[factor.git] / basis / concurrency / messaging / messaging.factor
1 ! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.\r
2 ! See http://factorcode.org/license.txt for BSD license.\r
3 !\r
4 ! Concurrency library for Factor, based on Erlang/Termite style\r
5 ! concurrency.\r
6 USING: kernel threads concurrency.mailboxes continuations\r
7 namespaces assocs random accessors summary ;\r
8 IN: concurrency.messaging\r
9 \r
10 GENERIC: send ( message thread -- )\r
11 \r
12 : mailbox-of ( thread -- mailbox )\r
13     dup mailbox>> [ ] [\r
14         <mailbox> [ >>mailbox drop ] keep\r
15     ] ?if ;\r
16 \r
17 M: thread send ( message thread -- )\r
18     check-registered mailbox-of mailbox-put ;\r
19 \r
20 : my-mailbox ( -- mailbox ) self mailbox-of ;\r
21 \r
22 : receive ( -- message )\r
23     my-mailbox mailbox-get ?linked ;\r
24 \r
25 : receive-timeout ( timeout -- message )\r
26     my-mailbox swap mailbox-get-timeout ?linked ;\r
27 \r
28 : receive-if ( pred -- message )\r
29     my-mailbox swap mailbox-get? ?linked ; inline\r
30 \r
31 : receive-if-timeout ( timeout pred -- message )\r
32     my-mailbox -rot mailbox-get-timeout? ?linked ; inline\r
33 \r
34 : rethrow-linked ( error process supervisor -- )\r
35     >r <linked-error> r> send ;\r
36 \r
37 : spawn-linked ( quot name -- thread )\r
38     my-mailbox spawn-linked-to ;\r
39 \r
40 TUPLE: synchronous data sender tag ;\r
41 \r
42 : <synchronous> ( data -- sync )\r
43     self 256 random-bits synchronous boa ;\r
44 \r
45 TUPLE: reply data tag ;\r
46 \r
47 : <reply> ( data synchronous -- reply )\r
48     tag>> \ reply boa ;\r
49 \r
50 : synchronous-reply? ( response synchronous -- ? )\r
51     over reply?\r
52     [ >r tag>> r> tag>> = ]\r
53     [ 2drop f ] if ;\r
54 \r
55 ERROR: cannot-send-synchronous-to-self message thread ;\r
56 \r
57 M: cannot-send-synchronous-to-self summary\r
58     drop "Cannot synchronous send to myself" ;\r
59 \r
60 : send-synchronous ( message thread -- reply )\r
61     dup self eq? [\r
62         cannot-send-synchronous-to-self\r
63     ] [\r
64         >r <synchronous> dup r> send\r
65         [ synchronous-reply? ] curry receive-if\r
66         data>>\r
67     ] if ;\r
68 \r
69 : reply-synchronous ( message synchronous -- )\r
70     [ <reply> ] keep sender>> send ;\r
71 \r
72 : handle-synchronous ( quot -- )\r
73     receive [\r
74         data>> swap call\r
75     ] keep reply-synchronous ; inline\r
76 \r
77 <PRIVATE\r
78 \r
79 : registered-processes ( -- hash )\r
80    \ registered-processes get-global ;\r
81 \r
82 PRIVATE>\r
83 \r
84 : register-process ( name process -- )\r
85     swap registered-processes set-at ;\r
86 \r
87 : unregister-process ( name -- )\r
88     registered-processes delete-at ;\r
89 \r
90 : get-process ( name -- process )\r
91     dup registered-processes at [ ] [ thread ] ?if ;\r
92 \r
93 \ registered-processes global [ H{ } assoc-like ] change-at\r