]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
threads: use context-switching primitives
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2010 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: arrays hashtables heaps kernel kernel.private math
5 namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators init boxes accessors math.order
7 deques strings quotations fry ;
8 IN: threads
9
10 <PRIVATE
11
12 ! (set-context) and (start-context) are sub-primitives, but
13 ! we don't want them inlined into callers since their behavior
14 ! depends on what frames are on the callstack
15 : set-context ( obj context -- obj' ) (set-context) ;
16
17 : start-context ( obj quot: ( obj -- * ) -- obj' ) (start-context) ;
18
19 : namestack-for ( context -- namestack )
20     [ 0 ] dip context-object-for ;
21
22 : catchstack-for ( context -- catchstack )
23     [ 1 ] dip context-object-for ;
24
25 : continuation-for ( context -- continuation )
26     {
27         [ datastack-for ]
28         [ callstack-for ]
29         [ retainstack-for ]
30         [ namestack-for ]
31         [ catchstack-for ]
32     } cleave <continuation> ;
33
34 PRIVATE>
35
36 SYMBOL: initial-thread
37
38 TUPLE: thread
39 { name string }
40 { quot callable initial: [ ] }
41 { exit-handler callable initial: [ ] }
42 { id integer }
43 { context box }
44 state
45 runnable
46 mailbox
47 { variables hashtable }
48 sleep-entry ;
49
50 : self ( -- thread )
51     63 special-object { thread } declare ; inline
52
53 : thread-continuation ( thread -- continuation )
54     context>> check-box value>> continuation-for ;
55
56 ! Thread-local storage
57 : tnamespace ( -- assoc )
58     self variables>> ; inline
59
60 : tget ( key -- value )
61     tnamespace at ;
62
63 : tset ( value key -- )
64     tnamespace set-at ;
65
66 : tchange ( key quot -- )
67     [ tnamespace ] dip change-at ; inline
68
69 : threads ( -- assoc )
70     64 special-object { hashtable } declare ; inline
71
72 : thread-registered? ( thread -- ? )
73     id>> threads key? ;
74
75 ERROR: already-stopped thread ;
76
77 : check-unregistered ( thread -- thread )
78     dup thread-registered? [ already-stopped ] when ;
79
80 ERROR: not-running thread ;
81
82 : check-registered ( thread -- thread )
83     dup thread-registered? [ not-running ] unless ;
84
85 <PRIVATE
86
87 : register-thread ( thread -- )
88     check-unregistered dup id>> threads set-at ;
89
90 : unregister-thread ( thread -- )
91     check-registered id>> threads delete-at ;
92
93 : set-self ( thread -- ) 63 set-special-object ; inline
94
95 PRIVATE>
96
97 : run-queue ( -- dlist )
98     65 special-object { dlist } declare ; inline
99
100 : sleep-queue ( -- heap )
101     66 special-object { dlist } declare ; inline
102
103 : new-thread ( quot name class -- thread )
104     new
105         swap >>name
106         swap >>quot
107         \ thread counter >>id
108         H{ } clone >>variables
109         <box> >>context ; inline
110
111 : <thread> ( quot name -- thread )
112     \ thread new-thread ;
113
114 : resume ( thread -- )
115     f >>state
116     check-registered run-queue push-front ;
117
118 : resume-now ( thread -- )
119     f >>state
120     check-registered run-queue push-back ;
121
122 : resume-with ( obj thread -- )
123     f >>state
124     check-registered 2array run-queue push-front ;
125
126 : sleep-time ( -- nanos/f )
127     {
128         { [ run-queue deque-empty? not ] [ 0 ] }
129         { [ sleep-queue heap-empty? ] [ f ] }
130         [ sleep-queue heap-peek nip nano-count [-] ]
131     } cond ;
132
133 : interrupt ( thread -- )
134     dup state>> [
135         dup sleep-entry>> [ sleep-queue heap-delete ] when*
136         f >>sleep-entry
137         dup resume
138     ] when drop ;
139
140 DEFER: stop
141
142 <PRIVATE
143
144 : schedule-sleep ( thread dt -- )
145     [ check-registered dup ] dip sleep-queue heap-push*
146     >>sleep-entry drop ;
147
148 : expire-sleep? ( heap -- ? )
149     dup heap-empty?
150     [ drop f ] [ heap-peek nip nano-count <= ] if ;
151
152 : expire-sleep ( thread -- )
153     f >>sleep-entry resume ;
154
155 : expire-sleep-loop ( -- )
156     sleep-queue
157     [ dup expire-sleep? ]
158     [ dup heap-pop drop expire-sleep ]
159     while
160     drop ;
161
162 : start ( namestack -- obj )
163     [
164         set-namestack
165         init-catchstack
166         self quot>> call
167         stop
168     ] start-context ;
169
170 DEFER: next
171
172 : no-runnable-threads ( -- obj )
173     ! We should never be in a state where the only threads
174     ! are sleeping; the I/O wait thread is always runnable.
175     ! However, if it dies, we handle this case
176     ! semi-gracefully.
177     !
178     ! And if sleep-time outputs f, there are no sleeping
179     ! threads either... so WTF.
180     sleep-time {
181         { [ dup not ] [ drop die ] }
182         { [ dup 0 = ] [ drop ] }
183         [ (sleep) ]
184     } cond next ;
185
186 : (next) ( obj thread -- obj' )
187     f >>state
188     dup set-self
189     dup runnable>>
190     [ context>> box> set-context ] [ t >>runnable drop start ] if ;
191
192 : next ( -- obj )
193     expire-sleep-loop
194     run-queue dup deque-empty?
195     [ drop no-runnable-threads ]
196     [ pop-back dup array? [ first2 ] [ [ f ] dip ] if (next) ] if ;
197
198 : recycler-thread ( -- thread ) 68 special-object ;
199
200 : recycler-queue ( -- vector ) 69 special-object ;
201
202 : delete-context-later ( context -- )
203     recycler-queue push recycler-thread interrupt ;
204
205 PRIVATE>
206
207 : stop ( -- * )
208     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
209     context delete-context-later next
210     die 1 exit ;
211
212 : suspend ( state -- obj )
213     [ self ] dip >>state
214     [ context ] dip context>> >box
215     next ;
216
217 : yield ( -- ) self resume f suspend drop ;
218
219 GENERIC: sleep-until ( n/f -- )
220
221 M: integer sleep-until
222     [ self ] dip schedule-sleep "sleep" suspend drop ;
223
224 M: f sleep-until
225     drop "standby" suspend drop ;
226
227 GENERIC: sleep ( dt -- )
228
229 M: real sleep
230     >integer nano-count + sleep-until ;
231
232 : (spawn) ( thread -- )
233     [ register-thread ] [ [ namestack ] dip resume-with ] bi ;
234
235 : spawn ( quot name -- thread )
236     <thread> [ (spawn) ] keep ;
237
238 : spawn-server ( quot name -- thread )
239     [ '[ _ loop ] ] dip spawn ;
240
241 : in-thread ( quot -- )
242     [ datastack ] dip
243     '[ _ set-datastack @ ]
244     "Thread" spawn drop ;
245
246 GENERIC: error-in-thread ( error thread -- )
247
248 <PRIVATE
249
250 : init-thread-state ( -- )
251     H{ } clone 64 set-special-object
252     <dlist> 65 set-special-object
253     <min-heap> 66 set-special-object ;
254
255 : init-initial-thread ( -- )
256     [ ] "Initial" <thread>
257     t >>runnable
258     [ initial-thread set-global ]
259     [ register-thread ]
260     [ set-self ]
261     tri ;
262
263 ! The recycler thread deletes contexts belonging to stopped
264 ! threads
265
266 : recycler-loop ( -- )
267     recycler-queue [ [ delete-context ] each ] [ delete-all ] bi
268     f sleep-until
269     recycler-loop ;
270
271 : init-recycler ( -- )
272     [ recycler-loop ] "Context recycler" spawn 68 set-special-object
273     V{ } clone 69 set-special-object ;
274
275 : init-threads ( -- )
276     init-thread-state
277     init-initial-thread
278     init-recycler ;
279
280 PRIVATE>
281
282 [ init-threads ] "threads" add-startup-hook