]> gitweb.factorcode.org Git - factor.git/blobdiff - basis/threads/threads.factor
basis: use lint.vocabs tool to trim using lists
[factor.git] / basis / threads / threads.factor
index 8556167009db22850f4256da628c0a32ad40cc9f..d56a7928ad3bb98037a875a1980999e74489b4e9 100644 (file)
-! Copyright (C) 2004, 2008 Slava Pestov.
+! Copyright (C) 2004, 2011 Slava Pestov.
 ! Copyright (C) 2005 Mackenzie Straight.
 ! See http://factorcode.org/license.txt for BSD license.
-USING: arrays hashtables heaps kernel kernel.private math
-namespaces sequences vectors continuations continuations.private
-dlists assocs system combinators init boxes accessors
-math.order deques strings quotations fry ;
+USING: accessors alien.private arrays assocs boxes combinators
+continuations continuations.private deques dlists hashtables
+heaps kernel kernel.private math math.order namespaces
+quotations sequences strings system ;
 IN: threads
 
+<PRIVATE
+PRIMITIVE: (set-context) ( obj context -- obj' )
+PRIMITIVE: (set-context-and-delete) ( obj context -- * )
+PRIMITIVE: (sleep) ( nanos -- )
+PRIMITIVE: (start-context) ( obj quot -- obj' )
+PRIMITIVE: (start-context-and-delete) ( obj quot -- * )
+
+PRIMITIVE: context-object-for ( n context -- obj )
+
+! Wrap sub-primitives; we don't want them inlined into callers
+! since their behavior depends on what frames are on the callstack
+: set-context ( obj context -- obj' )
+    (set-context) ; inline
+
+: start-context ( obj quot: ( obj -- * ) -- obj' )
+    (start-context) ; inline
+
+: set-context-and-delete ( obj context -- * )
+    (set-context-and-delete) ; inline
+
+: start-context-and-delete ( obj quot: ( obj -- * ) -- * )
+    (start-context-and-delete) ; inline
+
+! Context introspection
+: namestack-for ( context -- namestack )
+    [ CONTEXT-OBJ-NAMESTACK ] dip context-object-for ;
+
+: catchstack-for ( context -- catchstack )
+    [ CONTEXT-OBJ-CATCHSTACK ] dip context-object-for ;
+
+: continuation-for ( context -- continuation )
+    {
+        [ datastack-for ]
+        [ callstack-for ]
+        [ retainstack-for ]
+        [ namestack-for ]
+        [ catchstack-for ]
+    } cleave <continuation> ;
+
+PRIVATE>
+
 SYMBOL: initial-thread
 
 TUPLE: thread
-{ name string }
-{ quot callable initial: [ ] }
-{ exit-handler callable initial: [ ] }
-{ id integer }
-continuation
-state
-runnable
-mailbox
-variables
-sleep-entry ;
-
-: self ( -- thread ) 63 getenv ; inline
+    { name string }
+    { quot callable initial: [ ] }
+    { exit-handler callable initial: [ ] }
+    { id integer }
+    { context box }
+    state
+    runnable
+    mailbox
+    { variables hashtable }
+    sleep-entry ;
+
+: self ( -- thread )
+    OBJ-CURRENT-THREAD special-object { thread } declare ; inline
+
+: thread-continuation ( thread -- continuation )
+    context>> check-box value>> continuation-for ;
 
 ! Thread-local storage
 : tnamespace ( -- assoc )
-    self variables>> [ H{ } clone dup self (>>variables) ] unless* ;
+    self variables>> ; inline
 
 : tget ( key -- value )
-    self variables>> at ;
+    tnamespace at ;
 
 : tset ( value key -- )
     tnamespace set-at ;
 
-: tchange ( key quot -- )
-    tnamespace swap change-at ; inline
-
-: threads ( -- assoc ) 64 getenv ;
+: tchange ( ..a key quot: ( ..a value -- ..b newvalue ) -- ..b )
+    [ tnamespace ] dip change-at ; inline
 
-: thread ( id -- thread ) threads at ;
+: threads ( -- assoc )
+    OBJ-THREADS special-object { hashtable } declare ; inline
 
 : thread-registered? ( thread -- ? )
     id>> threads key? ;
 
-: check-unregistered ( thread -- thread )
-    dup thread-registered?
-    [ "Thread already stopped" throw ] when ;
-
-: check-registered ( thread -- thread )
-    dup thread-registered?
-    [ "Thread is not running" throw ] unless ;
-
 <PRIVATE
 
 : register-thread ( thread -- )
-    check-unregistered dup id>> threads set-at ;
+    dup id>> threads set-at ;
 
 : unregister-thread ( thread -- )
-    check-registered id>> threads delete-at ;
+    id>> threads delete-at ;
 
-: set-self ( thread -- ) 63 setenv ; inline
+: set-self ( thread -- )
+    OBJ-CURRENT-THREAD set-special-object ; inline
 
 PRIVATE>
 
+: run-queue ( -- dlist )
+    OBJ-RUN-QUEUE special-object { dlist } declare ; inline
+
+: sleep-queue ( -- heap )
+    OBJ-SLEEP-QUEUE special-object { min-heap } declare ; inline
+
+: waiting-callbacks ( -- assoc )
+    OBJ-WAITING-CALLBACKS special-object { hashtable } declare ; inline
+
 : new-thread ( quot name class -- thread )
     new
         swap >>name
         swap >>quot
         \ thread counter >>id
-        <box> >>continuation ; inline
+        H{ } clone >>variables
+        <box> >>context ; inline
 
 : <thread> ( quot name -- thread )
     \ thread new-thread ;
 
-: run-queue ( -- dlist ) 65 getenv ;
-
-: sleep-queue ( -- heap ) 66 getenv ;
-
 : resume ( thread -- )
-    f >>state
-    check-registered run-queue push-front ;
+    f >>state run-queue push-front ;
 
 : resume-now ( thread -- )
-    f >>state
-    check-registered run-queue push-back ;
+    f >>state run-queue push-back ;
 
 : resume-with ( obj thread -- )
-    f >>state
-    check-registered 2array run-queue push-front ;
+    f >>state 2array run-queue push-front ;
 
-: sleep-time ( -- us/f )
+: sleep-time ( -- nanos/f )
     {
+        { [ current-callback waiting-callbacks key? ] [ 0 ] }
         { [ run-queue deque-empty? not ] [ 0 ] }
         { [ sleep-queue heap-empty? ] [ f ] }
-        [ sleep-queue heap-peek nip micros [-] ]
+        [ sleep-queue heap-peek nip nano-count [-] ]
     } cond ;
 
+: interrupt ( thread -- )
+    dup state>> [
+        [
+            [ sleep-queue heap-delete ] when* f
+        ] change-sleep-entry dup resume
+    ] when drop ;
+
 DEFER: stop
 
 <PRIVATE
 
 : schedule-sleep ( thread dt -- )
-    [ check-registered dup ] dip sleep-queue heap-push*
-    >>sleep-entry drop ;
+    dupd sleep-queue heap-push* >>sleep-entry drop ;
 
-: expire-sleep? ( heap -- ? )
-    dup heap-empty?
-    [ drop f ] [ heap-peek nip micros <= ] if ;
+: expire-sleep? ( -- ? )
+    sleep-queue dup heap-empty?
+    [ drop f ] [ heap-peek nip nano-count <= ] if ;
 
 : expire-sleep ( thread -- )
     f >>sleep-entry resume ;
 
 : expire-sleep-loop ( -- )
-    sleep-queue
-    [ dup expire-sleep? ]
-    [ dup heap-pop drop expire-sleep ]
-    [ ] while
-    drop ;
+    [ expire-sleep? ]
+    [ sleep-queue heap-pop drop expire-sleep ]
+    while ;
 
-: start ( namestack thread -- * )
+CONSTANT: [start]
     [
-        set-self
         set-namestack
-        V{ } set-catchstack
-        { } set-retainstack
-        { } set-datastack
-        self quot>> [ call stop ] call-clear
-    ] 2 (throw) ;
-
-DEFER: next
-
-: no-runnable-threads ( -- * )
-    ! We should never be in a state where the only threads
-    ! are sleeping; the I/O wait thread is always runnable.
-    ! However, if it dies, we handle this case
-    ! semi-gracefully.
-    !
-    ! And if sleep-time outputs f, there are no sleeping
-    ! threads either... so WTF.
-    sleep-time [ die 0 ] unless* (sleep) next ;
-
-: (next) ( arg thread -- * )
-    f >>state
-    dup set-self
-    dup runnable>> [
-        continuation>> box> continue-with
-    ] [
-        t >>runnable start
-    ] if ;
-
-: next ( -- * )
+        init-catchstack
+        self quot>> call
+        stop
+    ]
+
+GENERIC: (next) ( obj thread -- obj' )
+
+M: thread (next)
+    dup runnable>>
+    [ context>> box> set-context ]
+    [ t >>runnable drop [start] start-context ] if ;
+
+: (stop) ( obj thread -- * )
+    dup runnable>>
+    [ context>> box> set-context-and-delete ]
+    [ t >>runnable drop [start] start-context-and-delete ] if ;
+
+: wake-up-callbacks ( -- )
+    current-callback waiting-callbacks delete-at*
+    [ resume-now ] [ drop ] if ;
+
+: next ( -- obj thread )
     expire-sleep-loop
-    run-queue dup deque-empty? [
-        drop no-runnable-threads
-    ] [
-        pop-back dup array? [ first2 ] [ f swap ] if (next)
-    ] if ;
+    wake-up-callbacks
+    run-queue pop-back
+    dup array? [ first2 ] [ [ f ] dip ] if
+    f >>state
+    dup set-self ;
 
 PRIVATE>
 
-: stop ( -- )
-    self [ exit-handler>> call ] [ unregister-thread ] bi next ;
+: stop ( -- * )
+    self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
+    next (stop) ;
 
-: suspend ( quot state -- obj )
-    [
-        [ [ self swap call ] dip self (>>state) ] dip
-        self continuation>> >box
-        next
-    ] callcc1 2nip ; inline
+: suspend ( state -- obj )
+    [ self ] dip >>state
+    [ context ] dip context>> >box
+    next (next) ;
 
-: yield ( -- ) [ resume ] f suspend drop ;
+: yield ( -- )
+    self resume f suspend drop ;
 
-GENERIC: sleep-until ( time/f -- )
+GENERIC: sleep-until ( n/f -- )
 
 M: integer sleep-until
-    '[ _ schedule-sleep ] "sleep" suspend drop ;
+    [ self ] dip schedule-sleep "sleep" suspend drop ;
 
 M: f sleep-until
-    drop [ drop ] "interrupt" suspend drop ;
+    drop "standby" suspend drop ;
 
 GENERIC: sleep ( dt -- )
 
 M: real sleep
-    micros + >integer sleep-until ;
-
-: interrupt ( thread -- )
-    dup state>> [
-        dup sleep-entry>> [ sleep-queue heap-delete ] when*
-        f >>sleep-entry
-        dup resume
-    ] when drop ;
+    >integer nano-count + sleep-until ;
 
 : (spawn) ( thread -- )
-    [ register-thread ] [ namestack swap resume-with ] bi ;
+    [ register-thread ] [ [ get-namestack ] dip resume-with ] bi ;
 
 : spawn ( quot name -- thread )
     <thread> [ (spawn) ] keep ;
@@ -201,26 +232,34 @@ M: real sleep
     [ '[ _ loop ] ] dip spawn ;
 
 : in-thread ( quot -- )
-    [ datastack ] dip
-    '[ _ set-datastack _ call ]
+    [ get-datastack ] dip
+    '[ _ set-datastack @ ]
     "Thread" spawn drop ;
 
-GENERIC: error-in-thread ( error thread -- )
-
 <PRIVATE
 
-: init-threads ( -- )
-    H{ } clone 64 setenv
-    <dlist> 65 setenv
-    <min-heap> 66 setenv
-    initial-thread global
-    [ drop [ ] "Initial" <thread> ] cache
-    <box> >>continuation
+: init-thread-state ( -- )
+    H{ } clone OBJ-THREADS set-special-object
+    <dlist> OBJ-RUN-QUEUE set-special-object
+    <min-heap> OBJ-SLEEP-QUEUE set-special-object
+    H{ } clone OBJ-WAITING-CALLBACKS set-special-object ;
+
+: init-initial-thread ( -- )
+    [ ] "Initial" <thread>
     t >>runnable
-    f >>state
-    dup register-thread
-    set-self ;
+    [ initial-thread set-global ]
+    [ register-thread ]
+    [ set-self ]
+    tri ;
+
+: init-threads ( -- )
+    init-thread-state
+    init-initial-thread ;
+
+: wait-for-callback ( callback -- )
+    self swap waiting-callbacks set-at
+    "Callback return" suspend drop ;
 
 PRIVATE>
 
-[ init-threads ] "threads" add-init-hook
+STARTUP-HOOK: init-threads