-! Copyright (C) 2004, 2008 Slava Pestov.
+! Copyright (C) 2004, 2011 Slava Pestov.
! Copyright (C) 2005 Mackenzie Straight.
! See http://factorcode.org/license.txt for BSD license.
-USING: arrays hashtables heaps kernel kernel.private math
-namespaces sequences vectors continuations continuations.private
-dlists assocs system combinators init boxes accessors
-math.order deques strings quotations fry ;
+USING: accessors alien.private arrays assocs boxes combinators
+continuations continuations.private deques dlists hashtables
+heaps kernel kernel.private math math.order namespaces
+quotations sequences strings system ;
IN: threads
+<PRIVATE
+PRIMITIVE: (set-context) ( obj context -- obj' )
+PRIMITIVE: (set-context-and-delete) ( obj context -- * )
+PRIMITIVE: (sleep) ( nanos -- )
+PRIMITIVE: (start-context) ( obj quot -- obj' )
+PRIMITIVE: (start-context-and-delete) ( obj quot -- * )
+
+PRIMITIVE: context-object-for ( n context -- obj )
+
+! Wrap sub-primitives; we don't want them inlined into callers
+! since their behavior depends on what frames are on the callstack
+: set-context ( obj context -- obj' )
+ (set-context) ; inline
+
+: start-context ( obj quot: ( obj -- * ) -- obj' )
+ (start-context) ; inline
+
+: set-context-and-delete ( obj context -- * )
+ (set-context-and-delete) ; inline
+
+: start-context-and-delete ( obj quot: ( obj -- * ) -- * )
+ (start-context-and-delete) ; inline
+
+! Context introspection
+: namestack-for ( context -- namestack )
+ [ CONTEXT-OBJ-NAMESTACK ] dip context-object-for ;
+
+: catchstack-for ( context -- catchstack )
+ [ CONTEXT-OBJ-CATCHSTACK ] dip context-object-for ;
+
+: continuation-for ( context -- continuation )
+ {
+ [ datastack-for ]
+ [ callstack-for ]
+ [ retainstack-for ]
+ [ namestack-for ]
+ [ catchstack-for ]
+ } cleave <continuation> ;
+
+PRIVATE>
+
SYMBOL: initial-thread
TUPLE: thread
-{ name string }
-{ quot callable initial: [ ] }
-{ exit-handler callable initial: [ ] }
-{ id integer }
-continuation
-state
-runnable
-mailbox
-variables
-sleep-entry ;
-
-: self ( -- thread ) 63 getenv ; inline
+ { name string }
+ { quot callable initial: [ ] }
+ { exit-handler callable initial: [ ] }
+ { id integer }
+ { context box }
+ state
+ runnable
+ mailbox
+ { variables hashtable }
+ sleep-entry ;
+
+: self ( -- thread )
+ OBJ-CURRENT-THREAD special-object { thread } declare ; inline
+
+: thread-continuation ( thread -- continuation )
+ context>> check-box value>> continuation-for ;
! Thread-local storage
: tnamespace ( -- assoc )
- self variables>> [ H{ } clone dup self (>>variables) ] unless* ;
+ self variables>> ; inline
: tget ( key -- value )
- self variables>> at ;
+ tnamespace at ;
: tset ( value key -- )
tnamespace set-at ;
-: tchange ( key quot -- )
- tnamespace swap change-at ; inline
-
-: threads ( -- assoc ) 64 getenv ;
+: tchange ( ..a key quot: ( ..a value -- ..b newvalue ) -- ..b )
+ [ tnamespace ] dip change-at ; inline
-: thread ( id -- thread ) threads at ;
+: threads ( -- assoc )
+ OBJ-THREADS special-object { hashtable } declare ; inline
: thread-registered? ( thread -- ? )
id>> threads key? ;
-: check-unregistered ( thread -- thread )
- dup thread-registered?
- [ "Thread already stopped" throw ] when ;
-
-: check-registered ( thread -- thread )
- dup thread-registered?
- [ "Thread is not running" throw ] unless ;
-
<PRIVATE
: register-thread ( thread -- )
- check-unregistered dup id>> threads set-at ;
+ dup id>> threads set-at ;
: unregister-thread ( thread -- )
- check-registered id>> threads delete-at ;
+ id>> threads delete-at ;
-: set-self ( thread -- ) 63 setenv ; inline
+: set-self ( thread -- )
+ OBJ-CURRENT-THREAD set-special-object ; inline
PRIVATE>
+: run-queue ( -- dlist )
+ OBJ-RUN-QUEUE special-object { dlist } declare ; inline
+
+: sleep-queue ( -- heap )
+ OBJ-SLEEP-QUEUE special-object { min-heap } declare ; inline
+
+: waiting-callbacks ( -- assoc )
+ OBJ-WAITING-CALLBACKS special-object { hashtable } declare ; inline
+
: new-thread ( quot name class -- thread )
new
swap >>name
swap >>quot
\ thread counter >>id
- <box> >>continuation ; inline
+ H{ } clone >>variables
+ <box> >>context ; inline
: <thread> ( quot name -- thread )
\ thread new-thread ;
-: run-queue ( -- dlist ) 65 getenv ;
-
-: sleep-queue ( -- heap ) 66 getenv ;
-
: resume ( thread -- )
- f >>state
- check-registered run-queue push-front ;
+ f >>state run-queue push-front ;
: resume-now ( thread -- )
- f >>state
- check-registered run-queue push-back ;
+ f >>state run-queue push-back ;
: resume-with ( obj thread -- )
- f >>state
- check-registered 2array run-queue push-front ;
+ f >>state 2array run-queue push-front ;
-: sleep-time ( -- us/f )
+: sleep-time ( -- nanos/f )
{
+ { [ current-callback waiting-callbacks key? ] [ 0 ] }
{ [ run-queue deque-empty? not ] [ 0 ] }
{ [ sleep-queue heap-empty? ] [ f ] }
- [ sleep-queue heap-peek nip micros [-] ]
+ [ sleep-queue heap-peek nip nano-count [-] ]
} cond ;
+: interrupt ( thread -- )
+ dup state>> [
+ [
+ [ sleep-queue heap-delete ] when* f
+ ] change-sleep-entry dup resume
+ ] when drop ;
+
DEFER: stop
<PRIVATE
: schedule-sleep ( thread dt -- )
- [ check-registered dup ] dip sleep-queue heap-push*
- >>sleep-entry drop ;
+ dupd sleep-queue heap-push* >>sleep-entry drop ;
-: expire-sleep? ( heap -- ? )
- dup heap-empty?
- [ drop f ] [ heap-peek nip micros <= ] if ;
+: expire-sleep? ( -- ? )
+ sleep-queue dup heap-empty?
+ [ drop f ] [ heap-peek nip nano-count <= ] if ;
: expire-sleep ( thread -- )
f >>sleep-entry resume ;
: expire-sleep-loop ( -- )
- sleep-queue
- [ dup expire-sleep? ]
- [ dup heap-pop drop expire-sleep ]
- [ ] while
- drop ;
+ [ expire-sleep? ]
+ [ sleep-queue heap-pop drop expire-sleep ]
+ while ;
-: start ( namestack thread -- * )
+CONSTANT: [start]
[
- set-self
set-namestack
- V{ } set-catchstack
- { } set-retainstack
- { } set-datastack
- self quot>> [ call stop ] call-clear
- ] 2 (throw) ;
-
-DEFER: next
-
-: no-runnable-threads ( -- * )
- ! We should never be in a state where the only threads
- ! are sleeping; the I/O wait thread is always runnable.
- ! However, if it dies, we handle this case
- ! semi-gracefully.
- !
- ! And if sleep-time outputs f, there are no sleeping
- ! threads either... so WTF.
- sleep-time [ die 0 ] unless* (sleep) next ;
-
-: (next) ( arg thread -- * )
- f >>state
- dup set-self
- dup runnable>> [
- continuation>> box> continue-with
- ] [
- t >>runnable start
- ] if ;
-
-: next ( -- * )
+ init-catchstack
+ self quot>> call
+ stop
+ ]
+
+GENERIC: (next) ( obj thread -- obj' )
+
+M: thread (next)
+ dup runnable>>
+ [ context>> box> set-context ]
+ [ t >>runnable drop [start] start-context ] if ;
+
+: (stop) ( obj thread -- * )
+ dup runnable>>
+ [ context>> box> set-context-and-delete ]
+ [ t >>runnable drop [start] start-context-and-delete ] if ;
+
+: wake-up-callbacks ( -- )
+ current-callback waiting-callbacks delete-at*
+ [ resume-now ] [ drop ] if ;
+
+: next ( -- obj thread )
expire-sleep-loop
- run-queue dup deque-empty? [
- drop no-runnable-threads
- ] [
- pop-back dup array? [ first2 ] [ f swap ] if (next)
- ] if ;
+ wake-up-callbacks
+ run-queue pop-back
+ dup array? [ first2 ] [ [ f ] dip ] if
+ f >>state
+ dup set-self ;
PRIVATE>
-: stop ( -- )
- self [ exit-handler>> call ] [ unregister-thread ] bi next ;
+: stop ( -- * )
+ self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
+ next (stop) ;
-: suspend ( quot state -- obj )
- [
- [ [ self swap call ] dip self (>>state) ] dip
- self continuation>> >box
- next
- ] callcc1 2nip ; inline
+: suspend ( state -- obj )
+ [ self ] dip >>state
+ [ context ] dip context>> >box
+ next (next) ;
-: yield ( -- ) [ resume ] f suspend drop ;
+: yield ( -- )
+ self resume f suspend drop ;
-GENERIC: sleep-until ( time/f -- )
+GENERIC: sleep-until ( n/f -- )
M: integer sleep-until
- '[ _ schedule-sleep ] "sleep" suspend drop ;
+ [ self ] dip schedule-sleep "sleep" suspend drop ;
M: f sleep-until
- drop [ drop ] "interrupt" suspend drop ;
+ drop "standby" suspend drop ;
GENERIC: sleep ( dt -- )
M: real sleep
- micros + >integer sleep-until ;
-
-: interrupt ( thread -- )
- dup state>> [
- dup sleep-entry>> [ sleep-queue heap-delete ] when*
- f >>sleep-entry
- dup resume
- ] when drop ;
+ >integer nano-count + sleep-until ;
: (spawn) ( thread -- )
- [ register-thread ] [ namestack swap resume-with ] bi ;
+ [ register-thread ] [ [ get-namestack ] dip resume-with ] bi ;
: spawn ( quot name -- thread )
<thread> [ (spawn) ] keep ;
[ '[ _ loop ] ] dip spawn ;
: in-thread ( quot -- )
- [ datastack ] dip
- '[ _ set-datastack _ call ]
+ [ get-datastack ] dip
+ '[ _ set-datastack @ ]
"Thread" spawn drop ;
-GENERIC: error-in-thread ( error thread -- )
-
<PRIVATE
-: init-threads ( -- )
- H{ } clone 64 setenv
- <dlist> 65 setenv
- <min-heap> 66 setenv
- initial-thread global
- [ drop [ ] "Initial" <thread> ] cache
- <box> >>continuation
+: init-thread-state ( -- )
+ H{ } clone OBJ-THREADS set-special-object
+ <dlist> OBJ-RUN-QUEUE set-special-object
+ <min-heap> OBJ-SLEEP-QUEUE set-special-object
+ H{ } clone OBJ-WAITING-CALLBACKS set-special-object ;
+
+: init-initial-thread ( -- )
+ [ ] "Initial" <thread>
t >>runnable
- f >>state
- dup register-thread
- set-self ;
+ [ initial-thread set-global ]
+ [ register-thread ]
+ [ set-self ]
+ tri ;
+
+: init-threads ( -- )
+ init-thread-state
+ init-initial-thread ;
+
+: wait-for-callback ( callback -- )
+ self swap waiting-callbacks set-at
+ "Callback return" suspend drop ;
PRIVATE>
-[ init-threads ] "threads" add-init-hook
+STARTUP-HOOK: init-threads