]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
concurrency.mailboxes: linked-thread's error reporting should still work even when...
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2011 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: alien.private arrays hashtables heaps kernel kernel.private
5 math namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators init boxes accessors math.order
7 deques strings quotations fry ;
8 FROM: assocs => change-at ;
9 IN: threads
10
11 <PRIVATE
12
13 ! Wrap sub-primitives; we don't want them inlined into callers
14 ! since their behavior depends on what frames are on the callstack
15 : context ( -- context )
16     2 context-object ; inline
17
18 : set-context ( obj context -- obj' )
19     (set-context) ; inline
20
21 : start-context ( obj quot: ( obj -- * ) -- obj' )
22     (start-context) ; inline
23
24 : set-context-and-delete ( obj context -- * )
25     (set-context-and-delete) ; inline
26
27 : start-context-and-delete ( obj quot: ( obj -- * ) -- * )
28     (start-context-and-delete) ; inline
29
30 ! Context introspection
31 : namestack-for ( context -- namestack )
32     [ 0 ] dip context-object-for ;
33
34 : catchstack-for ( context -- catchstack )
35     [ 1 ] dip context-object-for ;
36
37 : continuation-for ( context -- continuation )
38     {
39         [ datastack-for ]
40         [ callstack-for ]
41         [ retainstack-for ]
42         [ namestack-for ]
43         [ catchstack-for ]
44     } cleave <continuation> ;
45
46 PRIVATE>
47
48 SYMBOL: initial-thread
49
50 TUPLE: thread
51 { name string }
52 { quot callable initial: [ ] }
53 { exit-handler callable initial: [ ] }
54 { id integer }
55 { context box }
56 state
57 runnable
58 mailbox
59 { variables hashtable }
60 sleep-entry ;
61
62 : self ( -- thread )
63     63 special-object { thread } declare ; inline
64
65 : thread-continuation ( thread -- continuation )
66     context>> check-box value>> continuation-for ;
67
68 ! Thread-local storage
69 : tnamespace ( -- assoc )
70     self variables>> ; inline
71
72 : tget ( key -- value )
73     tnamespace at ;
74
75 : tset ( value key -- )
76     tnamespace set-at ;
77
78 : tchange ( ..a key quot: ( ..a value -- ..b newvalue ) -- ..b )
79     [ tnamespace ] dip change-at ; inline
80
81 : threads ( -- assoc )
82     64 special-object { hashtable } declare ; inline
83
84 : thread-registered? ( thread -- ? )
85     id>> threads key? ;
86
87 <PRIVATE
88
89 : register-thread ( thread -- )
90     dup id>> threads set-at ;
91
92 : unregister-thread ( thread -- )
93     id>> threads delete-at ;
94
95 : set-self ( thread -- ) 63 set-special-object ; inline
96
97 PRIVATE>
98
99 : run-queue ( -- dlist )
100     65 special-object { dlist } declare ; inline
101
102 : sleep-queue ( -- heap )
103     66 special-object { min-heap } declare ; inline
104
105 : waiting-callbacks ( -- assoc )
106     68 special-object { hashtable } declare ; inline
107
108 : new-thread ( quot name class -- thread )
109     new
110         swap >>name
111         swap >>quot
112         \ thread counter >>id
113         H{ } clone >>variables
114         <box> >>context ; inline
115
116 : <thread> ( quot name -- thread )
117     \ thread new-thread ;
118
119 : resume ( thread -- )
120     f >>state run-queue push-front ;
121
122 : resume-now ( thread -- )
123     f >>state run-queue push-back ;
124
125 : resume-with ( obj thread -- )
126     f >>state 2array run-queue push-front ;
127
128 : sleep-time ( -- nanos/f )
129     {
130         { [ current-callback waiting-callbacks key? ] [ 0 ] }
131         { [ run-queue deque-empty? not ] [ 0 ] }
132         { [ sleep-queue heap-empty? ] [ f ] }
133         [ sleep-queue heap-peek nip nano-count [-] ]
134     } cond ;
135
136 : interrupt ( thread -- )
137     dup state>> [
138         dup sleep-entry>> [ sleep-queue heap-delete ] when*
139         f >>sleep-entry
140         dup resume
141     ] when drop ;
142
143 DEFER: stop
144
145 <PRIVATE
146
147 : schedule-sleep ( thread dt -- )
148     dupd sleep-queue heap-push* >>sleep-entry drop ;
149
150 : expire-sleep? ( -- ? )
151     sleep-queue dup heap-empty?
152     [ drop f ] [ heap-peek nip nano-count <= ] if ;
153
154 : expire-sleep ( thread -- )
155     f >>sleep-entry resume ;
156
157 : expire-sleep-loop ( -- )
158     [ expire-sleep? ]
159     [ sleep-queue heap-pop drop expire-sleep ]
160     while ;
161
162 CONSTANT: [start]
163     [
164         set-namestack
165         init-catchstack
166         self quot>> call
167         stop
168     ]
169
170 : no-runnable-threads ( -- ) die ;
171
172 GENERIC: (next) ( obj thread -- obj' )
173
174 M: thread (next)
175     dup runnable>>
176     [ context>> box> set-context ]
177     [ t >>runnable drop [start] start-context ] if ;
178
179 : (stop) ( obj thread -- * )
180     dup runnable>>
181     [ context>> box> set-context-and-delete ]
182     [ t >>runnable drop [start] start-context-and-delete ] if ;
183
184 : wake-up-callbacks ( -- )
185     current-callback waiting-callbacks delete-at*
186     [ resume-now ] [ drop ] if ;
187
188 : next ( -- obj thread )
189     expire-sleep-loop
190     wake-up-callbacks
191     run-queue pop-back
192     dup array? [ first2 ] [ [ f ] dip ] if
193     f >>state
194     dup set-self ;
195
196 PRIVATE>
197
198 : stop ( -- * )
199     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
200     next (stop) ;
201
202 : suspend ( state -- obj )
203     [ self ] dip >>state
204     [ context ] dip context>> >box
205     next (next) ;
206
207 : yield ( -- ) self resume f suspend drop ;
208
209 GENERIC: sleep-until ( n/f -- )
210
211 M: integer sleep-until
212     [ self ] dip schedule-sleep "sleep" suspend drop ;
213
214 M: f sleep-until
215     drop "standby" suspend drop ;
216
217 GENERIC: sleep ( dt -- )
218
219 M: real sleep
220     >integer nano-count + sleep-until ;
221
222 : (spawn) ( thread -- )
223     [ register-thread ] [ [ namestack ] dip resume-with ] bi ;
224
225 : spawn ( quot name -- thread )
226     <thread> [ (spawn) ] keep ;
227
228 : spawn-server ( quot name -- thread )
229     [ '[ _ loop ] ] dip spawn ;
230
231 : in-thread ( quot -- )
232     [ datastack ] dip
233     '[ _ set-datastack @ ]
234     "Thread" spawn drop ;
235
236 <PRIVATE
237
238 : init-thread-state ( -- )
239     H{ } clone 64 set-special-object
240     <dlist> 65 set-special-object
241     <min-heap> 66 set-special-object
242     H{ } clone 68 set-special-object ;
243
244 : init-initial-thread ( -- )
245     [ ] "Initial" <thread>
246     t >>runnable
247     [ initial-thread set-global ]
248     [ register-thread ]
249     [ set-self ]
250     tri ;
251
252 : init-threads ( -- )
253     init-thread-state
254     init-initial-thread ;
255
256 : wait-for-callback ( callback -- )
257     self swap waiting-callbacks set-at
258     "Callback return" suspend drop ;
259
260 PRIVATE>
261
262 [ init-threads ] "threads" add-startup-hook