1 ! Copyright (C) 2005 Chris Double. All Rights Reserved.
2 ! See http://factorcode.org/license.txt for BSD license.
4 ! Concurrency library for Factor based on Erlang/Termite style
6 USING: kernel generic threads io namespaces errors words arrays
7 math sequences hashtables strings vectors dlists serialize
14 : (dlist-pop?) ( dlist pred dnode -- obj | f )
16 [ dlist-node-data swap call ] 2keep rot [
17 swapd [ (dlist-unlink) ] keep dlist-node-data nip
19 dlist-node-next (dlist-pop?)
25 : dlist-pop? ( pred dlist -- obj | f )
26 #! Return first item in the dlist that when passed to the
27 #! predicate quotation, true is left on the stack. The
28 #! item is removed from the dlist. The 'pred' quotation
29 #! must have stack effect ( obj -- bool ).
30 #! TODO: needs a better name and should be moved to dlists.
31 dup dlist-first swapd (dlist-pop?) ;
33 : (dlist-pred?) ( pred dnode -- bool )
35 [ dlist-node-data swap call ] 2keep rot [
38 dlist-node-next (dlist-pred?)
44 : dlist-pred? ( pred dlist -- obj | f )
45 #! Return true if any item in the dlist that when passed to the
46 #! predicate quotation, true is left on the stack.
47 #! The 'pred' quotation must have stack effect ( obj -- bool ).
48 #! TODO: needs a better name and should be moved to dlists.
49 dlist-first (dlist-pred?) ;
51 TUPLE: mailbox threads data ;
53 : make-mailbox ( -- mailbox )
54 0 <vector> <dlist> <mailbox> ;
56 : mailbox-empty? ( mailbox -- bool )
57 mailbox-data dlist-empty? ;
59 : mailbox-put ( obj mailbox -- )
60 [ mailbox-data dlist-push-end ] keep
61 [ mailbox-threads ] keep 0 <vector> swap set-mailbox-threads
62 [ schedule-thread ] each yield ;
64 : (mailbox-block-unless-pred) ( pred mailbox -- pred2 mailbox2 )
65 dup mailbox-data pick swap dlist-pred? [
67 swap mailbox-threads push stop
69 (mailbox-block-unless-pred)
72 : (mailbox-block-if-empty) ( mailbox -- mailbox2 )
75 swap mailbox-threads push stop
77 (mailbox-block-if-empty)
80 : mailbox-get ( mailbox -- obj )
81 (mailbox-block-if-empty)
82 mailbox-data dlist-pop-front ;
84 : (mailbox-get-all) ( mailbox -- )
88 dup mailbox-data dlist-pop-front , (mailbox-get-all)
91 : mailbox-get-all ( mailbox -- array )
92 (mailbox-block-if-empty)
93 [ (mailbox-get-all) ] { } make ;
95 : while-mailbox-empty ( mailbox quot -- )
97 dup >r swap >r call r> r> while-mailbox-empty
102 : mailbox-get? ( pred mailbox -- obj )
103 (mailbox-block-unless-pred) mailbox-data dlist-pop? ;
105 TUPLE: node hostname port ;
107 : localnode ( -- node )
110 TUPLE: process links pid mailbox ;
111 TUPLE: remote-process node pid ;
113 GENERIC: send ( message process -- )
115 : random-64 ( -- id )
116 #! Generate a random id to use for pids
117 [ "ID" % 64 [ 9 random-int CHAR: 0 + , ] times ] "" make ;
119 : make-process ( -- process )
120 #! Return a process set to run on the local node. A process is
121 #! similar to a thread but can send and receive messages to and
122 #! from other processes. It may also be linked to other processes so
123 #! that it receives a message if that process terminates.
124 [ ] random-64 make-mailbox <process> ;
126 : make-linked-process ( process -- process )
127 #! Return a process set to run on the local node. That process is
128 #! linked to the process on the stack. It will receive a message if
129 #! that process terminates.
130 unit random-64 make-mailbox <process> ;
132 : self ( -- process )
135 : init-main-process ( -- )
136 #! Setup the main process.
137 make-process \ self set-global ;
141 : with-process ( quot process -- )
142 #! Calls the quotation with 'self' set
143 #! to the given process.
149 DEFER: register-process
150 DEFER: unregister-process
152 : (spawn) ( quot -- process )
153 [ in-thread ] make-process [ with-process ] over slip ;
155 : spawn ( quot -- process )
156 [ self dup process-pid swap register-process call self process-pid unregister-process ] curry (spawn) ;
158 TUPLE: linked-exception error ;
160 : while-no-messages ( quot -- )
161 #! Run the quotation in a loop while no messages are in
162 #! the processes mailbox. The quot should have stack effect
164 >r self process-mailbox r> while-mailbox-empty ; inline
166 M: process send ( message process -- )
167 process-mailbox mailbox-put ;
169 : receive ( -- message )
170 self process-mailbox mailbox-get dup linked-exception? [
171 linked-exception-error throw
174 : receive-if ( pred -- message )
175 self process-mailbox mailbox-get? dup linked-exception? [
176 linked-exception-error throw
179 : rethrow-linked ( error -- )
180 #! Rethrow the error to the linked process
181 self process-links [ over <linked-exception> swap send ] each drop ;
183 : (spawn-link) ( quot -- process )
184 [ in-thread ] self make-linked-process [ with-process ] over slip ;
186 : spawn-link ( quot -- process )
187 [ catch [ rethrow-linked ] when* ] curry
188 [ self dup process-pid swap register-process call self process-pid unregister-process ] curry (spawn-link) ;
190 : (recv) ( msg form -- )
191 #! Process a form with the following format:
192 #! [ pred match-quot ]
193 #! 'pred' is a word that has stack effect ( msg -- bool ). It is
194 #! executed with the message on the stack. It should return a
195 #! boolean if it is a message this form should process.
196 #! 'match-quot' is a quotation with stack effect ( msg -- ). It
197 #! will be called with the message on the top of the stack if
198 #! the 'pred' word returned true.
199 [ first execute ] 2keep rot [ second call ] [ 2drop ] if ;
202 #! Get a message from the processes mailbox. Compare it against the
203 #! forms to run a quotation if it matches the given message. 'forms'
204 #! is a list of quotations in the following format:
205 #! [ pred match-quot ]
206 #! 'pred' is a word that has stack effect ( msg -- bool ). It is
207 #! executed with the message on the stack. It should return a
208 #! boolean if it is a message this form should process.
209 #! 'match-quot' is a quotation with stack effect ( msg -- ). It
210 #! will be called with the message on the top of the stack if
211 #! the 'pred' word returned true.
212 #! Each form in the list will be matched against the message,
213 #! even if a prior match succeeded. This means multiple quotations
214 #! may be run against the message.
215 receive swap [ dupd (recv) ] each drop ;
217 MATCH-VARS: ?from ?tag ;
219 : tag-message ( message -- tagged-message )
220 #! Given a message, wrap it with the sending process and a unique tag.
221 >r self random-64 r> 3array ;
223 : send-synchronous ( message process -- reply )
224 #! Sends a message to the process synchronously. The
225 #! message will be wrapped to include the process of the sender
226 #! and a unique tag. After being sent the sending process will
227 #! block for a reply tagged with the same unique tag.
228 >r tag-message dup r> send second _ 2array [ match ] curry receive-if second ;
230 : forever ( quot -- )
231 #! Loops forever executing the quotation.
232 dup >r call r> forever ;
236 : (spawn-server) ( quot -- )
237 #! Receive a message, and run 'quot' on it. If 'quot'
238 #! returns true, start again, otherwise exit loop.
239 #! The quotation should have stack effect ( message -- bool ).
240 "Waiting for message in server: " write self process-pid print
241 receive over call [ (spawn-server) ] when ;
243 : spawn-server ( quot -- process )
244 #! Spawn a server that receives messages, calling the
245 #! quotation on the message. If the quotation returns false
246 #! the spawned process exits. If it returns true, the process
247 #! starts from the beginning again. The quotation should have
248 #! stack effect ( message -- bool ).
251 "Exiting process: " write self process-pid print
254 : spawn-linked-server ( quot -- process )
255 #! Similar to 'spawn-server' but the parent process will be linked
259 "Exiting process: " write self process-pid print
262 : server-cc ( -- cc | process )
263 #! Captures the current continuation and returns the value.
264 #! If that CC is called with a process on the stack it will
265 #! set 'self' for the current process to it. Otherwise it will
266 #! return the value. This allows capturing a continuation in a server,
267 #! and jumping back into it from a spawn and keeping the 'self'
268 #! variable correct. It's a workaround until I can find out how to
269 #! stop 'self' from being clobbered back to its old value.
270 [ ] callcc1 dup process? [ \ self set-global f ] when ;
272 : call-server-cc ( server-cc -- )
273 #! Calls the server continuation passing the current 'self'
274 #! so the server continuation gets its new self updated.
277 : future ( quot -- future )
278 #! Spawn a process to call the quotation and immediately return
279 #! a 'future' on the stack. The future can later be queried with
280 #! ?future. If the quotation has completed the result will be returned.
281 #! If not, the process will block until the quotation completes.
282 #! 'quot' must have stack effect ( -- X ).
283 [ self send ] append spawn ;
285 : ?future ( future -- result )
286 #! Block the process until the future has completed and then place the
287 #! result on the stack. Return the result immediately if the future has completed.
288 process-mailbox mailbox-get ;
290 TUPLE: promise fulfilled? value processes ;
292 C: promise ( -- <promise> )
293 [ 0 <vector> swap set-promise-processes ] keep ;
295 : fulfill ( value promise -- )
296 #! Set the future of the promise to the given value. Threads
297 #! blocking on the promise will then be released.
298 dup promise-fulfilled? [
299 [ set-promise-value ] keep
300 [ t swap set-promise-fulfilled? ] keep
301 [ promise-processes ] keep 0 <vector> swap set-promise-processes
302 [ schedule-thread ] each yield
305 : (maybe-block-promise) ( promise -- promise )
306 #! Block the process if the promise is unfulfilled. This is different from
307 #! (mailbox-block-if-empty) in that when a promise is fulfilled, all threads
308 #! need to be resumed, rather than just one.
309 dup promise-fulfilled? [
311 swap promise-processes push stop
315 : ?promise ( promise -- result )
316 (maybe-block-promise) promise-value ;
318 ! ******************************
319 ! Experimental code below
320 ! ******************************
323 { { ?from ?tag _ } [ ?tag over 2array ?from send (lazy) ] }
326 : lazy ( quot -- lazy )
327 #! Spawn a process that immediately blocks and return it.
328 #! When '?lazy' is called on the returned process, call the quotation
329 #! and return the result. The quotation must have stack effect ( -- X ).
332 { { ?from ?tag _ } [ call ?tag over 2array ?from send (lazy) ] }
336 : ?lazy ( lazy -- result )
337 #! Given a process spawned using 'lazy', evaluate it and return the result.
338 f swap send-synchronous ;
340 ! ******************************
342 ! ******************************
343 MATCH-VARS: ?process ?name ;
347 : process-registry ( table -- )
349 { { register ?name ?process } [ ?process ?name pick set-hash ] }
350 { { unregister ?name } [ ?name over remove-hash ] }
351 { { ?from ?tag { process ?name } } [ ?tag ?name pick hash 2array ?from send ] }
352 } match-cond process-registry ;
354 : register-process ( name process -- )
355 [ register , swap , , ] { } make \ process-registry get send ;
357 : unregister-process ( name -- )
358 [ unregister , , ] { } make \ process-registry get send ;
360 : get-process ( name -- )
361 [ process , , ] { } make \ process-registry get send-synchronous ;
363 [ H{ } clone process-registry ] (spawn) \ process-registry set-global
365 : handle-node-client ( stream -- )
366 [ [ deserialize ] with-serialized ] with-stream first2 get-process send ;
368 : (node-server) ( server -- )
369 dup accept handle-node-client (node-server) ;
371 : node-server ( port -- )
372 <server> (node-server) ;
374 : send-to-node ( msg pid host port -- )
375 <client> [ 2array [ serialize ] with-serialized ] with-stream ;
377 : start-node ( hostname port -- )
378 [ node-server ] in-thread
379 <node> \ localnode set-global ;
381 M: remote-process send ( message process -- )
382 #! Send the message via the inter-node protocol
383 [ remote-process-pid ] keep
385 [ node-hostname ] keep
386 node-port send-to-node ;
388 M: process serialize ( obj -- )
389 localnode swap process-pid <remote-process> serialize ;
393 { { ?from ?tag _ } [ ?tag "ack" 2array ?from send (test-node1) ] }
397 [ (test-node1) ] spawn
398 "test1" swap register-process ;
400 : test-node2 ( hostname port -- )
401 [ <node> "test1" <remote-process> "message" swap send-synchronous . ] spawn 2drop ;