]> gitweb.factorcode.org Git - factor.git/commitdiff
reworked messaging, now always use an envelope around the message to handle expiratio...
authorSascha Matzke <sascha.matzke@didolo.org>
Sun, 30 Aug 2009 15:26:23 +0000 (17:26 +0200)
committerSascha Matzke <sascha.matzke@didolo.org>
Sun, 30 Aug 2009 15:26:23 +0000 (17:26 +0200)
basis/concurrency/messaging/messaging-tests.factor
basis/concurrency/messaging/messaging.factor

index 7cbe2b21ffb82de08982c70c81ab23a39849e1db..f3e26f9b5ddf7132896347a765c7728dc8ff7e3b 100644 (file)
@@ -10,21 +10,24 @@ IN: concurrency.messaging.tests
 
 [ "received" ] [ 
     [
-        receive "received" swap reply-synchronous
+        [ drop "received" ] handle-synchronous
     ] "Synchronous test" spawn
     "sent" swap send-synchronous
 ] unit-test
 
 [ "received" ] [ 
     [
-        receive "received" swap reply-synchronous
+        [ drop "received" ] handle-synchronous
     ] "Synchronous test" spawn
     [ 100 milliseconds "sent" ] dip send-synchronous-timeout
 ] unit-test
 
-[ [ 100 milliseconds sleep
-    receive "received" swap reply-synchronous ] "Synchronous test" spawn
-  [ 50 milliseconds "sent" ] dip send-synchronous-timeout
+[
+    [
+        100 milliseconds sleep
+        [ drop "received" ] handle-synchronous
+    ] "Synchronous test" spawn
+    [ 5 milliseconds "sent" ] dip send-synchronous-timeout
 ] [ wait-timeout? ] must-fail-with
 
 
@@ -77,3 +80,4 @@ SYMBOL: exit
 ! ] "Bad synchronous send" spawn "t" set
 
 ! [ 3 "t" get send-synchronous ] must-fail
+
index 8438f7effe6ac75bd83591be7db8d37f4b9d2476..904660428238367e8cfe3f1bdd8fb64549f92927 100644 (file)
@@ -1,32 +1,82 @@
 ! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.\r
 ! See http://factorcode.org/license.txt for BSD license.\r
 USING: kernel threads concurrency.mailboxes continuations\r
-namespaces assocs accessors summary fry ;\r
+namespaces assocs accessors summary fry calendar math sequences ;\r
 IN: concurrency.messaging\r
 \r
+TUPLE: envelope data sender tag expiry ;\r
+\r
+<PRIVATE\r
+\r
+: new-envelope ( data class -- envelope )\r
+    new swap >>data self >>sender ;\r
+\r
+: <envelope> ( data -- envelope )\r
+    dup envelope?\r
+    [ envelope new-envelope ] unless ;\r
+\r
+: expired? ( message -- ? )\r
+    dup envelope?\r
+    [ expiry>>\r
+      [ now (time-) 0 < ]\r
+      [ f ] if*\r
+    ] [ drop f ] if ; inline\r
+\r
+: if-expired ( message quot -- message )\r
+    [ dup expired? ] dip\r
+    '[ drop _ call( -- message ) ] [ ] if ; inline\r
+\r
+PRIVATE>\r
+\r
 GENERIC: send ( message thread -- )\r
 \r
+GENERIC: send-timeout ( timeout message thread -- )\r
+\r
 : mailbox-of ( thread -- mailbox )\r
     dup mailbox>> [ ] [\r
         <mailbox> [ >>mailbox drop ] keep\r
     ] ?if ;\r
 \r
 M: thread send ( message thread -- )\r
+    [ <envelope> ] dip\r
     check-registered mailbox-of mailbox-put ;\r
 \r
+M: thread send-timeout ( timeout message thread -- )\r
+    [ <envelope> swap hence >>expiry ] dip send ; \r
+\r
 : my-mailbox ( -- mailbox ) self mailbox-of ;\r
 \r
+: (receive) ( -- message )\r
+    my-mailbox mailbox-get ?linked\r
+    [ (receive) ] if-expired ;  \r
+\r
 : receive ( -- message )\r
-    my-mailbox mailbox-get ?linked ;\r
+    (receive) data>> ;\r
+    \r
+: (receive-timeout) ( timeout -- message )\r
+    [ my-mailbox ] dip\r
+    [ mailbox-get-timeout ?linked ] keep\r
+    '[ _ (receive-timeout) ] if-expired ; inline\r
 \r
 : receive-timeout ( timeout -- message )\r
-    [ my-mailbox ] dip mailbox-get-timeout ?linked ;\r
+    (receive-timeout) data>> ;\r
+\r
+: (receive-if) ( pred -- message )\r
+    [ my-mailbox ] dip\r
+    [ mailbox-get? ?linked ] keep\r
+    '[ _ (receive-if) ] if-expired ; inline\r
 \r
 : receive-if ( pred -- message )\r
-    [ my-mailbox ] dip mailbox-get? ?linked ; inline\r
+    [ data>> ] prepend (receive-if) data>> ; inline\r
+\r
+: (receive-if-timeout) ( timeout pred -- message )\r
+    [ my-mailbox ] 2dip\r
+    [ mailbox-get-timeout? ?linked ] 2keep\r
+    '[ _ _ (receive-if-timeout) ] if-expired ; inline\r
 \r
 : receive-if-timeout ( timeout pred -- message )\r
-    [ my-mailbox ] 2dip mailbox-get-timeout? ?linked ; inline\r
+    [ data>> ] prepend \r
+    (receive-if-timeout) data>> ; inline\r
 \r
 : rethrow-linked ( error process supervisor -- )\r
     [ <linked-error> ] dip send ;\r
@@ -34,15 +84,17 @@ M: thread send ( message thread -- )
 : spawn-linked ( quot name -- thread )\r
     my-mailbox spawn-linked-to ;\r
 \r
-TUPLE: synchronous data sender tag ;\r
+TUPLE: synchronous < envelope ;\r
 \r
 : <synchronous> ( data -- sync )\r
-    self synchronous counter synchronous boa ;\r
+    synchronous new-envelope \r
+    synchronous counter >>tag ;\r
 \r
-TUPLE: reply data tag ;\r
+TUPLE: reply < envelope ;\r
 \r
 : <reply> ( data synchronous -- reply )\r
-    tag>> \ reply boa ;\r
+    [ reply new-envelope ] dip\r
+    tag>> >>tag ;\r
 \r
 : synchronous-reply? ( response synchronous -- ? )\r
     over reply? [ [ tag>> ] bi@ = ] [ 2drop f ] if ;\r
@@ -57,24 +109,28 @@ M: cannot-send-synchronous-to-self summary
         cannot-send-synchronous-to-self\r
     ] [\r
         [ <synchronous> dup ] dip send\r
-        '[ _ synchronous-reply? ] receive-if\r
-        data>>\r
-    ] if ;\r
+        '[ _ synchronous-reply? ] (receive-if) data>>\r
+    ] if ; \r
 \r
 : send-synchronous-timeout ( timeout message thread -- reply ) \r
     dup self eq? [\r
         cannot-send-synchronous-to-self\r
     ] [\r
-        [ <synchronous> dup ] dip send\r
-        '[ _ synchronous-reply? ] receive-if-timeout\r
-        data>>\r
+        [ <synchronous> 2dup ] dip send-timeout\r
+        '[ _ synchronous-reply? ] (receive-if-timeout) data>>\r
     ] if ;   \r
-    \r
+\r
+<PRIVATE\r
+\r
 : reply-synchronous ( message synchronous -- )\r
-    [ <reply> ] keep sender>> send ;\r
+    dup expired?\r
+    [ 2drop ] \r
+    [ [ <reply> ] keep sender>> send ] if ;\r
+\r
+PRIVATE>\r
 \r
 : handle-synchronous ( quot -- )\r
-    receive [\r
+    (receive) [\r
         data>> swap call\r
     ] keep reply-synchronous ; inline\r
 \r