[ "received" ] [
[
- receive "received" swap reply-synchronous
+ [ drop "received" ] handle-synchronous
] "Synchronous test" spawn
"sent" swap send-synchronous
] unit-test
[ "received" ] [
[
- receive "received" swap reply-synchronous
+ [ drop "received" ] handle-synchronous
] "Synchronous test" spawn
[ 100 milliseconds "sent" ] dip send-synchronous-timeout
] unit-test
-[ [ 100 milliseconds sleep
- receive "received" swap reply-synchronous ] "Synchronous test" spawn
- [ 50 milliseconds "sent" ] dip send-synchronous-timeout
+[
+ [
+ 100 milliseconds sleep
+ [ drop "received" ] handle-synchronous
+ ] "Synchronous test" spawn
+ [ 5 milliseconds "sent" ] dip send-synchronous-timeout
] [ wait-timeout? ] must-fail-with
! ] "Bad synchronous send" spawn "t" set
! [ 3 "t" get send-synchronous ] must-fail
+
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.\r
! See http://factorcode.org/license.txt for BSD license.\r
USING: kernel threads concurrency.mailboxes continuations\r
-namespaces assocs accessors summary fry ;\r
+namespaces assocs accessors summary fry calendar math sequences ;\r
IN: concurrency.messaging\r
\r
+TUPLE: envelope data sender tag expiry ;\r
+\r
+<PRIVATE\r
+\r
+: new-envelope ( data class -- envelope )\r
+ new swap >>data self >>sender ;\r
+\r
+: <envelope> ( data -- envelope )\r
+ dup envelope?\r
+ [ envelope new-envelope ] unless ;\r
+\r
+: expired? ( message -- ? )\r
+ dup envelope?\r
+ [ expiry>>\r
+ [ now (time-) 0 < ]\r
+ [ f ] if*\r
+ ] [ drop f ] if ; inline\r
+\r
+: if-expired ( message quot -- message )\r
+ [ dup expired? ] dip\r
+ '[ drop _ call( -- message ) ] [ ] if ; inline\r
+\r
+PRIVATE>\r
+\r
GENERIC: send ( message thread -- )\r
\r
+GENERIC: send-timeout ( timeout message thread -- )\r
+\r
: mailbox-of ( thread -- mailbox )\r
dup mailbox>> [ ] [\r
<mailbox> [ >>mailbox drop ] keep\r
] ?if ;\r
\r
M: thread send ( message thread -- )\r
+ [ <envelope> ] dip\r
check-registered mailbox-of mailbox-put ;\r
\r
+M: thread send-timeout ( timeout message thread -- )\r
+ [ <envelope> swap hence >>expiry ] dip send ; \r
+\r
: my-mailbox ( -- mailbox ) self mailbox-of ;\r
\r
+: (receive) ( -- message )\r
+ my-mailbox mailbox-get ?linked\r
+ [ (receive) ] if-expired ; \r
+\r
: receive ( -- message )\r
- my-mailbox mailbox-get ?linked ;\r
+ (receive) data>> ;\r
+ \r
+: (receive-timeout) ( timeout -- message )\r
+ [ my-mailbox ] dip\r
+ [ mailbox-get-timeout ?linked ] keep\r
+ '[ _ (receive-timeout) ] if-expired ; inline\r
\r
: receive-timeout ( timeout -- message )\r
- [ my-mailbox ] dip mailbox-get-timeout ?linked ;\r
+ (receive-timeout) data>> ;\r
+\r
+: (receive-if) ( pred -- message )\r
+ [ my-mailbox ] dip\r
+ [ mailbox-get? ?linked ] keep\r
+ '[ _ (receive-if) ] if-expired ; inline\r
\r
: receive-if ( pred -- message )\r
- [ my-mailbox ] dip mailbox-get? ?linked ; inline\r
+ [ data>> ] prepend (receive-if) data>> ; inline\r
+\r
+: (receive-if-timeout) ( timeout pred -- message )\r
+ [ my-mailbox ] 2dip\r
+ [ mailbox-get-timeout? ?linked ] 2keep\r
+ '[ _ _ (receive-if-timeout) ] if-expired ; inline\r
\r
: receive-if-timeout ( timeout pred -- message )\r
- [ my-mailbox ] 2dip mailbox-get-timeout? ?linked ; inline\r
+ [ data>> ] prepend \r
+ (receive-if-timeout) data>> ; inline\r
\r
: rethrow-linked ( error process supervisor -- )\r
[ <linked-error> ] dip send ;\r
: spawn-linked ( quot name -- thread )\r
my-mailbox spawn-linked-to ;\r
\r
-TUPLE: synchronous data sender tag ;\r
+TUPLE: synchronous < envelope ;\r
\r
: <synchronous> ( data -- sync )\r
- self synchronous counter synchronous boa ;\r
+ synchronous new-envelope \r
+ synchronous counter >>tag ;\r
\r
-TUPLE: reply data tag ;\r
+TUPLE: reply < envelope ;\r
\r
: <reply> ( data synchronous -- reply )\r
- tag>> \ reply boa ;\r
+ [ reply new-envelope ] dip\r
+ tag>> >>tag ;\r
\r
: synchronous-reply? ( response synchronous -- ? )\r
over reply? [ [ tag>> ] bi@ = ] [ 2drop f ] if ;\r
cannot-send-synchronous-to-self\r
] [\r
[ <synchronous> dup ] dip send\r
- '[ _ synchronous-reply? ] receive-if\r
- data>>\r
- ] if ;\r
+ '[ _ synchronous-reply? ] (receive-if) data>>\r
+ ] if ; \r
\r
: send-synchronous-timeout ( timeout message thread -- reply ) \r
dup self eq? [\r
cannot-send-synchronous-to-self\r
] [\r
- [ <synchronous> dup ] dip send\r
- '[ _ synchronous-reply? ] receive-if-timeout\r
- data>>\r
+ [ <synchronous> 2dup ] dip send-timeout\r
+ '[ _ synchronous-reply? ] (receive-if-timeout) data>>\r
] if ; \r
- \r
+\r
+<PRIVATE\r
+\r
: reply-synchronous ( message synchronous -- )\r
- [ <reply> ] keep sender>> send ;\r
+ dup expired?\r
+ [ 2drop ] \r
+ [ [ <reply> ] keep sender>> send ] if ;\r
+\r
+PRIVATE>\r
\r
: handle-synchronous ( quot -- )\r
- receive [\r
+ (receive) [\r
data>> swap call\r
] keep reply-synchronous ; inline\r
\r