]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
threads: simplify 'suspend' combinator
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2010 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: arrays hashtables heaps kernel kernel.private math
5 namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators combinators.private init boxes
7 accessors math.order deques strings quotations fry ;
8 IN: threads
9
10 <PRIVATE
11
12 ! (set-context) and (start-context) are sub-primitives, but
13 ! we don't want them inlined into callers since their behavior
14 ! depends on what frames are on the callstack
15 : set-context ( obj context -- obj' ) (set-context) ;
16 : start-context ( obj quot: ( obj -- * ) -- obj' ) (start-context) ;
17
18 PRIVATE>
19
20 SYMBOL: initial-thread
21
22 TUPLE: thread
23 { name string }
24 { quot callable initial: [ ] }
25 { exit-handler callable initial: [ ] }
26 { id integer }
27 { continuation box }
28 state
29 runnable
30 mailbox
31 { variables hashtable }
32 sleep-entry ;
33
34 : self ( -- thread )
35     63 special-object { thread } declare ; inline
36
37 ! Thread-local storage
38 : tnamespace ( -- assoc )
39     self variables>> [ H{ } clone dup self (>>variables) ] unless* ;
40
41 : tget ( key -- value )
42     self variables>> at ;
43
44 : tset ( value key -- )
45     tnamespace set-at ;
46
47 : tchange ( key quot -- )
48     tnamespace swap change-at ; inline
49
50 : threads ( -- assoc )
51     64 special-object { hashtable } declare ; inline
52
53 : thread ( id -- thread )
54     threads at ;
55
56 : thread-registered? ( thread -- ? )
57     id>> threads key? ;
58
59 ERROR: already-stopped thread ;
60
61 : check-unregistered ( thread -- thread )
62     dup thread-registered? [ already-stopped ] when ;
63
64 ERROR: not-running thread ;
65
66 : check-registered ( thread -- thread )
67     dup thread-registered? [ not-running ] unless ;
68
69 <PRIVATE
70
71 : register-thread ( thread -- )
72     check-unregistered dup id>> threads set-at ;
73
74 : unregister-thread ( thread -- )
75     check-registered id>> threads delete-at ;
76
77 : set-self ( thread -- ) 63 set-special-object ; inline
78
79 PRIVATE>
80
81 : new-thread ( quot name class -- thread )
82     new
83         swap >>name
84         swap >>quot
85         \ thread counter >>id
86         <box> >>continuation ; inline
87
88 : <thread> ( quot name -- thread )
89     \ thread new-thread ;
90
91 : run-queue ( -- dlist )
92     65 special-object { dlist } declare ; inline
93
94 : sleep-queue ( -- heap )
95     66 special-object { dlist } declare ; inline
96
97 : resume ( thread -- )
98     f >>state
99     check-registered run-queue push-front ;
100
101 : resume-now ( thread -- )
102     f >>state
103     check-registered run-queue push-back ;
104
105 : resume-with ( obj thread -- )
106     f >>state
107     check-registered 2array run-queue push-front ;
108
109 : sleep-time ( -- nanos/f )
110     {
111         { [ run-queue deque-empty? not ] [ 0 ] }
112         { [ sleep-queue heap-empty? ] [ f ] }
113         [ sleep-queue heap-peek nip nano-count [-] ]
114     } cond ;
115
116 DEFER: stop
117
118 <PRIVATE
119
120 : schedule-sleep ( thread dt -- )
121     [ check-registered dup ] dip sleep-queue heap-push*
122     >>sleep-entry drop ;
123
124 : expire-sleep? ( heap -- ? )
125     dup heap-empty?
126     [ drop f ] [ heap-peek nip nano-count <= ] if ;
127
128 : expire-sleep ( thread -- )
129     f >>sleep-entry resume ;
130
131 : expire-sleep-loop ( -- )
132     sleep-queue
133     [ dup expire-sleep? ]
134     [ dup heap-pop drop expire-sleep ]
135     while
136     drop ;
137
138 : start ( namestack thread -- * )
139     [
140         set-self
141         set-namestack
142         V{ } set-catchstack
143         { } set-retainstack
144         { } set-datastack
145         self quot>> [ call stop ] call-clear
146     ] (( namestack thread -- * )) call-effect-unsafe ;
147
148 DEFER: next
149
150 : no-runnable-threads ( -- * )
151     ! We should never be in a state where the only threads
152     ! are sleeping; the I/O wait thread is always runnable.
153     ! However, if it dies, we handle this case
154     ! semi-gracefully.
155     !
156     ! And if sleep-time outputs f, there are no sleeping
157     ! threads either... so WTF.
158     sleep-time {
159         { [ dup not ] [ drop die ] }
160         { [ dup 0 = ] [ drop ] }
161         [ (sleep) ]
162     } cond next ;
163
164 : (next) ( arg thread -- * )
165     f >>state
166     dup set-self
167     dup runnable>> [
168         continuation>> box> continue-with
169     ] [
170         t >>runnable start
171     ] if ;
172
173 : next ( -- * )
174     expire-sleep-loop
175     run-queue dup deque-empty? [
176         drop no-runnable-threads
177     ] [
178         pop-back dup array? [ first2 ] [ f swap ] if (next)
179     ] if ;
180
181 PRIVATE>
182
183 : stop ( -- * )
184     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi next ;
185
186 : suspend ( state -- obj )
187     self (>>state)
188     [ self continuation>> >box next ] callcc1 ; inline
189
190 : yield ( -- ) self resume f suspend drop ;
191
192 GENERIC: sleep-until ( n/f -- )
193
194 M: integer sleep-until
195     [ self ] dip schedule-sleep "sleep" suspend drop ;
196
197 M: f sleep-until
198     drop "interrupt" suspend drop ;
199
200 GENERIC: sleep ( dt -- )
201
202 M: real sleep
203     >integer nano-count + sleep-until ;
204
205 : interrupt ( thread -- )
206     dup state>> [
207         dup sleep-entry>> [ sleep-queue heap-delete ] when*
208         f >>sleep-entry
209         dup resume
210     ] when drop ;
211
212 : (spawn) ( thread -- )
213     [ register-thread ] [ namestack swap resume-with ] bi ;
214
215 : spawn ( quot name -- thread )
216     <thread> [ (spawn) ] keep ;
217
218 : spawn-server ( quot name -- thread )
219     [ '[ _ loop ] ] dip spawn ;
220
221 : in-thread ( quot -- )
222     [ datastack ] dip
223     '[ _ set-datastack @ ]
224     "Thread" spawn drop ;
225
226 GENERIC: error-in-thread ( error thread -- )
227
228 <PRIVATE
229
230 : init-threads ( -- )
231     H{ } clone 64 set-special-object
232     <dlist> 65 set-special-object
233     <min-heap> 66 set-special-object
234     initial-thread global
235     [ drop [ ] "Initial" <thread> ] cache
236     <box> >>continuation
237     t >>runnable
238     f >>state
239     dup register-thread
240     set-self ;
241
242 PRIVATE>
243
244 [ init-threads ] "threads" add-startup-hook