]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
basis: use lint.vocabs tool to trim using lists
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2011 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: accessors alien.private arrays assocs boxes combinators
5 continuations continuations.private deques dlists hashtables
6 heaps kernel kernel.private math math.order namespaces
7 quotations sequences strings system ;
8 IN: threads
9
10 <PRIVATE
11 PRIMITIVE: (set-context) ( obj context -- obj' )
12 PRIMITIVE: (set-context-and-delete) ( obj context -- * )
13 PRIMITIVE: (sleep) ( nanos -- )
14 PRIMITIVE: (start-context) ( obj quot -- obj' )
15 PRIMITIVE: (start-context-and-delete) ( obj quot -- * )
16
17 PRIMITIVE: context-object-for ( n context -- obj )
18
19 ! Wrap sub-primitives; we don't want them inlined into callers
20 ! since their behavior depends on what frames are on the callstack
21 : set-context ( obj context -- obj' )
22     (set-context) ; inline
23
24 : start-context ( obj quot: ( obj -- * ) -- obj' )
25     (start-context) ; inline
26
27 : set-context-and-delete ( obj context -- * )
28     (set-context-and-delete) ; inline
29
30 : start-context-and-delete ( obj quot: ( obj -- * ) -- * )
31     (start-context-and-delete) ; inline
32
33 ! Context introspection
34 : namestack-for ( context -- namestack )
35     [ CONTEXT-OBJ-NAMESTACK ] dip context-object-for ;
36
37 : catchstack-for ( context -- catchstack )
38     [ CONTEXT-OBJ-CATCHSTACK ] dip context-object-for ;
39
40 : continuation-for ( context -- continuation )
41     {
42         [ datastack-for ]
43         [ callstack-for ]
44         [ retainstack-for ]
45         [ namestack-for ]
46         [ catchstack-for ]
47     } cleave <continuation> ;
48
49 PRIVATE>
50
51 SYMBOL: initial-thread
52
53 TUPLE: thread
54     { name string }
55     { quot callable initial: [ ] }
56     { exit-handler callable initial: [ ] }
57     { id integer }
58     { context box }
59     state
60     runnable
61     mailbox
62     { variables hashtable }
63     sleep-entry ;
64
65 : self ( -- thread )
66     OBJ-CURRENT-THREAD special-object { thread } declare ; inline
67
68 : thread-continuation ( thread -- continuation )
69     context>> check-box value>> continuation-for ;
70
71 ! Thread-local storage
72 : tnamespace ( -- assoc )
73     self variables>> ; inline
74
75 : tget ( key -- value )
76     tnamespace at ;
77
78 : tset ( value key -- )
79     tnamespace set-at ;
80
81 : tchange ( ..a key quot: ( ..a value -- ..b newvalue ) -- ..b )
82     [ tnamespace ] dip change-at ; inline
83
84 : threads ( -- assoc )
85     OBJ-THREADS special-object { hashtable } declare ; inline
86
87 : thread-registered? ( thread -- ? )
88     id>> threads key? ;
89
90 <PRIVATE
91
92 : register-thread ( thread -- )
93     dup id>> threads set-at ;
94
95 : unregister-thread ( thread -- )
96     id>> threads delete-at ;
97
98 : set-self ( thread -- )
99     OBJ-CURRENT-THREAD set-special-object ; inline
100
101 PRIVATE>
102
103 : run-queue ( -- dlist )
104     OBJ-RUN-QUEUE special-object { dlist } declare ; inline
105
106 : sleep-queue ( -- heap )
107     OBJ-SLEEP-QUEUE special-object { min-heap } declare ; inline
108
109 : waiting-callbacks ( -- assoc )
110     OBJ-WAITING-CALLBACKS special-object { hashtable } declare ; inline
111
112 : new-thread ( quot name class -- thread )
113     new
114         swap >>name
115         swap >>quot
116         \ thread counter >>id
117         H{ } clone >>variables
118         <box> >>context ; inline
119
120 : <thread> ( quot name -- thread )
121     \ thread new-thread ;
122
123 : resume ( thread -- )
124     f >>state run-queue push-front ;
125
126 : resume-now ( thread -- )
127     f >>state run-queue push-back ;
128
129 : resume-with ( obj thread -- )
130     f >>state 2array run-queue push-front ;
131
132 : sleep-time ( -- nanos/f )
133     {
134         { [ current-callback waiting-callbacks key? ] [ 0 ] }
135         { [ run-queue deque-empty? not ] [ 0 ] }
136         { [ sleep-queue heap-empty? ] [ f ] }
137         [ sleep-queue heap-peek nip nano-count [-] ]
138     } cond ;
139
140 : interrupt ( thread -- )
141     dup state>> [
142         [
143             [ sleep-queue heap-delete ] when* f
144         ] change-sleep-entry dup resume
145     ] when drop ;
146
147 DEFER: stop
148
149 <PRIVATE
150
151 : schedule-sleep ( thread dt -- )
152     dupd sleep-queue heap-push* >>sleep-entry drop ;
153
154 : expire-sleep? ( -- ? )
155     sleep-queue dup heap-empty?
156     [ drop f ] [ heap-peek nip nano-count <= ] if ;
157
158 : expire-sleep ( thread -- )
159     f >>sleep-entry resume ;
160
161 : expire-sleep-loop ( -- )
162     [ expire-sleep? ]
163     [ sleep-queue heap-pop drop expire-sleep ]
164     while ;
165
166 CONSTANT: [start]
167     [
168         set-namestack
169         init-catchstack
170         self quot>> call
171         stop
172     ]
173
174 GENERIC: (next) ( obj thread -- obj' )
175
176 M: thread (next)
177     dup runnable>>
178     [ context>> box> set-context ]
179     [ t >>runnable drop [start] start-context ] if ;
180
181 : (stop) ( obj thread -- * )
182     dup runnable>>
183     [ context>> box> set-context-and-delete ]
184     [ t >>runnable drop [start] start-context-and-delete ] if ;
185
186 : wake-up-callbacks ( -- )
187     current-callback waiting-callbacks delete-at*
188     [ resume-now ] [ drop ] if ;
189
190 : next ( -- obj thread )
191     expire-sleep-loop
192     wake-up-callbacks
193     run-queue pop-back
194     dup array? [ first2 ] [ [ f ] dip ] if
195     f >>state
196     dup set-self ;
197
198 PRIVATE>
199
200 : stop ( -- * )
201     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi
202     next (stop) ;
203
204 : suspend ( state -- obj )
205     [ self ] dip >>state
206     [ context ] dip context>> >box
207     next (next) ;
208
209 : yield ( -- )
210     self resume f suspend drop ;
211
212 GENERIC: sleep-until ( n/f -- )
213
214 M: integer sleep-until
215     [ self ] dip schedule-sleep "sleep" suspend drop ;
216
217 M: f sleep-until
218     drop "standby" suspend drop ;
219
220 GENERIC: sleep ( dt -- )
221
222 M: real sleep
223     >integer nano-count + sleep-until ;
224
225 : (spawn) ( thread -- )
226     [ register-thread ] [ [ get-namestack ] dip resume-with ] bi ;
227
228 : spawn ( quot name -- thread )
229     <thread> [ (spawn) ] keep ;
230
231 : spawn-server ( quot name -- thread )
232     [ '[ _ loop ] ] dip spawn ;
233
234 : in-thread ( quot -- )
235     [ get-datastack ] dip
236     '[ _ set-datastack @ ]
237     "Thread" spawn drop ;
238
239 <PRIVATE
240
241 : init-thread-state ( -- )
242     H{ } clone OBJ-THREADS set-special-object
243     <dlist> OBJ-RUN-QUEUE set-special-object
244     <min-heap> OBJ-SLEEP-QUEUE set-special-object
245     H{ } clone OBJ-WAITING-CALLBACKS set-special-object ;
246
247 : init-initial-thread ( -- )
248     [ ] "Initial" <thread>
249     t >>runnable
250     [ initial-thread set-global ]
251     [ register-thread ]
252     [ set-self ]
253     tri ;
254
255 : init-threads ( -- )
256     init-thread-state
257     init-initial-thread ;
258
259 : wait-for-callback ( callback -- )
260     self swap waiting-callbacks set-at
261     "Callback return" suspend drop ;
262
263 PRIVATE>
264
265 STARTUP-HOOK: init-threads