]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
Merge branch 'mongodb-changes' of git://github.com/x6j8x/factor
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2009 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: arrays hashtables heaps kernel kernel.private math
5 namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators combinators.private init boxes
7 accessors math.order deques strings quotations fry ;
8 IN: threads
9
10 SYMBOL: initial-thread
11
12 TUPLE: thread
13 { name string }
14 { quot callable initial: [ ] }
15 { exit-handler callable initial: [ ] }
16 { id integer }
17 continuation
18 state
19 runnable
20 mailbox
21 variables
22 sleep-entry ;
23
24 : self ( -- thread ) 63 special-object ; inline
25
26 ! Thread-local storage
27 : tnamespace ( -- assoc )
28     self variables>> [ H{ } clone dup self (>>variables) ] unless* ;
29
30 : tget ( key -- value )
31     self variables>> at ;
32
33 : tset ( value key -- )
34     tnamespace set-at ;
35
36 : tchange ( key quot -- )
37     tnamespace swap change-at ; inline
38
39 : threads ( -- assoc ) 64 special-object ;
40
41 : thread ( id -- thread ) threads at ;
42
43 : thread-registered? ( thread -- ? )
44     id>> threads key? ;
45
46 ERROR: already-stopped thread ;
47
48 : check-unregistered ( thread -- thread )
49     dup thread-registered? [ already-stopped ] when ;
50
51 ERROR: not-running thread ;
52
53 : check-registered ( thread -- thread )
54     dup thread-registered? [ not-running ] unless ;
55
56 <PRIVATE
57
58 : register-thread ( thread -- )
59     check-unregistered dup id>> threads set-at ;
60
61 : unregister-thread ( thread -- )
62     check-registered id>> threads delete-at ;
63
64 : set-self ( thread -- ) 63 set-special-object ; inline
65
66 PRIVATE>
67
68 : new-thread ( quot name class -- thread )
69     new
70         swap >>name
71         swap >>quot
72         \ thread counter >>id
73         <box> >>continuation ; inline
74
75 : <thread> ( quot name -- thread )
76     \ thread new-thread ;
77
78 : run-queue ( -- dlist ) 65 special-object ;
79
80 : sleep-queue ( -- heap ) 66 special-object ;
81
82 : resume ( thread -- )
83     f >>state
84     check-registered run-queue push-front ;
85
86 : resume-now ( thread -- )
87     f >>state
88     check-registered run-queue push-back ;
89
90 : resume-with ( obj thread -- )
91     f >>state
92     check-registered 2array run-queue push-front ;
93
94 : sleep-time ( -- nanos/f )
95     {
96         { [ run-queue deque-empty? not ] [ 0 ] }
97         { [ sleep-queue heap-empty? ] [ f ] }
98         [ sleep-queue heap-peek nip nano-count [-] ]
99     } cond ;
100
101 DEFER: stop
102
103 <PRIVATE
104
105 : schedule-sleep ( thread dt -- )
106     [ check-registered dup ] dip sleep-queue heap-push*
107     >>sleep-entry drop ;
108
109 : expire-sleep? ( heap -- ? )
110     dup heap-empty?
111     [ drop f ] [ heap-peek nip nano-count <= ] if ;
112
113 : expire-sleep ( thread -- )
114     f >>sleep-entry resume ;
115
116 : expire-sleep-loop ( -- )
117     sleep-queue
118     [ dup expire-sleep? ]
119     [ dup heap-pop drop expire-sleep ]
120     while
121     drop ;
122
123 : start ( namestack thread -- * )
124     [
125         set-self
126         set-namestack
127         V{ } set-catchstack
128         { } set-retainstack
129         { } set-datastack
130         self quot>> [ call stop ] call-clear
131     ] (( namestack thread -- * )) call-effect-unsafe ;
132
133 DEFER: next
134
135 : no-runnable-threads ( -- * )
136     ! We should never be in a state where the only threads
137     ! are sleeping; the I/O wait thread is always runnable.
138     ! However, if it dies, we handle this case
139     ! semi-gracefully.
140     !
141     ! And if sleep-time outputs f, there are no sleeping
142     ! threads either... so WTF.
143     sleep-time {
144         { [ dup not ] [ drop die ] }
145         { [ dup 0 = ] [ drop ] }
146         [ (sleep) ]
147     } cond next ;
148
149 : (next) ( arg thread -- * )
150     f >>state
151     dup set-self
152     dup runnable>> [
153         continuation>> box> continue-with
154     ] [
155         t >>runnable start
156     ] if ;
157
158 : next ( -- * )
159     expire-sleep-loop
160     run-queue dup deque-empty? [
161         drop no-runnable-threads
162     ] [
163         pop-back dup array? [ first2 ] [ f swap ] if (next)
164     ] if ;
165
166 PRIVATE>
167
168 : stop ( -- )
169     self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi next ;
170
171 : suspend ( quot state -- obj )
172     [
173         [ [ self swap call ] dip self (>>state) ] dip
174         self continuation>> >box
175         next
176     ] callcc1 2nip ; inline
177
178 : yield ( -- ) [ resume ] f suspend drop ;
179
180 GENERIC: sleep-until ( n/f -- )
181
182 M: integer sleep-until
183     '[ _ schedule-sleep ] "sleep" suspend drop ;
184
185 M: f sleep-until
186     drop [ drop ] "interrupt" suspend drop ;
187
188 GENERIC: sleep ( dt -- )
189
190 M: real sleep
191     >integer nano-count + sleep-until ;
192
193 : interrupt ( thread -- )
194     dup state>> [
195         dup sleep-entry>> [ sleep-queue heap-delete ] when*
196         f >>sleep-entry
197         dup resume
198     ] when drop ;
199
200 : (spawn) ( thread -- )
201     [ register-thread ] [ namestack swap resume-with ] bi ;
202
203 : spawn ( quot name -- thread )
204     <thread> [ (spawn) ] keep ;
205
206 : spawn-server ( quot name -- thread )
207     [ '[ _ loop ] ] dip spawn ;
208
209 : in-thread ( quot -- )
210     [ datastack ] dip
211     '[ _ set-datastack _ call ]
212     "Thread" spawn drop ;
213
214 GENERIC: error-in-thread ( error thread -- )
215
216 <PRIVATE
217
218 : init-threads ( -- )
219     H{ } clone 64 set-special-object
220     <dlist> 65 set-special-object
221     <min-heap> 66 set-special-object
222     initial-thread global
223     [ drop [ ] "Initial" <thread> ] cache
224     <box> >>continuation
225     t >>runnable
226     f >>state
227     dup register-thread
228     set-self ;
229
230 PRIVATE>
231
232 [ init-threads ] "threads" add-startup-hook