" to be accessed remotely. " { $link publish } " returns an id which a remote node "
"needs to know to access the channel."
$nl
-{ $snippet "channel [ from . ] spawn drop dup publish" }
+{ $snippet "<channel> dup [ from . flush ] curry \"test\" spawn drop publish" }
$nl
-"Given the id from the snippet above, a remote node can put items in the channel."
+"Given the id from the snippet above, a remote node can put items in the channel (where 123456 is the id):"
$nl
-{ $snippet "\"myhost.com\" 9001 <node> \"ID123456\" <remote-channel>\n\"hello\" over to" }
+{ $snippet "\"myhost.com\" 9001 <node> 123456 <remote-channel>\n\"hello\" over to" }
;
ABOUT: { "remote-channels" "remote-channels" }
! See http://factorcode.org/license.txt for BSD license.
!
! Remote Channels
-USING: kernel init namespaces make assocs arrays random
+USING: kernel init namespaces assocs arrays random
sequences channels match concurrency.messaging
concurrency.distributed threads accessors ;
IN: channels.remote
MATCH-VARS: ?from ?tag ?id ?value ;
SYMBOL: no-channel
+TUPLE: to-message id value ;
+TUPLE: from-message id ;
-: channel-process ( -- )
+: channel-thread ( -- )
[
{
- { { to ?id ?value }
+ { T{ to-message f ?id ?value }
[ ?value ?id get-channel dup [ to f ] [ 2drop no-channel ] if ] }
- { { from ?id }
+ { T{ from-message f ?id }
[ ?id get-channel [ from ] [ no-channel ] if* ] }
} match-cond
] handle-synchronous ;
-PRIVATE>
-
: start-channel-node ( -- )
- "remote-channels" get-process [
- "remote-channels"
- [ channel-process t ] "Remote channels" spawn-server
- register-process
+ "remote-channels" get-remote-thread [
+ [ channel-thread t ] "Remote channels" spawn-server
+ "remote-channels" register-remote-thread
] unless ;
+PRIVATE>
+
TUPLE: remote-channel node id ;
C: <remote-channel> remote-channel
+<PRIVATE
+
+: send-message ( message remote-channel -- value )
+ node>> "remote-channels" <remote-thread>
+ send-synchronous dup no-channel = [ no-channel throw ] when* ;
+
+PRIVATE>
+
M: remote-channel to ( value remote-channel -- )
- [ [ \ to , id>> , , ] { } make ] keep
- node>> "remote-channels" <remote-process>
- send-synchronous no-channel = [ no-channel throw ] when ;
+ [ id>> swap to-message boa ] keep send-message drop ;
M: remote-channel from ( remote-channel -- value )
- [ [ \ from , id>> , ] { } make ] keep
- node>> "remote-channels" <remote-process>
- send-synchronous dup no-channel = [ no-channel throw ] when* ;
+ [ id>> from-message boa ] keep send-message ;
[
H{ } clone \ remote-channels set-global
{ $values { "port" "a port number between 0 and 65535" } }
{ $description "Starts a node server for receiving messages from remote Factor instances." } ;
+ARTICLE: "concurrency.distributed.example" "Distributed Concurrency Example"
+"For a Factor instance to be able to send and receive distributed "
+"concurrency messages it must first have " { $link start-node } " called."
+$nl
+"In one factor instance call " { $link start-node } " with the port 9000, "
+"and in another with the port 9001."
+$nl
+"In this example the Factor instance associated with port 9000 will run "
+"a thread that sits receiving messages and printing the received message "
+"in the listener. The code to start the thread is: "
+{ $examples
+ { $unchecked-example
+ ": log-message ( -- ) receive . flush log-message ;"
+ "[ log-message ] \"logger\" spawn dup name>> register-remote-thread"
+ }
+}
+"This spawns a thread waits for the messages. It registers that thread as a "
+"able to be accessed remotely using " { $link register-remote-thread } "."
+$nl
+"The second Factor instance, the one associated with port 9001, can send "
+"messages to the 'logger' thread by name:"
+{ $examples
+ { $unchecked-example
+ "USING: io.sockets concurrency.messaging concurrency.distributed ;"
+ "\"hello\" \"127.0.0.1\" 9000 <inet4> \"logger\" <remote-thread> send"
+ }
+}
+"The " { $link send } " word is used to send messages to other threads. If an "
+"instance of " { $link remote-thread } " is provided instead of a thread then "
+"the message is marshalled to the named thread on the given machine using the "
+{ $vocab-link "serialize" } " vocabulary."
+$nl
+"Running this code should show the message \"hello\" in the first Factor "
+"instance."
+$nl
+"It is also possible to use " { $link send-synchronous } " to receive a "
+"response to a distributed message. When an instance of " { $link thread } " "
+"is marshalled it is converted into an instance of " { $link remote-thread }
+". The receiver of this can use it as the target of a " { $link send }
+" or " { $link reply } " call." ;
+
ARTICLE: "concurrency.distributed" "Distributed message passing"
"The " { $vocab-link "concurrency.distributed" } " implements transparent distributed message passing, inspired by Erlang and Termite."
{ $subsections start-node }
-"Instances of " { $link thread } " can be sent to remote processes, at which point they are converted to objects holding the thread ID and the current node's host name:"
-{ $subsections remote-process }
-"The " { $vocab-link "serialize" } " vocabulary is used to convert Factor objects to byte arrays for transfer over a socket." ;
+"Instances of " { $link thread } " can be sent to remote threads, at which point they are converted to objects holding the thread ID and the current node's host name:"
+{ $subsections remote-thread }
+"The " { $vocab-link "serialize" } " vocabulary is used to convert Factor objects to byte arrays for transfer over a socket."
+{ $subsections "concurrency.distributed.example" } ;
+
ABOUT: "concurrency.distributed"
[ ] [
[
receive first2 [ 3 + ] dip send
- "thread-a" unregister-process
+ "thread-a" unregister-remote-thread
] "Thread A" spawn
- "thread-a" swap register-process
+ "thread-a" register-remote-thread
] unit-test
[ 8 ] [
5 self 2array
- "thread-a" test-node <remote-process> send
+ test-node "thread-a" <remote-thread> send
receive
] unit-test
! Copyright (C) 2005 Chris Double. All Rights Reserved.
! See http://factorcode.org/license.txt for BSD license.
USING: serialize sequences concurrency.messaging threads io
-io.servers.connection io.encodings.binary
+io.servers.connection io.encodings.binary assocs init
arrays namespaces kernel accessors ;
FROM: io.sockets => host-name <inet> with-client ;
IN: concurrency.distributed
+<PRIVATE
+
+: registered-remote-threads ( -- hash )
+ \ registered-remote-threads get-global ;
+
+PRIVATE>
+
+: register-remote-thread ( thread name -- )
+ registered-remote-threads set-at ;
+
+: unregister-remote-thread ( name -- )
+ registered-remote-threads delete-at ;
+
+: get-remote-thread ( name -- thread )
+ dup registered-remote-threads at [ ] [ thread ] ?if ;
+
SYMBOL: local-node
: handle-node-client ( -- )
deserialize
- [ first2 get-process send ] [ stop-this-server ] if* ;
+ [ first2 get-remote-thread send ] [ stop-this-server ] if* ;
: <node-server> ( addrspec -- threaded-server )
binary <threaded-server>
: start-node ( port -- )
host-name over <inet> (start-node) ;
-TUPLE: remote-process id node ;
+TUPLE: remote-thread node id ;
-C: <remote-process> remote-process
+C: <remote-thread> remote-thread
: send-remote-message ( message node -- )
binary [ serialize ] with-client ;
-M: remote-process send ( message thread -- )
+M: remote-thread send ( message thread -- )
[ id>> 2array ] [ node>> ] bi
send-remote-message ;
M: thread (serialize) ( obj -- )
- id>> local-node get-global <remote-process>
+ id>> [ local-node get-global ] dip <remote-thread>
(serialize) ;
: stop-node ( node -- )
f swap send-remote-message ;
+
+[
+ H{ } clone \ registered-remote-threads set-global
+] "remote-thread-registry" add-init-hook
+
+
! Copyright (C) 2006 Chris Double.
! See http://factorcode.org/license.txt for BSD license.
-USING: help.syntax help.markup concurrency.messaging.private
+USING: help.syntax help.markup
threads kernel arrays quotations strings ;
IN: concurrency.messaging
receive [\r
data>> swap call\r
] keep reply-synchronous ; inline\r
-\r
-<PRIVATE\r
-\r
-: registered-processes ( -- hash )\r
- \ registered-processes get-global ;\r
-\r
-PRIVATE>\r
-\r
-: register-process ( name process -- )\r
- swap registered-processes set-at ;\r
-\r
-: unregister-process ( name -- )\r
- registered-processes delete-at ;\r
-\r
-: get-process ( name -- process )\r
- dup registered-processes at [ ] [ thread ] ?if ;\r
-\r
-\ registered-processes [ H{ } clone ] initialize\r