!
! Concurrency library for Factor based on Erlang/Termite style
! concurrency.
-USING: kernel generic threads io namespaces errors words
- math sequences hashtables strings vectors dlists ;
+USING: kernel generic threads io namespaces errors words arrays
+ math sequences hashtables strings vectors dlists serialize ;
IN: concurrency
#! Debug
#! Processes run in nodes. Each process has a mailbox that is
#! used for receiving messages sent to that process.
-TUPLE: process node links pid mailbox ;
+TUPLE: process links pid mailbox ;
+TUPLE: remote-process node pid ;
+
+GENERIC: send ( message process -- )
-: local-process? ( process -- boolean )
- #! Is the process running on the local node
- process-node [ localnode = ] [ t ] if* ;
-
: make-process ( -- process )
#! Return a process set to run on the local node. A process is
#! similar to a thread but can send and receive messages to and
#! from other processes. It may also be linked to other processes so
#! that it receives a message if that process terminates.
- localnode [ ] gensym unparse make-mailbox <process> ;
+ [ ] gensym unparse make-mailbox <process> ;
: make-linked-process ( process -- process )
#! Return a process set to run on the local node. That process is
#! linked to the process on the stack. It will receive a message if
#! that process terminates.
- localnode swap unit gensym unparse make-mailbox <process> ;
+ unit gensym unparse make-mailbox <process> ;
: self ( -- process )
#! Returns the contents of the 'self-process' variables which
] make-hash
swap bind ;
-: spawn ( quot -- process )
+DEFER: register-process
+DEFER: unregister-process
+
+: (spawn) ( quot -- process )
#! Start a process which runs the given quotation.
[ in-thread ] make-process [ with-process ] over slip ;
+: spawn ( quot -- process )
+ #! Start a process which runs the given quotation.
+ [ self dup process-pid swap register-process call self process-pid unregister-process ] curry (spawn) ;
+
TUPLE: linked-exception error ;
: while-no-messages ( quot -- )
#! ( -- ).
>r self process-mailbox r> while-mailbox-empty ; inline
-: remote-send ( message process -- )
+M: remote-process send ( message process -- )
#! Send the message via the inter-node protocol
"remote-send not implemented" throw ;
-: send ( message process -- )
+M: process send ( message process -- )
#! Send the message to the process by placing it in the
#! processes mailbox.
- dup local-process? [ process-mailbox mailbox-put ] [ remote-send ] if ;
+ process-mailbox mailbox-put ;
: receive ( -- message )
#! Return a message from the current processes mailbox.
#! Given a process spawned using 'lazy', evaluate it and return the result.
f swap send-synchronous ;
+! ******************************
+! Standard Processes
+! ******************************
+TUPLE: register-msg name process ;
+TUPLE: unregister-msg name ;
+TUPLE: get-msg name ;
+
+PREDICATE: tagged-message (get-msg) ( obj -- ? )
+ tagged-message-data get-msg? ;
+
+: handle-register-process ( register-msg table -- )
+ >r [ register-msg-process ] keep register-msg-name r> set-hash ;
+
+: handle-unregister-process ( unregister-msg table -- )
+ >r unregister-msg-name r> remove-hash ;
+
+: handle-get-process ( get-msg table -- )
+ over tagged-message-data get-msg-name swap hash reply ;
+
+: process-registry ( table -- )
+ receive {
+ { [ dup register-msg? ] [ over handle-register-process ] }
+ { [ dup unregister-msg? ] [ over handle-unregister-process ] }
+ { [ dup (get-msg)? ] [ over handle-get-process ] }
+ } cond process-registry ;
+
+[ H{ } clone process-registry ] (spawn) \ process-registry set-global
+
+: register-process ( name process -- )
+ <register-msg> \ process-registry get send ;
+
+: unregister-process ( name -- )
+ <unregister-msg> \ process-registry get send ;
+
+: get-process ( name -- )
+ <get-msg> \ process-registry get send-synchronous ;
+
+: handle-node-client ( stream -- )
+ [ [ deserialize ] with-serialized ] with-stream dup . first2 get-process send ;
+
+: (node-server) ( server -- )
+ dup accept handle-node-client (node-server) ;
+
+: node-server ( port -- )
+ <server> (node-server) ;
+
+: send-to-node ( msg pid host port -- )
+ <client> [ 2array [ serialize ] with-serialized ] with-stream ;
\ No newline at end of file