[ "received" ] [
[
- [ drop "received" ] handle-synchronous
+ receive "received" swap reply-synchronous
] "Synchronous test" spawn
"sent" swap send-synchronous
] unit-test
[ "received" ] [
[
- [ drop "received" ] handle-synchronous
+ receive "received" swap reply-synchronous
] "Synchronous test" spawn
[ 100 milliseconds "sent" ] dip send-synchronous-timeout
] unit-test
-[
- [
- 100 milliseconds sleep
- [ drop "received" ] handle-synchronous
- ] "Synchronous test" spawn
- [ 5 milliseconds "sent" ] dip send-synchronous-timeout
+[ [ 100 milliseconds sleep
+ receive "received" swap reply-synchronous ] "Synchronous test" spawn
+ [ 50 milliseconds "sent" ] dip send-synchronous-timeout
] [ wait-timeout? ] must-fail-with
! ] "Bad synchronous send" spawn "t" set
! [ 3 "t" get send-synchronous ] must-fail
-
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.\r
! See http://factorcode.org/license.txt for BSD license.\r
USING: kernel threads concurrency.mailboxes continuations\r
-namespaces assocs accessors summary fry calendar math sequences ;\r
+namespaces assocs accessors summary fry ;\r
IN: concurrency.messaging\r
\r
-TUPLE: envelope data sender tag expiry ;\r
-\r
-<PRIVATE\r
-\r
-: new-envelope ( data class -- envelope )\r
- new swap >>data self >>sender ;\r
-\r
-: <envelope> ( data -- envelope )\r
- dup envelope?\r
- [ envelope new-envelope ] unless ;\r
-\r
-: expired? ( message -- ? )\r
- dup envelope?\r
- [ expiry>>\r
- [ now (time-) 0 < ]\r
- [ f ] if*\r
- ] [ drop f ] if ; inline\r
-\r
-: if-expired ( message quot -- message )\r
- [ dup expired? ] dip\r
- '[ drop _ call( -- message ) ] [ ] if ; inline\r
-\r
-PRIVATE>\r
-\r
GENERIC: send ( message thread -- )\r
\r
-GENERIC: send-timeout ( timeout message thread -- )\r
-\r
: mailbox-of ( thread -- mailbox )\r
dup mailbox>> [ ] [\r
<mailbox> [ >>mailbox drop ] keep\r
] ?if ;\r
\r
M: thread send ( message thread -- )\r
- [ <envelope> ] dip\r
check-registered mailbox-of mailbox-put ;\r
\r
-M: thread send-timeout ( timeout message thread -- )\r
- [ <envelope> swap hence >>expiry ] dip send ; \r
-\r
: my-mailbox ( -- mailbox ) self mailbox-of ;\r
\r
-: (receive) ( -- message )\r
- my-mailbox mailbox-get ?linked\r
- [ (receive) ] if-expired ; \r
-\r
: receive ( -- message )\r
- (receive) data>> ;\r
- \r
-: (receive-timeout) ( timeout -- message )\r
- [ my-mailbox ] dip\r
- [ mailbox-get-timeout ?linked ] keep\r
- '[ _ (receive-timeout) ] if-expired ; inline\r
+ my-mailbox mailbox-get ?linked ;\r
\r
: receive-timeout ( timeout -- message )\r
- (receive-timeout) data>> ;\r
-\r
-: (receive-if) ( pred -- message )\r
- [ my-mailbox ] dip\r
- [ mailbox-get? ?linked ] keep\r
- '[ _ (receive-if) ] if-expired ; inline\r
+ [ my-mailbox ] dip mailbox-get-timeout ?linked ;\r
\r
: receive-if ( pred -- message )\r
- [ data>> ] prepend (receive-if) data>> ; inline\r
-\r
-: (receive-if-timeout) ( timeout pred -- message )\r
- [ my-mailbox ] 2dip\r
- [ mailbox-get-timeout? ?linked ] 2keep\r
- '[ _ _ (receive-if-timeout) ] if-expired ; inline\r
+ [ my-mailbox ] dip mailbox-get? ?linked ; inline\r
\r
: receive-if-timeout ( timeout pred -- message )\r
- [ data>> ] prepend \r
- (receive-if-timeout) data>> ; inline\r
+ [ my-mailbox ] 2dip mailbox-get-timeout? ?linked ; inline\r
\r
: rethrow-linked ( error process supervisor -- )\r
[ <linked-error> ] dip send ;\r
: spawn-linked ( quot name -- thread )\r
my-mailbox spawn-linked-to ;\r
\r
-TUPLE: synchronous < envelope ;\r
+TUPLE: synchronous data sender tag ;\r
\r
: <synchronous> ( data -- sync )\r
- synchronous new-envelope \r
- synchronous counter >>tag ;\r
+ self synchronous counter synchronous boa ;\r
\r
-TUPLE: reply < envelope ;\r
+TUPLE: reply data tag ;\r
\r
: <reply> ( data synchronous -- reply )\r
- [ reply new-envelope ] dip\r
- tag>> >>tag ;\r
+ tag>> \ reply boa ;\r
\r
: synchronous-reply? ( response synchronous -- ? )\r
over reply? [ [ tag>> ] bi@ = ] [ 2drop f ] if ;\r
cannot-send-synchronous-to-self\r
] [\r
[ <synchronous> dup ] dip send\r
- '[ _ synchronous-reply? ] (receive-if) data>>\r
- ] if ; \r
+ '[ _ synchronous-reply? ] receive-if\r
+ data>>\r
+ ] if ;\r
\r
: send-synchronous-timeout ( timeout message thread -- reply ) \r
dup self eq? [\r
cannot-send-synchronous-to-self\r
] [\r
- [ <synchronous> 2dup ] dip send-timeout\r
- '[ _ synchronous-reply? ] (receive-if-timeout) data>>\r
+ [ <synchronous> dup ] dip send\r
+ '[ _ synchronous-reply? ] receive-if-timeout\r
+ data>>\r
] if ; \r
-\r
-<PRIVATE\r
-\r
+ \r
: reply-synchronous ( message synchronous -- )\r
- dup expired?\r
- [ 2drop ] \r
- [ [ <reply> ] keep sender>> send ] if ;\r
-\r
-PRIVATE>\r
+ [ <reply> ] keep sender>> send ;\r
\r
: handle-synchronous ( quot -- )\r
- (receive) [\r
+ receive [\r
data>> swap call\r
] keep reply-synchronous ; inline\r
\r