]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
constants for special object hardcoded literals
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2011 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: alien.private arrays hashtables heaps kernel kernel.private
5 math namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators init boxes accessors math.order
7 deques strings quotations fry ;
8 FROM: assocs => change-at ;
9 IN: threads
10
11 <PRIVATE
12
13 ! Wrap sub-primitives; we don't want them inlined into callers
14 ! since their behavior depends on what frames are on the callstack
15 : context ( -- context )
16     CONTEXT-OBJ-CONTEXT context-object ; inline
17
18 : set-context ( obj context -- obj' )
19     (set-context) ; inline
20
21 : start-context ( obj quot: ( obj -- * ) -- obj' )
22     (start-context) ; inline
23
24 : set-context-and-delete ( obj context -- * )
25     (set-context-and-delete) ; inline
26
27 : start-context-and-delete ( obj quot: ( obj -- * ) -- * )
28     (start-context-and-delete) ; inline
29
30 ! Context introspection
31 : namestack-for ( context -- namestack )
32     [ CONTEXT-OBJ-NAMESTACK ] dip context-object-for ;
33
34 : catchstack-for ( context -- catchstack )
35     [ CONTEXT-OBJ-CATCHSTACK ] dip context-object-for ;
36
37 : continuation-for ( context -- continuation )
38     {
39         [ datastack-for ]
40         [ callstack-for ]
41         [ retainstack-for ]
42         [ namestack-for ]
43         [ catchstack-for ]
44     } cleave <continuation> ;
45
46 PRIVATE>
47
48 SYMBOL: initial-thread
49
50 TUPLE: thread
51 { name string }
52 { quot callable initial: [ ] }
53 { exit-handler callable initial: [ ] }
54 { id integer }
55 { context box }
56 state
57 runnable
58 mailbox
59 { variables hashtable }
60 sleep-entry ;
61
62 : self ( -- thread )
63     OBJ-CURRENT-THREAD special-object { thread } declare ; inline
64
65 : thread-continuation ( thread -- continuation )
66     context>> check-box value>> continuation-for ;
67
68 ! Thread-local storage
69 : tnamespace ( -- assoc )
70     self variables>> ; inline
71
72 : tget ( key -- value )
73     tnamespace at ;
74
75 : tset ( value key -- )
76     tnamespace set-at ;
77
78 : tchange ( ..a key quot: ( ..a value -- ..b newvalue ) -- ..b )
79     [ tnamespace ] dip change-at ; inline
80
81 : threads ( -- assoc )
82     OBJ-THREADS special-object { hashtable } declare ; inline
83
84 : thread-registered? ( thread -- ? )
85     id>> threads key? ;
86
87 <PRIVATE
88
89 : register-thread ( thread -- )
90     dup id>> threads set-at ;
91
92 : unregister-thread ( thread -- )
93     id>> threads delete-at ;
94
95 : set-self ( thread -- ) OBJ-CURRENT-THREAD set-special-object ; inline
96
97 PRIVATE>
98
99 : run-queue ( -- dlist )
100     OBJ-RUN-QUEUE special-object { dlist } declare ; inline
101
102 : sleep-queue ( -- heap )
103     OBJ-SLEEP-QUEUE special-object { min-heap } declare ; inline
104
105 : waiting-callbacks ( -- assoc )
106     OBJ-WAITING-CALLBACKS special-object { hashtable } declare ; inline
107
108 : new-thread ( quot name class -- thread )
109     new
110         swap >>name
111         swap >>quot
112         \ thread counter >>id
113         H{ } clone >>variables
114         <box> >>context ; inline
115
116 : <thread> ( quot name -- thread )
117     \ thread new-thread ;
118
119 : resume ( thread -- )
120     f >>state run-queue push-front ;
121
122 : resume-now ( thread -- )
123     f >>state run-queue push-back ;
124
125 : resume-with ( obj thread -- )
126     f >>state 2array run-queue push-front ;
127
128 : sleep-time ( -- nanos/f )
129     {
130         { [ current-callback waiting-callbacks key? ] [ 0 ] }
131         { [ run-queue deque-empty? not ] [ 0 ] }
132         { [ sleep-queue heap-empty? ] [ f ] }
133         [ sleep-queue heap-peek nip nano-count [-] ]
134     } cond ;
135
136 : interrupt ( thread -- )
137     dup state>> [
138         dup sleep-entry>> [ sleep-queue heap-delete ] when*
139         f >>sleep-entry
140         dup resume
141     ] when drop ;
142
143 DEFER: stop
144
145 <PRIVATE
146
147 : schedule-sleep ( thread dt -- )
148     dupd sleep-queue heap-push* >>sleep-entry drop ;
149
150 : expire-sleep? ( -- ? )
151     sleep-queue dup heap-empty?
152     [ drop f ] [ heap-peek nip nano-count <= ] if ;
153
154 : expire-sleep ( thread -- )
155     f >>sleep-entry resume ;
156
157 : expire-sleep-loop ( -- )
158     [ expire-sleep? ]
159     [ sleep-queue heap-pop drop expire-sleep ]
160     while ;
161
162 CONSTANT: [start]
163     [
164         set-namestack
165         init-catchstack
166         self quot>> call
167         stop
168     ]
169
170 GENERIC: (next) ( obj thread -- obj' )
171
172 M: thread (next)
173     dup runnable>>
174     [ context>> box> set-context ]
175     [ t >>runnable drop [start] start-context ] if ;
176
177 : (stop) ( obj thread -- * )
178     dup runnable>>
179     [ context>> box> set-context-and-delete ]
180     [ t >>runnable drop [start] start-context-and-delete ] if ;
181
182 : wake-up-callbacks ( -- )
183     current-callback waiting-callbacks delete-at*
184     [ resume-now ] [ drop ] if ;
185
186 : next ( -- obj thread )
187     expire-sleep-loop
188     wake-up-callbacks
189     run-queue pop-back
190     dup array? [ first2 ] [ [ f ] dip ] if
191     f >>state
192     dup set-self ;
193
194 PRIVATE>
195
196 : stop ( -- * )
197     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
198     next (stop) ;
199
200 : suspend ( state -- obj )
201     [ self ] dip >>state
202     [ context ] dip context>> >box
203     next (next) ;
204
205 : yield ( -- ) self resume f suspend drop ;
206
207 GENERIC: sleep-until ( n/f -- )
208
209 M: integer sleep-until
210     [ self ] dip schedule-sleep "sleep" suspend drop ;
211
212 M: f sleep-until
213     drop "standby" suspend drop ;
214
215 GENERIC: sleep ( dt -- )
216
217 M: real sleep
218     >integer nano-count + sleep-until ;
219
220 : (spawn) ( thread -- )
221     [ register-thread ] [ [ namestack ] dip resume-with ] bi ;
222
223 : spawn ( quot name -- thread )
224     <thread> [ (spawn) ] keep ;
225
226 : spawn-server ( quot name -- thread )
227     [ '[ _ loop ] ] dip spawn ;
228
229 : in-thread ( quot -- )
230     [ datastack ] dip
231     '[ _ set-datastack @ ]
232     "Thread" spawn drop ;
233
234 <PRIVATE
235
236 : init-thread-state ( -- )
237     H{ } clone OBJ-THREADS set-special-object
238     <dlist> OBJ-RUN-QUEUE set-special-object
239     <min-heap> OBJ-SLEEP-QUEUE set-special-object
240     H{ } clone OBJ-WAITING-CALLBACKS set-special-object ;
241
242 : init-initial-thread ( -- )
243     [ ] "Initial" <thread>
244     t >>runnable
245     [ initial-thread set-global ]
246     [ register-thread ]
247     [ set-self ]
248     tri ;
249
250 : init-threads ( -- )
251     init-thread-state
252     init-initial-thread ;
253
254 : wait-for-callback ( callback -- )
255     self swap waiting-callbacks set-at
256     "Callback return" suspend drop ;
257
258 PRIVATE>
259
260 [ init-threads ] "threads" add-startup-hook