! Copyright (C) 2005 Chris Double. All Rights Reserved.
! See http://factorcode.org/license.txt for BSD license.
-USING: serialize sequences concurrency.messaging threads io
-io.servers io.encodings.binary assocs init
-arrays namespaces kernel accessors ;
-FROM: io.sockets => host-name <inet> with-client ;
+USING: accessors arrays assocs concurrency.messaging
+continuations destructors fry init io io.encodings.binary
+io.servers io.sockets io.streams.duplex kernel namespaces
+sequences serialize threads ;
+FROM: concurrency.messaging => send ;
IN: concurrency.distributed
<PRIVATE
SYMBOL: local-node
: handle-node-client ( -- )
- deserialize
- [ first2 get-remote-thread send ] [ stop-this-server ] if* ;
+ deserialize [
+ first2 get-remote-thread send handle-node-client
+ ] [ stop-this-server ] if* ;
: <node-server> ( addrspec -- threaded-server )
binary <threaded-server>
: start-node ( addrspec -- )
<node-server> start-server local-node set-global ;
-TUPLE: remote-thread node id ;
+TUPLE: remote-thread node id connection ;
-C: <remote-thread> remote-thread
+: <remote-thread> ( node id -- remote-thread )
+ f remote-thread boa ;
+
+TUPLE: connection remote stream local ;
+
+C: <connection> connection
+
+: connect ( remote-thread -- )
+ dup node>> dup binary <client> <connection> >>connection drop ;
+
+: disconnect ( remote-thread -- )
+ dup connection>> [ stream>> dispose ] when* f >>connection drop ;
+
+: with-connection ( remote-thread quot -- )
+ '[ connect @ ] over [ disconnect ] curry [ ] cleanup ; inline
: send-remote-message ( message node -- )
binary [ serialize ] with-client ;
+: send-to-connection ( message connection -- )
+ stream>> [ serialize flush ] with-stream* ;
+
M: remote-thread send ( message thread -- )
- [ id>> 2array ] [ node>> ] bi
- send-remote-message ;
+ [ id>> 2array ] [ node>> ] [ connection>> ] tri
+ [ nip send-to-connection ] [ send-remote-message ] if* ;
M: thread (serialize) ( obj -- )
id>> [ local-node get insecure>> ] dip <remote-thread> (serialize) ;