]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
primitives: Change PRIMITIVE: to check that the word is in that vocabulary and the...
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2011 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: accessors alien.private arrays assocs boxes combinators
5 continuations continuations.private deques dlists fry hashtables
6 heaps init kernel kernel.private math math.order namespaces
7 quotations sequences strings system ;
8 FROM: assocs => change-at ;
9 IN: threads
10
11 <PRIVATE
12 PRIMITIVE: (set-context) ( obj context -- obj' )
13 PRIMITIVE: (set-context-and-delete) ( obj context -- * )
14 PRIMITIVE: (sleep) ( nanos -- )
15 PRIMITIVE: (start-context) ( obj quot -- obj' )
16 PRIMITIVE: (start-context-and-delete) ( obj quot -- * )
17 PRIMITIVE: callstack-for ( context -- array )
18 PRIMITIVE: context-object-for ( n context -- obj )
19 PRIMITIVE: datastack-for ( context -- array )
20 PRIMITIVE: retainstack-for ( context -- array )
21
22 ! Wrap sub-primitives; we don't want them inlined into callers
23 ! since their behavior depends on what frames are on the callstack
24 : context ( -- context )
25     CONTEXT-OBJ-CONTEXT context-object ; inline
26
27 : set-context ( obj context -- obj' )
28     (set-context) ; inline
29
30 : start-context ( obj quot: ( obj -- * ) -- obj' )
31     (start-context) ; inline
32
33 : set-context-and-delete ( obj context -- * )
34     (set-context-and-delete) ; inline
35
36 : start-context-and-delete ( obj quot: ( obj -- * ) -- * )
37     (start-context-and-delete) ; inline
38
39 ! Context introspection
40 : namestack-for ( context -- namestack )
41     [ CONTEXT-OBJ-NAMESTACK ] dip context-object-for ;
42
43 : catchstack-for ( context -- catchstack )
44     [ CONTEXT-OBJ-CATCHSTACK ] dip context-object-for ;
45
46 : continuation-for ( context -- continuation )
47     {
48         [ datastack-for ]
49         [ callstack-for ]
50         [ retainstack-for ]
51         [ namestack-for ]
52         [ catchstack-for ]
53     } cleave <continuation> ;
54
55 PRIVATE>
56
57 SYMBOL: initial-thread
58
59 TUPLE: thread
60 { name string }
61 { quot callable initial: [ ] }
62 { exit-handler callable initial: [ ] }
63 { id integer }
64 { context box }
65 state
66 runnable
67 mailbox
68 { variables hashtable }
69 sleep-entry ;
70
71 : self ( -- thread )
72     OBJ-CURRENT-THREAD special-object { thread } declare ; inline
73
74 : thread-continuation ( thread -- continuation )
75     context>> check-box value>> continuation-for ;
76
77 ! Thread-local storage
78 : tnamespace ( -- assoc )
79     self variables>> ; inline
80
81 : tget ( key -- value )
82     tnamespace at ;
83
84 : tset ( value key -- )
85     tnamespace set-at ;
86
87 : tchange ( ..a key quot: ( ..a value -- ..b newvalue ) -- ..b )
88     [ tnamespace ] dip change-at ; inline
89
90 : threads ( -- assoc )
91     OBJ-THREADS special-object { hashtable } declare ; inline
92
93 : thread-registered? ( thread -- ? )
94     id>> threads key? ;
95
96 <PRIVATE
97
98 : register-thread ( thread -- )
99     dup id>> threads set-at ;
100
101 : unregister-thread ( thread -- )
102     id>> threads delete-at ;
103
104 : set-self ( thread -- )
105     OBJ-CURRENT-THREAD set-special-object ; inline
106
107 PRIVATE>
108
109 : run-queue ( -- dlist )
110     OBJ-RUN-QUEUE special-object { dlist } declare ; inline
111
112 : sleep-queue ( -- heap )
113     OBJ-SLEEP-QUEUE special-object { min-heap } declare ; inline
114
115 : waiting-callbacks ( -- assoc )
116     OBJ-WAITING-CALLBACKS special-object { hashtable } declare ; inline
117
118 : new-thread ( quot name class -- thread )
119     new
120         swap >>name
121         swap >>quot
122         \ thread counter >>id
123         H{ } clone >>variables
124         <box> >>context ; inline
125
126 : <thread> ( quot name -- thread )
127     \ thread new-thread ;
128
129 : resume ( thread -- )
130     f >>state run-queue push-front ;
131
132 : resume-now ( thread -- )
133     f >>state run-queue push-back ;
134
135 : resume-with ( obj thread -- )
136     f >>state 2array run-queue push-front ;
137
138 : sleep-time ( -- nanos/f )
139     {
140         { [ current-callback waiting-callbacks key? ] [ 0 ] }
141         { [ run-queue deque-empty? not ] [ 0 ] }
142         { [ sleep-queue heap-empty? ] [ f ] }
143         [ sleep-queue heap-peek nip nano-count [-] ]
144     } cond ;
145
146 : interrupt ( thread -- )
147     dup state>> [
148         [
149             [ sleep-queue heap-delete ] when* f
150         ] change-sleep-entry dup resume
151     ] when drop ;
152
153 DEFER: stop
154
155 <PRIVATE
156
157 : schedule-sleep ( thread dt -- )
158     dupd sleep-queue heap-push* >>sleep-entry drop ;
159
160 : expire-sleep? ( -- ? )
161     sleep-queue dup heap-empty?
162     [ drop f ] [ heap-peek nip nano-count <= ] if ;
163
164 : expire-sleep ( thread -- )
165     f >>sleep-entry resume ;
166
167 : expire-sleep-loop ( -- )
168     [ expire-sleep? ]
169     [ sleep-queue heap-pop drop expire-sleep ]
170     while ;
171
172 CONSTANT: [start]
173     [
174         set-namestack
175         init-catchstack
176         self quot>> call
177         stop
178     ]
179
180 GENERIC: (next) ( obj thread -- obj' )
181
182 M: thread (next)
183     dup runnable>>
184     [ context>> box> set-context ]
185     [ t >>runnable drop [start] start-context ] if ;
186
187 : (stop) ( obj thread -- * )
188     dup runnable>>
189     [ context>> box> set-context-and-delete ]
190     [ t >>runnable drop [start] start-context-and-delete ] if ;
191
192 : wake-up-callbacks ( -- )
193     current-callback waiting-callbacks delete-at*
194     [ resume-now ] [ drop ] if ;
195
196 : next ( -- obj thread )
197     expire-sleep-loop
198     wake-up-callbacks
199     run-queue pop-back
200     dup array? [ first2 ] [ [ f ] dip ] if
201     f >>state
202     dup set-self ;
203
204 PRIVATE>
205
206 : stop ( -- * )
207     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
208     next (stop) ;
209
210 : suspend ( state -- obj )
211     [ self ] dip >>state
212     [ context ] dip context>> >box
213     next (next) ;
214
215 : yield ( -- )
216     self resume f suspend drop ;
217
218 GENERIC: sleep-until ( n/f -- )
219
220 M: integer sleep-until
221     [ self ] dip schedule-sleep "sleep" suspend drop ;
222
223 M: f sleep-until
224     drop "standby" suspend drop ;
225
226 GENERIC: sleep ( dt -- )
227
228 M: real sleep
229     >integer nano-count + sleep-until ;
230
231 : (spawn) ( thread -- )
232     [ register-thread ] [ [ namestack ] dip resume-with ] bi ;
233
234 : spawn ( quot name -- thread )
235     <thread> [ (spawn) ] keep ;
236
237 : spawn-server ( quot name -- thread )
238     [ '[ _ loop ] ] dip spawn ;
239
240 : in-thread ( quot -- )
241     [ datastack ] dip
242     '[ _ set-datastack @ ]
243     "Thread" spawn drop ;
244
245 <PRIVATE
246
247 : init-thread-state ( -- )
248     H{ } clone OBJ-THREADS set-special-object
249     <dlist> OBJ-RUN-QUEUE set-special-object
250     <min-heap> OBJ-SLEEP-QUEUE set-special-object
251     H{ } clone OBJ-WAITING-CALLBACKS set-special-object ;
252
253 : init-initial-thread ( -- )
254     [ ] "Initial" <thread>
255     t >>runnable
256     [ initial-thread set-global ]
257     [ register-thread ]
258     [ set-self ]
259     tri ;
260
261 : init-threads ( -- )
262     init-thread-state
263     init-initial-thread ;
264
265 : wait-for-callback ( callback -- )
266     self swap waiting-callbacks set-at
267     "Callback return" suspend drop ;
268
269 PRIVATE>
270
271 [ init-threads ] "threads" add-startup-hook