]> gitweb.factorcode.org Git - factor.git/blob - basis/concurrency/distributed/distributed.factor
7e59a4ef79c6938e644a32255cb94ff30639dd3e
[factor.git] / basis / concurrency / distributed / distributed.factor
1 ! Copyright (C) 2005 Chris Double. All Rights Reserved.
2 ! Copyright (C) 2018 Alexander Ilin.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: accessors arrays assocs concurrency.messaging
5 continuations destructors fry init io io.encodings.binary
6 io.servers io.sockets io.streams.duplex kernel namespaces
7 sequences serialize threads ;
8 FROM: concurrency.messaging => send ;
9 IN: concurrency.distributed
10
11 <PRIVATE
12
13 : registered-remote-threads ( -- hash )
14    \ registered-remote-threads get-global ;
15
16 : thread-connections ( -- hash )
17     \ thread-connections get-global ;
18
19 PRIVATE>
20
21 : register-remote-thread ( thread name -- )
22     registered-remote-threads set-at ;
23
24 : unregister-remote-thread ( name -- )
25     registered-remote-threads delete-at ;
26
27 : get-remote-thread ( name -- thread )
28     dup registered-remote-threads at [ ] [ threads at ] ?if ;
29
30 SYMBOL: local-node
31
32 : handle-node-client ( -- )
33     deserialize [
34         first2 get-remote-thread send handle-node-client
35     ] [ stop-this-server ] if* ;
36
37 : <node-server> ( addrspec -- threaded-server )
38     binary <threaded-server>
39         swap >>insecure
40         "concurrency.distributed" >>name
41         [ handle-node-client ] >>handler ;
42
43 : start-node ( addrspec -- )
44     <node-server> start-server local-node set-global ;
45
46 TUPLE: remote-thread node id ;
47
48 C: <remote-thread> remote-thread
49
50 TUPLE: connection remote stream local ;
51
52 C: <connection> connection
53
54 : connect ( remote-thread -- )
55     [ node>> dup binary <client> <connection> ]
56     [ thread-connections set-at ] bi ;
57
58 : disconnect ( remote-thread -- )
59     thread-connections delete-at*
60     [ stream>> dispose ] [ drop ] if ;
61
62 : with-connection ( remote-thread quot -- )
63     '[ connect @ ] over [ disconnect ] curry finally ; inline
64
65 : send-remote-message ( message node -- )
66     binary [ serialize ] with-client ;
67
68 : send-to-connection ( message connection -- )
69     stream>> [ serialize flush ] with-stream* ;
70
71 M: remote-thread send ( message thread -- )
72     [ id>> 2array ] [ node>> ] [ thread-connections at ] tri
73     [ nip send-to-connection ] [ send-remote-message ] if* ;
74
75 M: thread (serialize) ( obj -- )
76     id>> [ local-node get insecure>> ] dip <remote-thread> (serialize) ;
77
78 : stop-node ( -- )
79     f local-node get insecure>> send-remote-message ;
80
81 [
82     H{ } clone \ registered-remote-threads set-global
83     H{ } clone \ thread-connections set-global
84 ] "remote-thread-registry" add-startup-hook