]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
core: callstack is a builtin type and a class name. rename the *stack words that...
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2011 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: accessors alien.private arrays assocs boxes combinators
5 continuations continuations.private deques dlists fry hashtables
6 heaps init kernel kernel.private math math.order namespaces
7 quotations sequences strings system ;
8 IN: threads
9
10 <PRIVATE
11 PRIMITIVE: (set-context) ( obj context -- obj' )
12 PRIMITIVE: (set-context-and-delete) ( obj context -- * )
13 PRIMITIVE: (sleep) ( nanos -- )
14 PRIMITIVE: (start-context) ( obj quot -- obj' )
15 PRIMITIVE: (start-context-and-delete) ( obj quot -- * )
16 PRIMITIVE: callstack-for ( context -- array )
17 PRIMITIVE: context-object-for ( n context -- obj )
18 PRIMITIVE: datastack-for ( context -- array )
19 PRIMITIVE: retainstack-for ( context -- array )
20
21 ! Wrap sub-primitives; we don't want them inlined into callers
22 ! since their behavior depends on what frames are on the callstack
23 : context ( -- context )
24     CONTEXT-OBJ-CONTEXT context-object ; inline
25
26 : set-context ( obj context -- obj' )
27     (set-context) ; inline
28
29 : start-context ( obj quot: ( obj -- * ) -- obj' )
30     (start-context) ; inline
31
32 : set-context-and-delete ( obj context -- * )
33     (set-context-and-delete) ; inline
34
35 : start-context-and-delete ( obj quot: ( obj -- * ) -- * )
36     (start-context-and-delete) ; inline
37
38 ! Context introspection
39 : namestack-for ( context -- namestack )
40     [ CONTEXT-OBJ-NAMESTACK ] dip context-object-for ;
41
42 : catchstack-for ( context -- catchstack )
43     [ CONTEXT-OBJ-CATCHSTACK ] dip context-object-for ;
44
45 : continuation-for ( context -- continuation )
46     {
47         [ datastack-for ]
48         [ callstack-for ]
49         [ retainstack-for ]
50         [ namestack-for ]
51         [ catchstack-for ]
52     } cleave <continuation> ;
53
54 PRIVATE>
55
56 SYMBOL: initial-thread
57
58 TUPLE: thread
59 { name string }
60 { quot callable initial: [ ] }
61 { exit-handler callable initial: [ ] }
62 { id integer }
63 { context box }
64 state
65 runnable
66 mailbox
67 { variables hashtable }
68 sleep-entry ;
69
70 : self ( -- thread )
71     OBJ-CURRENT-THREAD special-object { thread } declare ; inline
72
73 : thread-continuation ( thread -- continuation )
74     context>> check-box value>> continuation-for ;
75
76 ! Thread-local storage
77 : tnamespace ( -- assoc )
78     self variables>> ; inline
79
80 : tget ( key -- value )
81     tnamespace at ;
82
83 : tset ( value key -- )
84     tnamespace set-at ;
85
86 : tchange ( ..a key quot: ( ..a value -- ..b newvalue ) -- ..b )
87     [ tnamespace ] dip change-at ; inline
88
89 : threads ( -- assoc )
90     OBJ-THREADS special-object { hashtable } declare ; inline
91
92 : thread-registered? ( thread -- ? )
93     id>> threads key? ;
94
95 <PRIVATE
96
97 : register-thread ( thread -- )
98     dup id>> threads set-at ;
99
100 : unregister-thread ( thread -- )
101     id>> threads delete-at ;
102
103 : set-self ( thread -- )
104     OBJ-CURRENT-THREAD set-special-object ; inline
105
106 PRIVATE>
107
108 : run-queue ( -- dlist )
109     OBJ-RUN-QUEUE special-object { dlist } declare ; inline
110
111 : sleep-queue ( -- heap )
112     OBJ-SLEEP-QUEUE special-object { min-heap } declare ; inline
113
114 : waiting-callbacks ( -- assoc )
115     OBJ-WAITING-CALLBACKS special-object { hashtable } declare ; inline
116
117 : new-thread ( quot name class -- thread )
118     new
119         swap >>name
120         swap >>quot
121         \ thread counter >>id
122         H{ } clone >>variables
123         <box> >>context ; inline
124
125 : <thread> ( quot name -- thread )
126     \ thread new-thread ;
127
128 : resume ( thread -- )
129     f >>state run-queue push-front ;
130
131 : resume-now ( thread -- )
132     f >>state run-queue push-back ;
133
134 : resume-with ( obj thread -- )
135     f >>state 2array run-queue push-front ;
136
137 : sleep-time ( -- nanos/f )
138     {
139         { [ current-callback waiting-callbacks key? ] [ 0 ] }
140         { [ run-queue deque-empty? not ] [ 0 ] }
141         { [ sleep-queue heap-empty? ] [ f ] }
142         [ sleep-queue heap-peek nip nano-count [-] ]
143     } cond ;
144
145 : interrupt ( thread -- )
146     dup state>> [
147         [
148             [ sleep-queue heap-delete ] when* f
149         ] change-sleep-entry dup resume
150     ] when drop ;
151
152 DEFER: stop
153
154 <PRIVATE
155
156 : schedule-sleep ( thread dt -- )
157     dupd sleep-queue heap-push* >>sleep-entry drop ;
158
159 : expire-sleep? ( -- ? )
160     sleep-queue dup heap-empty?
161     [ drop f ] [ heap-peek nip nano-count <= ] if ;
162
163 : expire-sleep ( thread -- )
164     f >>sleep-entry resume ;
165
166 : expire-sleep-loop ( -- )
167     [ expire-sleep? ]
168     [ sleep-queue heap-pop drop expire-sleep ]
169     while ;
170
171 CONSTANT: [start]
172     [
173         set-namestack
174         init-catchstack
175         self quot>> call
176         stop
177     ]
178
179 GENERIC: (next) ( obj thread -- obj' )
180
181 M: thread (next)
182     dup runnable>>
183     [ context>> box> set-context ]
184     [ t >>runnable drop [start] start-context ] if ;
185
186 : (stop) ( obj thread -- * )
187     dup runnable>>
188     [ context>> box> set-context-and-delete ]
189     [ t >>runnable drop [start] start-context-and-delete ] if ;
190
191 : wake-up-callbacks ( -- )
192     current-callback waiting-callbacks delete-at*
193     [ resume-now ] [ drop ] if ;
194
195 : next ( -- obj thread )
196     expire-sleep-loop
197     wake-up-callbacks
198     run-queue pop-back
199     dup array? [ first2 ] [ [ f ] dip ] if
200     f >>state
201     dup set-self ;
202
203 PRIVATE>
204
205 : stop ( -- * )
206     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
207     next (stop) ;
208
209 : suspend ( state -- obj )
210     [ self ] dip >>state
211     [ context ] dip context>> >box
212     next (next) ;
213
214 : yield ( -- )
215     self resume f suspend drop ;
216
217 GENERIC: sleep-until ( n/f -- )
218
219 M: integer sleep-until
220     [ self ] dip schedule-sleep "sleep" suspend drop ;
221
222 M: f sleep-until
223     drop "standby" suspend drop ;
224
225 GENERIC: sleep ( dt -- )
226
227 M: real sleep
228     >integer nano-count + sleep-until ;
229
230 : (spawn) ( thread -- )
231     [ register-thread ] [ [ get-namestack ] dip resume-with ] bi ;
232
233 : spawn ( quot name -- thread )
234     <thread> [ (spawn) ] keep ;
235
236 : spawn-server ( quot name -- thread )
237     [ '[ _ loop ] ] dip spawn ;
238
239 : in-thread ( quot -- )
240     [ get-datastack ] dip
241     '[ _ set-datastack @ ]
242     "Thread" spawn drop ;
243
244 <PRIVATE
245
246 : init-thread-state ( -- )
247     H{ } clone OBJ-THREADS set-special-object
248     <dlist> OBJ-RUN-QUEUE set-special-object
249     <min-heap> OBJ-SLEEP-QUEUE set-special-object
250     H{ } clone OBJ-WAITING-CALLBACKS set-special-object ;
251
252 : init-initial-thread ( -- )
253     [ ] "Initial" <thread>
254     t >>runnable
255     [ initial-thread set-global ]
256     [ register-thread ]
257     [ set-self ]
258     tri ;
259
260 : init-threads ( -- )
261     init-thread-state
262     init-initial-thread ;
263
264 : wait-for-callback ( callback -- )
265     self swap waiting-callbacks set-at
266     "Callback return" suspend drop ;
267
268 PRIVATE>
269
270 [ init-threads ] "threads" add-startup-hook