]> gitweb.factorcode.org Git - factor.git/blob - libs/concurrency/concurrency.factor
more sql changes
[factor.git] / libs / concurrency / concurrency.factor
1 ! Copyright (C) 2005 Chris Double. All Rights Reserved.
2 ! See http://factorcode.org/license.txt for BSD license.
3 !
4 ! Concurrency library for Factor based on Erlang/Termite style
5 ! concurrency.
6 USING: kernel generic threads io namespaces errors words arrays
7        math sequences hashtables strings vectors dlists serialize 
8        match ;
9 IN: concurrency
10
11 #! Debug
12 USE:  prettyprint
13
14 : (dlist-pop?) ( dlist pred dnode -- obj | f )
15   [
16     [ dlist-node-data swap call ] 2keep rot [
17       swapd [ (dlist-unlink) ] keep dlist-node-data nip
18     ] [
19       dlist-node-next (dlist-pop?)
20     ] if
21   ] [
22     2drop f
23   ] if* ;
24
25 : dlist-pop? ( pred dlist -- obj | f )
26   #! Return first item in the dlist that when passed to the
27   #! predicate quotation, true is left on the stack. The
28   #! item is removed from the dlist. The 'pred' quotation
29   #! must have stack effect ( obj -- bool ).
30   #! TODO: needs a better name and should be moved to dlists.
31   dup dlist-first swapd (dlist-pop?) ;  
32
33 : (dlist-pred?) ( pred dnode -- bool )
34   [
35     [ dlist-node-data swap call ] 2keep rot [
36       2drop t
37     ] [
38       dlist-node-next (dlist-pred?)
39     ] if
40   ] [
41     drop f
42   ] if* ;
43
44 : dlist-pred? ( pred dlist -- obj | f )
45   #! Return true if any item in the dlist that when passed to the
46   #! predicate quotation, true is left on the stack. 
47   #! The 'pred' quotation must have stack effect ( obj -- bool ).
48   #! TODO: needs a better name and should be moved to dlists.
49   dlist-first (dlist-pred?) ;  
50
51 TUPLE: mailbox threads data ;
52
53 : make-mailbox ( -- mailbox )
54   0 <vector> <dlist> <mailbox> ;
55
56 : mailbox-empty? ( mailbox -- bool )
57   mailbox-data dlist-empty? ;
58
59 : mailbox-put ( obj mailbox -- )
60   [ mailbox-data dlist-push-end ] keep 
61   [ mailbox-threads ] keep 0 <vector> swap set-mailbox-threads
62   [ schedule-thread ] each yield ;
63
64 : (mailbox-block-unless-pred) ( pred mailbox -- pred2 mailbox2 )  
65   dup mailbox-data pick swap dlist-pred? [
66     [
67       swap mailbox-threads push stop      
68     ] callcc0 
69     (mailbox-block-unless-pred)
70   ] unless ;
71
72 : (mailbox-block-if-empty) ( mailbox -- mailbox2 )  
73   dup mailbox-empty? [
74     [
75       swap mailbox-threads push stop      
76     ] callcc0 
77     (mailbox-block-if-empty)
78   ] when ;
79   
80 : mailbox-get ( mailbox -- obj )
81   (mailbox-block-if-empty)
82   mailbox-data dlist-pop-front ;
83
84 : (mailbox-get-all) ( mailbox -- )
85   dup mailbox-empty? [
86     drop
87   ] [
88     dup mailbox-data dlist-pop-front , (mailbox-get-all)
89   ] if ;
90
91 : mailbox-get-all ( mailbox -- array )
92   (mailbox-block-if-empty)
93   [ (mailbox-get-all) ] { } make ;
94   
95 : while-mailbox-empty ( mailbox quot -- )
96   over mailbox-empty? [
97     dup >r swap >r call r> r> while-mailbox-empty
98   ] [
99     2drop
100   ] if ; inline
101
102 : mailbox-get? ( pred mailbox -- obj )
103   (mailbox-block-unless-pred) mailbox-data dlist-pop? ;
104
105 TUPLE: node hostname port ;
106
107 : localnode ( -- node )
108   \ localnode get ;
109     
110 TUPLE: process links pid mailbox ;
111 TUPLE: remote-process node pid ;
112
113 GENERIC: send ( message process -- )
114
115 : random-64 ( -- id ) 
116     #! Generate a random id to use for pids
117     [ "ID" % 64 [ 9 random-int CHAR: 0 + , ] times ] "" make ;
118
119 : make-process ( -- process )
120   #! Return a process set to run on the local node. A process is 
121   #! similar to a thread but can send and receive messages to and
122   #! from other processes. It may also be linked to other processes so
123   #! that it receives a message if that process terminates.
124   [ ] random-64 make-mailbox <process> ;
125
126 : make-linked-process ( process -- process )
127   #! Return a process set to run on the local node. That process is
128   #! linked to the process on the stack. It will receive a message if
129   #! that process terminates.
130   unit random-64 make-mailbox <process> ;
131
132 : self ( -- process )
133   \ self get  ;
134
135 : init-main-process ( -- )
136   #! Setup the main process.  
137   make-process \ self set-global ;
138
139 init-main-process
140
141 : with-process ( quot process -- )
142   #! Calls the quotation with 'self' set
143   #! to the given process.
144   [
145     \ self set 
146   ] make-hash
147   swap bind ;
148
149 DEFER: register-process
150 DEFER: unregister-process
151
152 : (spawn) ( quot -- process )
153   [ in-thread ] make-process [ with-process ] over slip ;
154
155 : spawn ( quot -- process )
156   [ self dup process-pid swap register-process call self process-pid unregister-process ] curry (spawn) ;
157
158 TUPLE: linked-exception error ;
159
160 : while-no-messages ( quot -- )
161   #! Run the quotation in a loop while no messages are in
162   #! the processes mailbox. The quot should have stack effect
163   #! ( -- ).
164   >r self process-mailbox r> while-mailbox-empty ; inline
165
166 M: process send ( message process -- )
167   process-mailbox mailbox-put ;
168
169 : receive ( -- message )
170   self process-mailbox mailbox-get dup linked-exception? [
171     linked-exception-error throw
172   ] when ;
173
174 : receive-if ( pred -- message )
175   self process-mailbox mailbox-get? dup linked-exception? [
176     linked-exception-error throw
177   ] when ; 
178
179 : rethrow-linked ( error -- )
180   #! Rethrow the error to the linked process
181   self process-links [ over <linked-exception> swap send ] each drop ;
182
183 : (spawn-link) ( quot -- process )
184   [ in-thread ] self make-linked-process [ with-process ] over slip ;
185
186 : spawn-link ( quot -- process )
187   [ catch [ rethrow-linked ] when* ] curry
188   [ self dup process-pid swap register-process call self process-pid unregister-process ] curry (spawn-link) ;
189
190 : (recv) ( msg form -- )
191   #! Process a form with the following format:
192   #!   [ pred match-quot ] 
193   #! 'pred' is a word that has stack effect ( msg -- bool ). It is 
194   #! executed with the message on the stack. It should return a 
195   #! boolean if it is a message this form should process.
196   #! 'match-quot' is a quotation with stack effect ( msg -- ). It
197   #! will be called with the message on the top of the stack if
198   #! the 'pred' word returned true.
199   [ first execute ] 2keep rot [ second call ] [ 2drop ] if ;
200
201 : recv ( forms -- ) 
202   #! Get a message from the processes mailbox. Compare it against the
203   #! forms to run a quotation if it matches the given message. 'forms'
204   #! is a list of quotations in the following format:
205   #!   [ pred match-quot ] 
206   #! 'pred' is a word that has stack effect ( msg -- bool ). It is 
207   #! executed with the message on the stack. It should return a 
208   #! boolean if it is a message this form should process.
209   #! 'match-quot' is a quotation with stack effect ( msg -- ). It
210   #! will be called with the message on the top of the stack if
211   #! the 'pred' word returned true.
212   #! Each form in the list will be matched against the message, 
213   #! even if a prior match succeeded. This means multiple quotations
214   #! may be run against the message.
215   receive swap [ dupd (recv) ] each drop ;
216
217 MATCH-VARS: ?from ?tag ;
218
219 : tag-message ( message -- tagged-message )
220   #! Given a message, wrap it with the sending process and a unique tag.
221   >r self random-64 r> 3array ;
222
223 : send-synchronous ( message process -- reply )
224   #! Sends a message to the process synchronously. The
225   #! message will be wrapped to include the process of the sender 
226   #! and a unique tag. After being sent the sending process will
227   #! block for a reply tagged with the same unique tag.
228   >r tag-message dup r> send second _ 2array [ match ] curry receive-if second ;
229
230 : forever ( quot -- )
231   #! Loops forever executing the quotation.
232   dup >r call r> forever ; 
233
234 SYMBOL: quit-cc
235
236 : (spawn-server) ( quot -- )
237   #! Receive a message, and run 'quot' on it. If 'quot' 
238   #! returns true, start again, otherwise exit loop.
239   #! The quotation should have stack effect ( message -- bool ).
240   "Waiting for message in server: " write self process-pid print
241   receive over call [ (spawn-server) ] when ;
242
243 : spawn-server ( quot -- process )
244   #! Spawn a server that receives messages, calling the
245   #! quotation on the message. If the quotation returns false
246   #! the spawned process exits. If it returns true, the process
247   #! starts from the beginning again. The quotation should have
248   #! stack effect ( message -- bool ).
249   [  
250     (spawn-server)
251     "Exiting process: " write self process-pid print
252   ] curry spawn ;
253
254 : spawn-linked-server ( quot -- process )
255   #! Similar to 'spawn-server' but the parent process will be linked
256   #! to the child.
257   [  
258     (spawn-server)
259     "Exiting process: " write self process-pid print
260   ] curry spawn-link ;
261
262 : server-cc ( -- cc | process )
263   #! Captures the current continuation and returns the value.
264   #! If that CC is called with a process on the stack it will
265   #! set 'self' for the current process to it. Otherwise it will
266   #! return the value. This allows capturing a continuation in a server,
267   #! and jumping back into it from a spawn and keeping the 'self'
268   #! variable correct. It's a workaround until I can find out how to
269   #! stop 'self' from being clobbered back to its old value.
270   [ ] callcc1 dup process? [ \ self set-global f ] when ;
271   
272 : call-server-cc ( server-cc -- )
273   #! Calls the server continuation passing the current 'self'
274   #! so the server continuation gets its new self updated.
275   self swap call ;
276
277 : future ( quot -- future )
278   #! Spawn a process to call the quotation and immediately return
279   #! a 'future' on the stack. The future can later be queried with
280   #! ?future. If the quotation has completed the result will be returned.
281   #! If not, the process will block until the quotation completes.
282   #! 'quot' must have stack effect ( -- X ).
283   [ self send ] append spawn ;
284
285 : ?future ( future -- result )
286   #! Block the process until the future has completed and then place the
287   #! result on the stack. Return the result immediately if the future has completed.
288   process-mailbox mailbox-get ;
289
290 TUPLE: promise fulfilled? value processes ;
291
292 C: promise ( -- <promise> )
293   [ 0 <vector> swap set-promise-processes ] keep ;
294
295 : fulfill ( value promise  -- )
296   #! Set the future of the promise to the given value. Threads
297   #! blocking on the promise will then be released.
298   dup promise-fulfilled? [
299     [ set-promise-value ] keep
300     [ t swap set-promise-fulfilled? ] keep    
301     [ promise-processes ] keep 0 <vector> swap set-promise-processes
302     [ schedule-thread ] each yield 
303   ] unless ;
304
305  : (maybe-block-promise) ( promise -- promise )  
306   #! Block the process if the promise is unfulfilled. This is different from
307   #! (mailbox-block-if-empty) in that when a promise is fulfilled, all threads
308   #! need to be resumed, rather than just one.
309   dup promise-fulfilled? [
310     [
311       swap promise-processes push stop      
312     ] callcc0 
313   ] unless ;
314
315 : ?promise ( promise -- result ) 
316   (maybe-block-promise) promise-value ;
317   
318 ! ******************************
319 ! Experimental code below
320 ! ******************************
321 : (lazy) ( v -- )
322   receive {
323     { { ?from ?tag _ } [ ?tag over 2array ?from send (lazy) ] }
324   } match-cond ;
325
326 : lazy ( quot -- lazy )
327   #! Spawn a process that immediately blocks and return it. 
328   #! When '?lazy' is called on the returned process, call the quotation
329   #! and return the result. The quotation must have stack effect ( -- X ).
330   [ 
331     receive {
332       { { ?from ?tag _ } [ call ?tag over 2array ?from send (lazy) ] }
333     } match-cond 
334   ] spawn nip ;
335
336 : ?lazy ( lazy -- result )
337   #! Given a process spawned using 'lazy', evaluate it and return the result.
338   f swap send-synchronous ;
339
340 ! ******************************
341 ! Standard Processes
342 ! ******************************
343 MATCH-VARS: ?process ?name ;
344 SYMBOL: register
345 SYMBOL: unregister
346
347 : process-registry ( table -- )
348   receive {
349     { { register ?name ?process }  [ ?process ?name pick set-hash ] }
350     { { unregister ?name }         [ ?name over remove-hash ] }
351     { { ?from ?tag { process ?name } } [ ?tag ?name pick hash 2array ?from send  ] }
352   } match-cond process-registry ;
353
354 : register-process ( name process -- )
355   [ register , swap , , ] { } make \ process-registry get send ;
356
357 : unregister-process ( name -- )
358   [ unregister , , ] { } make \ process-registry get send ;
359
360 : get-process ( name -- )
361   [ process , , ] { } make \ process-registry get send-synchronous ;
362
363 [ H{ } clone process-registry ] (spawn) \ process-registry set-global
364
365 : handle-node-client ( stream -- )
366   [ [ deserialize ] with-serialized ] with-stream first2 get-process send ;
367
368 : (node-server) ( server -- )
369   dup accept handle-node-client (node-server) ;
370
371 : node-server ( port -- )
372   <server> (node-server) ;
373
374 : send-to-node ( msg pid  host port -- )
375   <client> [ 2array [ serialize ] with-serialized ] with-stream ;
376
377 : start-node ( hostname port -- )
378   [ node-server ] in-thread
379   <node> \ localnode set-global ;
380
381 M: remote-process send ( message process -- )
382   #! Send the message via the inter-node protocol
383   [ remote-process-pid ] keep 
384   remote-process-node 
385   [ node-hostname ] keep
386   node-port send-to-node ;
387
388 M: process serialize ( obj -- )
389   localnode swap process-pid <remote-process> serialize ;
390
391 : (test-node1) 
392   receive {
393     { { ?from ?tag _ } [ ?tag "ack" 2array ?from send (test-node1) ] }
394   } match-cond ;
395
396 : test-node1 ( -- )
397   [ (test-node1) ] spawn
398   "test1" swap register-process ;
399
400 : test-node2 ( hostname port -- )
401   [ <node> "test1" <remote-process> "message" swap send-synchronous . ] spawn 2drop ;