\r
: >box ( value box -- )\r
dup occupied>>\r
- [ box-full ] [ t >>occupied (>>value) ] if ;\r
+ [ box-full ] [ t >>occupied (>>value) ] if ; inline\r
\r
ERROR: box-empty box ;\r
\r
dup occupied>> [ box-empty ] unless ; inline\r
\r
: box> ( box -- value )\r
- check-box [ f ] change-value f >>occupied drop ;\r
+ check-box [ f ] change-value f >>occupied drop ; inline\r
\r
: ?box ( box -- value/f ? )\r
- dup occupied>> [ box> t ] [ drop f f ] if ;\r
+ dup occupied>> [ box> t ] [ drop f f ] if ; inline\r
\r
: if-box? ( box quot -- )\r
[ ?box ] dip [ drop ] if ; inline\r
IN: concurrency.conditions\r
\r
: notify-1 ( deque -- )\r
- dup deque-empty? [ drop ] [ pop-back resume-now ] if ;\r
+ dup deque-empty? [ drop ] [ pop-back resume-now ] if ; inline\r
\r
: notify-all ( deque -- )\r
- [ resume-now ] slurp-deque ;\r
+ [ resume-now ] slurp-deque ; inline\r
\r
: queue-timeout ( queue timeout -- alarm )\r
#! Add an alarm which removes the current thread from the\r
ERROR: wait-timeout ;\r
\r
: queue ( queue -- )\r
- [ self ] dip push-front ;\r
+ [ self ] dip push-front ; inline\r
\r
: wait ( queue timeout status -- )\r
over [\r
[ wait-timeout ] [ cancel-alarm ] if\r
] [\r
[ drop queue ] dip suspend drop\r
- ] if ;\r
+ ] if ; inline\r
locals fry ;
IN: concurrency.mailboxes
-TUPLE: mailbox threads data ;
+TUPLE: mailbox { threads dlist } { data dlist } ;
: <mailbox> ( -- mailbox )
mailbox new
<dlist> >>threads
- <dlist> >>data ;
+ <dlist> >>data ; inline
: mailbox-empty? ( mailbox -- bool )
- data>> deque-empty? ;
+ data>> deque-empty? ; inline
-: mailbox-put ( obj mailbox -- )
+GENERIC: mailbox-put ( obj mailbox -- )
+
+M: mailbox mailbox-put
[ data>> push-front ]
[ threads>> notify-all ] bi yield ;
: wait-for-mailbox ( mailbox timeout -- )
- [ threads>> ] dip "mailbox" wait ;
+ [ threads>> ] dip "mailbox" wait ; inline
:: block-unless-pred ( ... mailbox timeout pred: ( ... message -- ... ? ) -- ... )
mailbox data>> pred dlist-any? [
2dup wait-for-mailbox block-if-empty
] [
drop
- ] if ;
+ ] if ; inline recursive
: mailbox-peek ( mailbox -- obj )
data>> peek-back ;
-: mailbox-get-timeout ( mailbox timeout -- obj )
- block-if-empty data>> pop-back ;
+GENERIC# mailbox-get-timeout 1 ( mailbox timeout -- obj )
+
+M: mailbox mailbox-get-timeout block-if-empty data>> pop-back ;
: mailbox-get ( mailbox -- obj )
- f mailbox-get-timeout ;
+ f mailbox-get-timeout ; inline
: mailbox-get-all-timeout ( mailbox timeout -- array )
block-if-empty
-! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.\r
+! Copyright (C) 2005, 2010 Chris Double, Slava Pestov.\r
! See http://factorcode.org/license.txt for BSD license.\r
-USING: kernel threads concurrency.mailboxes continuations\r
-namespaces assocs accessors summary fry ;\r
+USING: kernel kernel.private threads concurrency.mailboxes\r
+continuations namespaces assocs accessors summary fry ;\r
IN: concurrency.messaging\r
\r
GENERIC: send ( message thread -- )\r
\r
-: mailbox-of ( thread -- mailbox )\r
- dup mailbox>> [ ] [\r
- <mailbox> [ >>mailbox drop ] keep\r
- ] ?if ;\r
+GENERIC: mailbox-of ( thread -- mailbox )\r
+\r
+M: thread mailbox-of\r
+ dup mailbox>>\r
+ [ { mailbox } declare ]\r
+ [ <mailbox> [ >>mailbox drop ] keep ] ?if ; inline\r
\r
M: thread send ( message thread -- )\r
- check-registered mailbox-of mailbox-put ;\r
+ mailbox-of mailbox-put ;\r
\r
-: my-mailbox ( -- mailbox ) self mailbox-of ;\r
+: my-mailbox ( -- mailbox ) self mailbox-of ; inline\r
\r
: receive ( -- message )\r
my-mailbox mailbox-get ?linked ;\r
: <hashed-dlist> ( -- search-deque )
20 <hashtable> <dlist> <search-deque> ;
-M: dlist deque-empty? front>> not ;
+M: dlist deque-empty? front>> not ; inline
M: dlist-node node-value obj>> ;
: <max-heap> ( -- max-heap ) max-heap <heap> ;
M: heap heap-empty? ( heap -- ? )
- data>> empty? ;
+ data>> empty? ; inline
M: heap heap-size ( heap -- n )
data>> length ;
: thread-registered? ( thread -- ? )
id>> threads key? ;
-ERROR: already-stopped thread ;
-
-: check-unregistered ( thread -- thread )
- dup thread-registered? [ already-stopped ] when ;
-
-ERROR: not-running thread ;
-
-: check-registered ( thread -- thread )
- dup thread-registered? [ not-running ] unless ;
-
<PRIVATE
: register-thread ( thread -- )
- check-unregistered dup id>> threads set-at ;
+ dup id>> threads set-at ;
: unregister-thread ( thread -- )
- check-registered id>> threads delete-at ;
+ id>> threads delete-at ;
: set-self ( thread -- ) 63 set-special-object ; inline
65 special-object { dlist } declare ; inline
: sleep-queue ( -- heap )
- 66 special-object { dlist } declare ; inline
+ 66 special-object { min-heap } declare ; inline
: new-thread ( quot name class -- thread )
new
\ thread new-thread ;
: resume ( thread -- )
- f >>state
- check-registered run-queue push-front ;
+ f >>state run-queue push-front ;
: resume-now ( thread -- )
- f >>state
- check-registered run-queue push-back ;
+ f >>state run-queue push-back ;
: resume-with ( obj thread -- )
- f >>state
- check-registered 2array run-queue push-front ;
+ f >>state 2array run-queue push-front ;
: sleep-time ( -- nanos/f )
{
<PRIVATE
: schedule-sleep ( thread dt -- )
- [ check-registered dup ] dip sleep-queue heap-push*
- >>sleep-entry drop ;
+ dupd sleep-queue heap-push* >>sleep-entry drop ;
-: expire-sleep? ( heap -- ? )
- dup heap-empty?
+: expire-sleep? ( -- ? )
+ sleep-queue dup heap-empty?
[ drop f ] [ heap-peek nip nano-count <= ] if ;
: expire-sleep ( thread -- )
f >>sleep-entry resume ;
: expire-sleep-loop ( -- )
- sleep-queue
- [ dup expire-sleep? ]
- [ dup heap-pop drop expire-sleep ]
- while
- drop ;
+ [ expire-sleep? ]
+ [ sleep-queue heap-pop drop expire-sleep ]
+ while ;
CONSTANT: [start]
[
: no-runnable-threads ( -- ) die ;
-: (next) ( obj thread -- obj' )
+GENERIC: (next) ( obj thread -- obj' )
+
+M: thread (next)
dup runnable>>
[ context>> box> set-context ]
[ t >>runnable drop [start] start-context ] if ;