]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
Merge branch 'master' into conditional
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2010 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: arrays hashtables heaps kernel kernel.private math
5 namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators combinators.private init boxes
7 accessors math.order deques strings quotations fry ;
8 IN: threads
9
10 <PRIVATE
11
12 ! (set-context) and (start-context) are sub-primitives, but
13 ! we don't want them inlined into callers since their behavior
14 ! depends on what frames are on the callstack
15 : set-context ( obj context -- obj' ) (set-context) ;
16 : start-context ( obj quot: ( obj -- * ) -- obj' ) (start-context) ;
17
18 PRIVATE>
19
20 SYMBOL: initial-thread
21
22 TUPLE: thread
23 { name string }
24 { quot callable initial: [ ] }
25 { exit-handler callable initial: [ ] }
26 { id integer }
27 { continuation box }
28 state
29 runnable
30 mailbox
31 { variables hashtable }
32 sleep-entry ;
33
34 : self ( -- thread )
35     63 special-object { thread } declare ; inline
36
37 ! Thread-local storage
38 : tnamespace ( -- assoc )
39     self variables>> ; inline
40
41 : tget ( key -- value )
42     tnamespace at ;
43
44 : tset ( value key -- )
45     tnamespace set-at ;
46
47 : tchange ( key quot -- )
48     tnamespace swap change-at ; inline
49
50 : threads ( -- assoc )
51     64 special-object { hashtable } declare ; inline
52
53 : thread ( id -- thread )
54     threads at ;
55
56 : thread-registered? ( thread -- ? )
57     id>> threads key? ;
58
59 ERROR: already-stopped thread ;
60
61 : check-unregistered ( thread -- thread )
62     dup thread-registered? [ already-stopped ] when ;
63
64 ERROR: not-running thread ;
65
66 : check-registered ( thread -- thread )
67     dup thread-registered? [ not-running ] unless ;
68
69 <PRIVATE
70
71 : register-thread ( thread -- )
72     check-unregistered dup id>> threads set-at ;
73
74 : unregister-thread ( thread -- )
75     check-registered id>> threads delete-at ;
76
77 : set-self ( thread -- ) 63 set-special-object ; inline
78
79 PRIVATE>
80
81 : new-thread ( quot name class -- thread )
82     new
83         swap >>name
84         swap >>quot
85         \ thread counter >>id
86         <box> >>continuation
87         H{ } clone >>variables ; inline
88
89 : <thread> ( quot name -- thread )
90     \ thread new-thread ;
91
92 : run-queue ( -- dlist )
93     65 special-object { dlist } declare ; inline
94
95 : sleep-queue ( -- heap )
96     66 special-object { dlist } declare ; inline
97
98 : resume ( thread -- )
99     f >>state
100     check-registered run-queue push-front ;
101
102 : resume-now ( thread -- )
103     f >>state
104     check-registered run-queue push-back ;
105
106 : resume-with ( obj thread -- )
107     f >>state
108     check-registered 2array run-queue push-front ;
109
110 : sleep-time ( -- nanos/f )
111     {
112         { [ run-queue deque-empty? not ] [ 0 ] }
113         { [ sleep-queue heap-empty? ] [ f ] }
114         [ sleep-queue heap-peek nip nano-count [-] ]
115     } cond ;
116
117 DEFER: stop
118
119 <PRIVATE
120
121 : schedule-sleep ( thread dt -- )
122     [ check-registered dup ] dip sleep-queue heap-push*
123     >>sleep-entry drop ;
124
125 : expire-sleep? ( heap -- ? )
126     dup heap-empty?
127     [ drop f ] [ heap-peek nip nano-count <= ] if ;
128
129 : expire-sleep ( thread -- )
130     f >>sleep-entry resume ;
131
132 : expire-sleep-loop ( -- )
133     sleep-queue
134     [ dup expire-sleep? ]
135     [ dup heap-pop drop expire-sleep ]
136     while
137     drop ;
138
139 : start ( namestack thread -- * )
140     [
141         set-self
142         set-namestack
143         V{ } set-catchstack
144         { } set-retainstack
145         { } set-datastack
146         self quot>> [ call stop ] call-clear
147     ] (( namestack thread -- * )) call-effect-unsafe ;
148
149 DEFER: next
150
151 : no-runnable-threads ( -- * )
152     ! We should never be in a state where the only threads
153     ! are sleeping; the I/O wait thread is always runnable.
154     ! However, if it dies, we handle this case
155     ! semi-gracefully.
156     !
157     ! And if sleep-time outputs f, there are no sleeping
158     ! threads either... so WTF.
159     sleep-time {
160         { [ dup not ] [ drop die ] }
161         { [ dup 0 = ] [ drop ] }
162         [ (sleep) ]
163     } cond next ;
164
165 : (next) ( arg thread -- * )
166     f >>state
167     dup set-self
168     dup runnable>> [
169         continuation>> box> continue-with
170     ] [
171         t >>runnable start
172     ] if ;
173
174 : next ( -- * )
175     expire-sleep-loop
176     run-queue dup deque-empty? [
177         drop no-runnable-threads
178     ] [
179         pop-back dup array? [ first2 ] [ f swap ] if (next)
180     ] if ;
181
182 PRIVATE>
183
184 : stop ( -- * )
185     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi next ;
186
187 : suspend ( state -- obj )
188     self (>>state)
189     [ self continuation>> >box next ] callcc1 ; inline
190
191 : yield ( -- ) self resume f suspend drop ;
192
193 GENERIC: sleep-until ( n/f -- )
194
195 M: integer sleep-until
196     [ self ] dip schedule-sleep "sleep" suspend drop ;
197
198 M: f sleep-until
199     drop "interrupt" suspend drop ;
200
201 GENERIC: sleep ( dt -- )
202
203 M: real sleep
204     >integer nano-count + sleep-until ;
205
206 : interrupt ( thread -- )
207     dup state>> [
208         dup sleep-entry>> [ sleep-queue heap-delete ] when*
209         f >>sleep-entry
210         dup resume
211     ] when drop ;
212
213 : (spawn) ( thread -- )
214     [ register-thread ] [ namestack swap resume-with ] bi ;
215
216 : spawn ( quot name -- thread )
217     <thread> [ (spawn) ] keep ;
218
219 : spawn-server ( quot name -- thread )
220     [ '[ _ loop ] ] dip spawn ;
221
222 : in-thread ( quot -- )
223     [ datastack ] dip
224     '[ _ set-datastack @ ]
225     "Thread" spawn drop ;
226
227 GENERIC: error-in-thread ( error thread -- )
228
229 <PRIVATE
230
231 : init-threads ( -- )
232     H{ } clone 64 set-special-object
233     <dlist> 65 set-special-object
234     <min-heap> 66 set-special-object
235     initial-thread global
236     [ drop [ ] "Initial" <thread> ] cache
237     <box> >>continuation
238     t >>runnable
239     f >>state
240     dup register-thread
241     set-self ;
242
243 PRIVATE>
244
245 [ init-threads ] "threads" add-startup-hook