]> gitweb.factorcode.org Git - factor.git/commitdiff
more work on distributed concurrency
authorchris.double <chris.double@double.co.nz>
Thu, 31 Aug 2006 03:38:45 +0000 (03:38 +0000)
committerchris.double <chris.double@double.co.nz>
Thu, 31 Aug 2006 03:38:45 +0000 (03:38 +0000)
contrib/concurrency/concurrency.factor
contrib/concurrency/load.factor

index 356752331407c02bd63aea59466fb1f820abe4b0..d602dba32b011bcc4a7e3805a288414a4af292f6 100644 (file)
@@ -23,8 +23,8 @@
 !
 ! Concurrency library for Factor based on Erlang/Termite style
 ! concurrency.
-USING: kernel generic threads io namespaces errors words 
-       math sequences hashtables strings vectors dlists ;
+USING: kernel generic threads io namespaces errors words arrays
+       math sequences hashtables strings vectors dlists serialize ;
 IN: concurrency
 
 #! Debug
@@ -146,24 +146,23 @@ TUPLE: node hostname port ;
      
 #! Processes run in nodes. Each process has a mailbox that is
 #! used for receiving messages sent to that process.
-TUPLE: process node links pid mailbox ;
+TUPLE: process links pid mailbox ;
+TUPLE: remote-process node pid ;
+
+GENERIC: send ( message process -- )
 
-: local-process? ( process -- boolean )
-  #! Is the process running on the local node
-  process-node [ localnode = ] [ t ] if* ;
-  
 : make-process ( -- process )
   #! Return a process set to run on the local node. A process is 
   #! similar to a thread but can send and receive messages to and
   #! from other processes. It may also be linked to other processes so
   #! that it receives a message if that process terminates.
-  localnode [ ] gensym unparse make-mailbox <process> ;
+  [ ] gensym unparse make-mailbox <process> ;
 
 : make-linked-process ( process -- process )
   #! Return a process set to run on the local node. That process is
   #! linked to the process on the stack. It will receive a message if
   #! that process terminates.
-  localnode swap unit gensym unparse make-mailbox <process> ;
+  unit gensym unparse make-mailbox <process> ;
 
 : self ( -- process )
   #! Returns the contents of the 'self-process' variables which
@@ -184,10 +183,17 @@ init-main-process
   ] make-hash
   swap bind ;
 
-: spawn ( quot -- process )
+DEFER: register-process
+DEFER: unregister-process
+
+: (spawn) ( quot -- process )
   #! Start a process which runs the given quotation.
   [ in-thread ] make-process [ with-process ] over slip ;
 
+: spawn ( quot -- process )
+  #! Start a process which runs the given quotation.
+  [ self dup process-pid swap register-process call self process-pid unregister-process ] curry (spawn) ;
+
 TUPLE: linked-exception error ;
 
 : while-no-messages ( quot -- )
@@ -196,14 +202,14 @@ TUPLE: linked-exception error ;
   #! ( -- ).
   >r self process-mailbox r> while-mailbox-empty ; inline
 
-: remote-send ( message process -- )
+M: remote-process send ( message process -- )
   #! Send the message via the inter-node protocol
   "remote-send not implemented" throw ;
 
-: send ( message process -- )
+M: process send ( message process -- )
   #! Send the message to the process by placing it in the
   #! processes mailbox.   
-  dup local-process? [ process-mailbox mailbox-put ] [ remote-send ] if ;
+  process-mailbox mailbox-put ;
 
 : receive ( -- message )
   #! Return a message from the current processes mailbox.
@@ -447,3 +453,51 @@ C: promise ( -- <promise> )
   #! Given a process spawned using 'lazy', evaluate it and return the result.
   f swap send-synchronous ;
 
+! ******************************
+! Standard Processes
+! ******************************
+TUPLE: register-msg name process ;
+TUPLE: unregister-msg name ;
+TUPLE: get-msg name ;
+
+PREDICATE: tagged-message (get-msg) ( obj -- ? )
+  tagged-message-data get-msg? ;
+
+: handle-register-process ( register-msg table -- )
+  >r [ register-msg-process ] keep register-msg-name r> set-hash ;
+
+: handle-unregister-process ( unregister-msg table -- )
+  >r unregister-msg-name r> remove-hash ;
+
+: handle-get-process ( get-msg table -- )
+  over tagged-message-data get-msg-name swap hash reply ;
+
+: process-registry ( table -- )
+  receive {
+    { [ dup register-msg? ] [ over handle-register-process ] }
+    { [ dup unregister-msg? ] [ over handle-unregister-process ] }
+    { [ dup (get-msg)? ] [ over handle-get-process ] }
+  } cond process-registry ;
+
+[ H{ } clone process-registry ] (spawn) \ process-registry set-global
+
+: register-process ( name process -- )
+  <register-msg> \ process-registry get send ;
+
+: unregister-process ( name -- )
+  <unregister-msg> \ process-registry get send ;
+
+: get-process ( name -- )
+  <get-msg> \ process-registry get send-synchronous ;
+
+: handle-node-client ( stream -- )
+  [ [ deserialize ] with-serialized ] with-stream dup . first2 get-process send ;
+
+: (node-server) ( server -- )
+  dup accept handle-node-client (node-server) ;
+
+: node-server ( port -- )
+  <server> (node-server) ;
+
+: send-to-node ( msg pid  host port -- )
+  <client> [ 2array [ serialize ] with-serialized ] with-stream ;
\ No newline at end of file
index c14bc5572e946b36e84799c09b8adec011e72eb9..9f175ee4a2e9321cf06b99b27f5cda39674cb327 100644 (file)
@@ -1,4 +1,4 @@
-REQUIRES: dlists ;
+REQUIRES: dlists serialize ;
 
 PROVIDE: concurrency
 { "concurrency.factor" }