]> gitweb.factorcode.org Git - factor.git/commitdiff
concurrency.distributed: implement with-connection combinator
authorAlexander Iljin <ajsoft@yandex.ru>
Wed, 10 Jan 2018 04:21:54 +0000 (07:21 +0300)
committerDoug Coleman <doug.coleman@gmail.com>
Sat, 20 Jan 2018 15:43:40 +0000 (09:43 -0600)
This allows to send multiple messages over the same socket connection. The
old implementation used with-client, which sent a single message and then
closed the socket. The connection stream is stored in the new remote-thread
tuple slot named connection.

basis/concurrency/distributed/distributed.factor

index ec5e66b41cee0b73d63a4458f7926d25320e806a..54449e9db6c48a5d534b61a1f87b97e37a1ea073 100644 (file)
@@ -1,9 +1,10 @@
 ! Copyright (C) 2005 Chris Double. All Rights Reserved.
 ! See http://factorcode.org/license.txt for BSD license.
-USING: serialize sequences concurrency.messaging threads io
-io.servers io.encodings.binary assocs init
-arrays namespaces kernel accessors ;
-FROM: io.sockets => host-name <inet> with-client ;
+USING: accessors arrays assocs concurrency.messaging
+continuations destructors fry init io io.encodings.binary
+io.servers io.sockets io.streams.duplex kernel namespaces
+sequences serialize threads ;
+FROM: concurrency.messaging => send ;
 IN: concurrency.distributed
 
 <PRIVATE
@@ -25,8 +26,9 @@ PRIVATE>
 SYMBOL: local-node
 
 : handle-node-client ( -- )
-    deserialize
-    [ first2 get-remote-thread send ] [ stop-this-server ] if* ;
+    deserialize [
+        first2 get-remote-thread send handle-node-client
+    ] [ stop-this-server ] if* ;
 
 : <node-server> ( addrspec -- threaded-server )
     binary <threaded-server>
@@ -37,16 +39,33 @@ SYMBOL: local-node
 : start-node ( addrspec -- )
     <node-server> start-server local-node set-global ;
 
-TUPLE: remote-thread node id ;
+TUPLE: remote-thread node id connection ;
 
-C: <remote-thread> remote-thread
+: <remote-thread> ( node id -- remote-thread )
+    f remote-thread boa ;
+
+TUPLE: connection remote stream local ;
+
+C: <connection> connection
+
+: connect ( remote-thread -- )
+    dup node>> dup binary <client> <connection> >>connection drop ;
+
+: disconnect ( remote-thread -- )
+    dup connection>> [ stream>> dispose ] when* f >>connection drop ;
+
+: with-connection ( remote-thread quot -- )
+    '[ connect @ ] over [ disconnect ] curry [ ] cleanup ; inline
 
 : send-remote-message ( message node -- )
     binary [ serialize ] with-client ;
 
+: send-to-connection ( message connection -- )
+    stream>> [ serialize flush ] with-stream* ;
+
 M: remote-thread send ( message thread -- )
-    [ id>> 2array ] [ node>> ] bi
-    send-remote-message ;
+    [ id>> 2array ] [ node>> ] [ connection>> ] tri
+    [ nip send-to-connection ] [ send-remote-message ] if* ;
 
 M: thread (serialize) ( obj -- )
     id>> [ local-node get insecure>> ] dip <remote-thread> (serialize) ;