]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
Change do-callback to register the current thread with the callback, instead of busy...
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2011 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: alien.private arrays hashtables heaps kernel kernel.private
5 math namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators init boxes accessors math.order
7 deques strings quotations fry ;
8 IN: threads
9
10 <PRIVATE
11
12 ! Wrap sub-primitives; we don't want them inlined into callers
13 ! since their behavior depends on what frames are on the callstack
14 : context ( -- context )
15     2 context-object ; inline
16
17 : set-context ( obj context -- obj' )
18     (set-context) ; inline
19
20 : start-context ( obj quot: ( obj -- * ) -- obj' )
21     (start-context) ; inline
22
23 : set-context-and-delete ( obj context -- * )
24     (set-context-and-delete) ; inline
25
26 : start-context-and-delete ( obj quot: ( obj -- * ) -- * )
27     (start-context-and-delete) ; inline
28
29 ! Context introspection
30 : namestack-for ( context -- namestack )
31     [ 0 ] dip context-object-for ;
32
33 : catchstack-for ( context -- catchstack )
34     [ 1 ] dip context-object-for ;
35
36 : continuation-for ( context -- continuation )
37     {
38         [ datastack-for ]
39         [ callstack-for ]
40         [ retainstack-for ]
41         [ namestack-for ]
42         [ catchstack-for ]
43     } cleave <continuation> ;
44
45 PRIVATE>
46
47 SYMBOL: initial-thread
48
49 TUPLE: thread
50 { name string }
51 { quot callable initial: [ ] }
52 { exit-handler callable initial: [ ] }
53 { id integer }
54 { context box }
55 state
56 runnable
57 mailbox
58 { variables hashtable }
59 sleep-entry ;
60
61 : self ( -- thread )
62     63 special-object { thread } declare ; inline
63
64 : thread-continuation ( thread -- continuation )
65     context>> check-box value>> continuation-for ;
66
67 ! Thread-local storage
68 : tnamespace ( -- assoc )
69     self variables>> ; inline
70
71 : tget ( key -- value )
72     tnamespace at ;
73
74 : tset ( value key -- )
75     tnamespace set-at ;
76
77 : tchange ( key quot -- )
78     [ tnamespace ] dip change-at ; inline
79
80 : threads ( -- assoc )
81     64 special-object { hashtable } declare ; inline
82
83 : thread-registered? ( thread -- ? )
84     id>> threads key? ;
85
86 <PRIVATE
87
88 : register-thread ( thread -- )
89     dup id>> threads set-at ;
90
91 : unregister-thread ( thread -- )
92     id>> threads delete-at ;
93
94 : set-self ( thread -- ) 63 set-special-object ; inline
95
96 PRIVATE>
97
98 : run-queue ( -- dlist )
99     65 special-object { dlist } declare ; inline
100
101 : sleep-queue ( -- heap )
102     66 special-object { min-heap } declare ; inline
103
104 : waiting-callbacks ( -- assoc )
105     68 special-object { hashtable } declare ; inline
106
107 : new-thread ( quot name class -- thread )
108     new
109         swap >>name
110         swap >>quot
111         \ thread counter >>id
112         H{ } clone >>variables
113         <box> >>context ; inline
114
115 : <thread> ( quot name -- thread )
116     \ thread new-thread ;
117
118 : resume ( thread -- )
119     f >>state run-queue push-front ;
120
121 : resume-now ( thread -- )
122     f >>state run-queue push-back ;
123
124 : resume-with ( obj thread -- )
125     f >>state 2array run-queue push-front ;
126
127 : sleep-time ( -- nanos/f )
128     {
129         { [ current-callback waiting-callbacks key? ] [ 0 ] }
130         { [ run-queue deque-empty? not ] [ 0 ] }
131         { [ sleep-queue heap-empty? ] [ f ] }
132         [ sleep-queue heap-peek nip nano-count [-] ]
133     } cond ;
134
135 : interrupt ( thread -- )
136     dup state>> [
137         dup sleep-entry>> [ sleep-queue heap-delete ] when*
138         f >>sleep-entry
139         dup resume
140     ] when drop ;
141
142 DEFER: stop
143
144 <PRIVATE
145
146 : schedule-sleep ( thread dt -- )
147     dupd sleep-queue heap-push* >>sleep-entry drop ;
148
149 : expire-sleep? ( -- ? )
150     sleep-queue dup heap-empty?
151     [ drop f ] [ heap-peek nip nano-count <= ] if ;
152
153 : expire-sleep ( thread -- )
154     f >>sleep-entry resume ;
155
156 : expire-sleep-loop ( -- )
157     [ expire-sleep? ]
158     [ sleep-queue heap-pop drop expire-sleep ]
159     while ;
160
161 CONSTANT: [start]
162     [
163         set-namestack
164         init-catchstack
165         self quot>> call
166         stop
167     ]
168
169 : no-runnable-threads ( -- ) die ;
170
171 GENERIC: (next) ( obj thread -- obj' )
172
173 M: thread (next)
174     dup runnable>>
175     [ context>> box> set-context ]
176     [ t >>runnable drop [start] start-context ] if ;
177
178 : (stop) ( obj thread -- * )
179     dup runnable>>
180     [ context>> box> set-context-and-delete ]
181     [ t >>runnable drop [start] start-context-and-delete ] if ;
182
183 : wake-up-callbacks ( -- )
184     current-callback waiting-callbacks delete-at*
185     [ resume-now ] [ drop ] if ;
186
187 : next ( -- obj thread )
188     expire-sleep-loop
189     wake-up-callbacks
190     run-queue pop-back
191     dup array? [ first2 ] [ [ f ] dip ] if
192     f >>state
193     dup set-self ;
194
195 PRIVATE>
196
197 : stop ( -- * )
198     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
199     next (stop) ;
200
201 : suspend ( state -- obj )
202     [ self ] dip >>state
203     [ context ] dip context>> >box
204     next (next) ;
205
206 : yield ( -- ) self resume f suspend drop ;
207
208 GENERIC: sleep-until ( n/f -- )
209
210 M: integer sleep-until
211     [ self ] dip schedule-sleep "sleep" suspend drop ;
212
213 M: f sleep-until
214     drop "standby" suspend drop ;
215
216 GENERIC: sleep ( dt -- )
217
218 M: real sleep
219     >integer nano-count + sleep-until ;
220
221 : (spawn) ( thread -- )
222     [ register-thread ] [ [ namestack ] dip resume-with ] bi ;
223
224 : spawn ( quot name -- thread )
225     <thread> [ (spawn) ] keep ;
226
227 : spawn-server ( quot name -- thread )
228     [ '[ _ loop ] ] dip spawn ;
229
230 : in-thread ( quot -- )
231     [ datastack ] dip
232     '[ _ set-datastack @ ]
233     "Thread" spawn drop ;
234
235 GENERIC: error-in-thread ( error thread -- )
236
237 <PRIVATE
238
239 : init-thread-state ( -- )
240     H{ } clone 64 set-special-object
241     <dlist> 65 set-special-object
242     <min-heap> 66 set-special-object
243     H{ } clone 68 set-special-object ;
244
245 : init-initial-thread ( -- )
246     [ ] "Initial" <thread>
247     t >>runnable
248     [ initial-thread set-global ]
249     [ register-thread ]
250     [ set-self ]
251     tri ;
252
253 : init-threads ( -- )
254     init-thread-state
255     init-initial-thread ;
256
257 : wait-for-callback ( callback -- )
258     self swap waiting-callbacks set-at
259     "Callback return" suspend drop ;
260
261 PRIVATE>
262
263 [ init-threads ] "threads" add-startup-hook