]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
threads: some cleanup.
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2011 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: accessors alien.private arrays assocs boxes combinators
5 continuations continuations.private deques dlists fry hashtables
6 heaps init kernel kernel.private math math.order namespaces
7 quotations sequences strings system ;
8 FROM: assocs => change-at ;
9 IN: threads
10
11 <PRIVATE
12
13 ! Wrap sub-primitives; we don't want them inlined into callers
14 ! since their behavior depends on what frames are on the callstack
15 : context ( -- context )
16     CONTEXT-OBJ-CONTEXT context-object ; inline
17
18 : set-context ( obj context -- obj' )
19     (set-context) ; inline
20
21 : start-context ( obj quot: ( obj -- * ) -- obj' )
22     (start-context) ; inline
23
24 : set-context-and-delete ( obj context -- * )
25     (set-context-and-delete) ; inline
26
27 : start-context-and-delete ( obj quot: ( obj -- * ) -- * )
28     (start-context-and-delete) ; inline
29
30 ! Context introspection
31 : namestack-for ( context -- namestack )
32     [ CONTEXT-OBJ-NAMESTACK ] dip context-object-for ;
33
34 : catchstack-for ( context -- catchstack )
35     [ CONTEXT-OBJ-CATCHSTACK ] dip context-object-for ;
36
37 : continuation-for ( context -- continuation )
38     {
39         [ datastack-for ]
40         [ callstack-for ]
41         [ retainstack-for ]
42         [ namestack-for ]
43         [ catchstack-for ]
44     } cleave <continuation> ;
45
46 PRIVATE>
47
48 SYMBOL: initial-thread
49
50 TUPLE: thread
51 { name string }
52 { quot callable initial: [ ] }
53 { exit-handler callable initial: [ ] }
54 { id integer }
55 { context box }
56 state
57 runnable
58 mailbox
59 { variables hashtable }
60 sleep-entry ;
61
62 : self ( -- thread )
63     OBJ-CURRENT-THREAD special-object { thread } declare ; inline
64
65 : thread-continuation ( thread -- continuation )
66     context>> check-box value>> continuation-for ;
67
68 ! Thread-local storage
69 : tnamespace ( -- assoc )
70     self variables>> ; inline
71
72 : tget ( key -- value )
73     tnamespace at ;
74
75 : tset ( value key -- )
76     tnamespace set-at ;
77
78 : tchange ( ..a key quot: ( ..a value -- ..b newvalue ) -- ..b )
79     [ tnamespace ] dip change-at ; inline
80
81 : threads ( -- assoc )
82     OBJ-THREADS special-object { hashtable } declare ; inline
83
84 : thread-registered? ( thread -- ? )
85     id>> threads key? ;
86
87 <PRIVATE
88
89 : register-thread ( thread -- )
90     dup id>> threads set-at ;
91
92 : unregister-thread ( thread -- )
93     id>> threads delete-at ;
94
95 : set-self ( thread -- )
96     OBJ-CURRENT-THREAD set-special-object ; inline
97
98 PRIVATE>
99
100 : run-queue ( -- dlist )
101     OBJ-RUN-QUEUE special-object { dlist } declare ; inline
102
103 : sleep-queue ( -- heap )
104     OBJ-SLEEP-QUEUE special-object { min-heap } declare ; inline
105
106 : waiting-callbacks ( -- assoc )
107     OBJ-WAITING-CALLBACKS special-object { hashtable } declare ; inline
108
109 : new-thread ( quot name class -- thread )
110     new
111         swap >>name
112         swap >>quot
113         \ thread counter >>id
114         H{ } clone >>variables
115         <box> >>context ; inline
116
117 : <thread> ( quot name -- thread )
118     \ thread new-thread ;
119
120 : resume ( thread -- )
121     f >>state run-queue push-front ;
122
123 : resume-now ( thread -- )
124     f >>state run-queue push-back ;
125
126 : resume-with ( obj thread -- )
127     f >>state 2array run-queue push-front ;
128
129 : sleep-time ( -- nanos/f )
130     {
131         { [ current-callback waiting-callbacks key? ] [ 0 ] }
132         { [ run-queue deque-empty? not ] [ 0 ] }
133         { [ sleep-queue heap-empty? ] [ f ] }
134         [ sleep-queue heap-peek nip nano-count [-] ]
135     } cond ;
136
137 : interrupt ( thread -- )
138     dup state>> [
139         [
140             [ sleep-queue heap-delete ] when* f
141         ] change-sleep-entry dup resume
142     ] when drop ;
143
144 DEFER: stop
145
146 <PRIVATE
147
148 : schedule-sleep ( thread dt -- )
149     dupd sleep-queue heap-push* >>sleep-entry drop ;
150
151 : expire-sleep? ( -- ? )
152     sleep-queue dup heap-empty?
153     [ drop f ] [ heap-peek nip nano-count <= ] if ;
154
155 : expire-sleep ( thread -- )
156     f >>sleep-entry resume ;
157
158 : expire-sleep-loop ( -- )
159     [ expire-sleep? ]
160     [ sleep-queue heap-pop drop expire-sleep ]
161     while ;
162
163 CONSTANT: [start]
164     [
165         set-namestack
166         init-catchstack
167         self quot>> call
168         stop
169     ]
170
171 GENERIC: (next) ( obj thread -- obj' )
172
173 M: thread (next)
174     dup runnable>>
175     [ context>> box> set-context ]
176     [ t >>runnable drop [start] start-context ] if ;
177
178 : (stop) ( obj thread -- * )
179     dup runnable>>
180     [ context>> box> set-context-and-delete ]
181     [ t >>runnable drop [start] start-context-and-delete ] if ;
182
183 : wake-up-callbacks ( -- )
184     current-callback waiting-callbacks delete-at*
185     [ resume-now ] [ drop ] if ;
186
187 : next ( -- obj thread )
188     expire-sleep-loop
189     wake-up-callbacks
190     run-queue pop-back
191     dup array? [ first2 ] [ [ f ] dip ] if
192     f >>state
193     dup set-self ;
194
195 PRIVATE>
196
197 : stop ( -- * )
198     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
199     next (stop) ;
200
201 : suspend ( state -- obj )
202     [ self ] dip >>state
203     [ context ] dip context>> >box
204     next (next) ;
205
206 : yield ( -- )
207     self resume f suspend drop ;
208
209 GENERIC: sleep-until ( n/f -- )
210
211 M: integer sleep-until
212     [ self ] dip schedule-sleep "sleep" suspend drop ;
213
214 M: f sleep-until
215     drop "standby" suspend drop ;
216
217 GENERIC: sleep ( dt -- )
218
219 M: real sleep
220     >integer nano-count + sleep-until ;
221
222 : (spawn) ( thread -- )
223     [ register-thread ] [ [ namestack ] dip resume-with ] bi ;
224
225 : spawn ( quot name -- thread )
226     <thread> [ (spawn) ] keep ;
227
228 : spawn-server ( quot name -- thread )
229     [ '[ _ loop ] ] dip spawn ;
230
231 : in-thread ( quot -- )
232     [ datastack ] dip
233     '[ _ set-datastack @ ]
234     "Thread" spawn drop ;
235
236 <PRIVATE
237
238 : init-thread-state ( -- )
239     H{ } clone OBJ-THREADS set-special-object
240     <dlist> OBJ-RUN-QUEUE set-special-object
241     <min-heap> OBJ-SLEEP-QUEUE set-special-object
242     H{ } clone OBJ-WAITING-CALLBACKS set-special-object ;
243
244 : init-initial-thread ( -- )
245     [ ] "Initial" <thread>
246     t >>runnable
247     [ initial-thread set-global ]
248     [ register-thread ]
249     [ set-self ]
250     tri ;
251
252 : init-threads ( -- )
253     init-thread-state
254     init-initial-thread ;
255
256 : wait-for-callback ( callback -- )
257     self swap waiting-callbacks set-at
258     "Callback return" suspend drop ;
259
260 PRIVATE>
261
262 [ init-threads ] "threads" add-startup-hook