]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
threads: delete old contexts immediately instead of handing them off to a 'context...
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2010 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: arrays hashtables heaps kernel kernel.private math
5 namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators init boxes accessors math.order
7 deques strings quotations fry ;
8 IN: threads
9
10 <PRIVATE
11
12 ! Wrap sub-primitives; we don't want them inlined into callers
13 ! since their behavior depends on what frames are on the callstack
14 : set-context ( obj context -- obj' )
15     (set-context) ;
16
17 : start-context ( obj quot: ( obj -- * ) -- obj' )
18     (start-context) ;
19
20 : set-context-and-delete ( obj context -- * )
21     (set-context-and-delete) ;
22
23 : start-context-and-delete ( obj quot: ( obj -- * ) -- * )
24     (start-context-and-delete) ;
25
26 ! Context introspection
27 : namestack-for ( context -- namestack )
28     [ 0 ] dip context-object-for ;
29
30 : catchstack-for ( context -- catchstack )
31     [ 1 ] dip context-object-for ;
32
33 : continuation-for ( context -- continuation )
34     {
35         [ datastack-for ]
36         [ callstack-for ]
37         [ retainstack-for ]
38         [ namestack-for ]
39         [ catchstack-for ]
40     } cleave <continuation> ;
41
42 PRIVATE>
43
44 SYMBOL: initial-thread
45
46 TUPLE: thread
47 { name string }
48 { quot callable initial: [ ] }
49 { exit-handler callable initial: [ ] }
50 { id integer }
51 { context box }
52 state
53 runnable
54 mailbox
55 { variables hashtable }
56 sleep-entry ;
57
58 : self ( -- thread )
59     63 special-object { thread } declare ; inline
60
61 : thread-continuation ( thread -- continuation )
62     context>> check-box value>> continuation-for ;
63
64 ! Thread-local storage
65 : tnamespace ( -- assoc )
66     self variables>> ; inline
67
68 : tget ( key -- value )
69     tnamespace at ;
70
71 : tset ( value key -- )
72     tnamespace set-at ;
73
74 : tchange ( key quot -- )
75     [ tnamespace ] dip change-at ; inline
76
77 : threads ( -- assoc )
78     64 special-object { hashtable } declare ; inline
79
80 : thread-registered? ( thread -- ? )
81     id>> threads key? ;
82
83 ERROR: already-stopped thread ;
84
85 : check-unregistered ( thread -- thread )
86     dup thread-registered? [ already-stopped ] when ;
87
88 ERROR: not-running thread ;
89
90 : check-registered ( thread -- thread )
91     dup thread-registered? [ not-running ] unless ;
92
93 <PRIVATE
94
95 : register-thread ( thread -- )
96     check-unregistered dup id>> threads set-at ;
97
98 : unregister-thread ( thread -- )
99     check-registered id>> threads delete-at ;
100
101 : set-self ( thread -- ) 63 set-special-object ; inline
102
103 PRIVATE>
104
105 : run-queue ( -- dlist )
106     65 special-object { dlist } declare ; inline
107
108 : sleep-queue ( -- heap )
109     66 special-object { dlist } declare ; inline
110
111 : new-thread ( quot name class -- thread )
112     new
113         swap >>name
114         swap >>quot
115         \ thread counter >>id
116         H{ } clone >>variables
117         <box> >>context ; inline
118
119 : <thread> ( quot name -- thread )
120     \ thread new-thread ;
121
122 : resume ( thread -- )
123     f >>state
124     check-registered run-queue push-front ;
125
126 : resume-now ( thread -- )
127     f >>state
128     check-registered run-queue push-back ;
129
130 : resume-with ( obj thread -- )
131     f >>state
132     check-registered 2array run-queue push-front ;
133
134 : sleep-time ( -- nanos/f )
135     {
136         { [ run-queue deque-empty? not ] [ 0 ] }
137         { [ sleep-queue heap-empty? ] [ f ] }
138         [ sleep-queue heap-peek nip nano-count [-] ]
139     } cond ;
140
141 : interrupt ( thread -- )
142     dup state>> [
143         dup sleep-entry>> [ sleep-queue heap-delete ] when*
144         f >>sleep-entry
145         dup resume
146     ] when drop ;
147
148 DEFER: stop
149
150 <PRIVATE
151
152 : schedule-sleep ( thread dt -- )
153     [ check-registered dup ] dip sleep-queue heap-push*
154     >>sleep-entry drop ;
155
156 : expire-sleep? ( heap -- ? )
157     dup heap-empty?
158     [ drop f ] [ heap-peek nip nano-count <= ] if ;
159
160 : expire-sleep ( thread -- )
161     f >>sleep-entry resume ;
162
163 : expire-sleep-loop ( -- )
164     sleep-queue
165     [ dup expire-sleep? ]
166     [ dup heap-pop drop expire-sleep ]
167     while
168     drop ;
169
170 CONSTANT: [start]
171     [
172         set-namestack
173         init-catchstack
174         self quot>> call
175         stop
176     ]
177
178 : no-runnable-threads ( -- ) die ;
179
180 : (next) ( obj thread -- obj' )
181     dup runnable>>
182     [ context>> box> set-context ]
183     [ t >>runnable drop [start] start-context ] if ;
184
185 : (stop) ( obj thread -- * )
186     dup runnable>>
187     [ context>> box> set-context-and-delete ]
188     [ t >>runnable drop [start] start-context-and-delete ] if ;
189
190 : next ( -- obj thread )
191     expire-sleep-loop
192     run-queue pop-back
193     dup array? [ first2 ] [ [ f ] dip ] if
194     f >>state
195     dup set-self ;
196
197 PRIVATE>
198
199 : stop ( -- * )
200     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
201     next (stop) ;
202
203 : suspend ( state -- obj )
204     [ self ] dip >>state
205     [ context ] dip context>> >box
206     next (next) ;
207
208 : yield ( -- ) self resume f suspend drop ;
209
210 GENERIC: sleep-until ( n/f -- )
211
212 M: integer sleep-until
213     [ self ] dip schedule-sleep "sleep" suspend drop ;
214
215 M: f sleep-until
216     drop "standby" suspend drop ;
217
218 GENERIC: sleep ( dt -- )
219
220 M: real sleep
221     >integer nano-count + sleep-until ;
222
223 : (spawn) ( thread -- )
224     [ register-thread ] [ [ namestack ] dip resume-with ] bi ;
225
226 : spawn ( quot name -- thread )
227     <thread> [ (spawn) ] keep ;
228
229 : spawn-server ( quot name -- thread )
230     [ '[ _ loop ] ] dip spawn ;
231
232 : in-thread ( quot -- )
233     [ datastack ] dip
234     '[ _ set-datastack @ ]
235     "Thread" spawn drop ;
236
237 GENERIC: error-in-thread ( error thread -- )
238
239 <PRIVATE
240
241 : init-thread-state ( -- )
242     H{ } clone 64 set-special-object
243     <dlist> 65 set-special-object
244     <min-heap> 66 set-special-object ;
245
246 : init-initial-thread ( -- )
247     [ ] "Initial" <thread>
248     t >>runnable
249     [ initial-thread set-global ]
250     [ register-thread ]
251     [ set-self ]
252     tri ;
253
254 : init-threads ( -- )
255     init-thread-state
256     init-initial-thread ;
257
258 PRIVATE>
259
260 [ init-threads ] "threads" add-startup-hook