]> gitweb.factorcode.org Git - factor.git/blob - basis/concurrency/distributed/distributed.factor
8f3524697df518b1c88eeb22f3dfd66c418dc272
[factor.git] / basis / concurrency / distributed / distributed.factor
1 ! Copyright (C) 2005 Chris Double. All Rights Reserved.
2 ! Copyright (C) 2018 Alexander Ilin.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: accessors arrays assocs continuations destructors init io
5 io.encodings.binary io.servers io.sockets io.streams.duplex
6 kernel namespaces sequences serialize threads ;
7 FROM: concurrency.messaging => send ;
8 IN: concurrency.distributed
9
10 <PRIVATE
11
12 : registered-remote-threads ( -- hash )
13    \ registered-remote-threads get-global ;
14
15 : thread-connections ( -- hash )
16     \ thread-connections get-global ;
17
18 PRIVATE>
19
20 : register-remote-thread ( thread name -- )
21     registered-remote-threads set-at ;
22
23 : unregister-remote-thread ( name -- )
24     registered-remote-threads delete-at ;
25
26 : get-remote-thread ( name -- thread )
27     dup registered-remote-threads at [ ] [ threads at ] ?if ;
28
29 SYMBOL: local-node
30
31 : handle-node-client ( -- )
32     deserialize [
33         first2 get-remote-thread send handle-node-client
34     ] [ stop-this-server ] if* ;
35
36 : <node-server> ( addrspec -- threaded-server )
37     binary <threaded-server>
38         swap >>insecure
39         "concurrency.distributed" >>name
40         [ handle-node-client ] >>handler ;
41
42 : start-node ( addrspec -- )
43     <node-server> start-server local-node set-global ;
44
45 TUPLE: remote-thread node id ;
46
47 C: <remote-thread> remote-thread
48
49 TUPLE: connection remote stream local ;
50
51 C: <connection> connection
52
53 : connect ( remote-thread -- )
54     [ node>> dup binary <client> <connection> ]
55     [ thread-connections set-at ] bi ;
56
57 : disconnect ( remote-thread -- )
58     thread-connections delete-at*
59     [ stream>> dispose ] [ drop ] if ;
60
61 : with-connection ( remote-thread quot -- )
62     '[ connect @ ] over [ disconnect ] curry finally ; inline
63
64 : send-remote-message ( message node -- )
65     binary [ serialize ] with-client ;
66
67 : send-to-connection ( message connection -- )
68     stream>> [ serialize flush ] with-stream* ;
69
70 M: remote-thread send
71     [ id>> 2array ] [ node>> ] [ thread-connections at ] tri
72     [ nip send-to-connection ] [ send-remote-message ] if* ;
73
74 M: thread (serialize)
75     id>> [ local-node get insecure>> ] dip <remote-thread> (serialize) ;
76
77 : stop-node ( -- )
78     f local-node get insecure>> send-remote-message ;
79
80 [
81     H{ } clone \ registered-remote-threads set-global
82     H{ } clone \ thread-connections set-global
83 ] "remote-thread-registry" add-startup-hook