]> gitweb.factorcode.org Git - factor.git/blob - basis/threads/threads.factor
Fix two overzealous [ ] removal in tests
[factor.git] / basis / threads / threads.factor
1 ! Copyright (C) 2004, 2008 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: arrays hashtables heaps kernel kernel.private math
5 namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators init boxes accessors
7 math.order deques strings quotations fry ;
8 IN: threads
9
10 SYMBOL: initial-thread
11
12 TUPLE: thread
13 { name string }
14 { quot callable initial: [ ] }
15 { exit-handler callable initial: [ ] }
16 { id integer }
17 continuation
18 state
19 runnable
20 mailbox
21 variables
22 sleep-entry ;
23
24 : self ( -- thread ) 63 getenv ; inline
25
26 ! Thread-local storage
27 : tnamespace ( -- assoc )
28     self variables>> [ H{ } clone dup self (>>variables) ] unless* ;
29
30 : tget ( key -- value )
31     self variables>> at ;
32
33 : tset ( value key -- )
34     tnamespace set-at ;
35
36 : tchange ( key quot -- )
37     tnamespace swap change-at ; inline
38
39 : threads ( -- assoc ) 64 getenv ;
40
41 : thread ( id -- thread ) threads at ;
42
43 : thread-registered? ( thread -- ? )
44     id>> threads key? ;
45
46 : check-unregistered ( thread -- thread )
47     dup thread-registered?
48     [ "Thread already stopped" throw ] when ;
49
50 : check-registered ( thread -- thread )
51     dup thread-registered?
52     [ "Thread is not running" throw ] unless ;
53
54 <PRIVATE
55
56 : register-thread ( thread -- )
57     check-unregistered dup id>> threads set-at ;
58
59 : unregister-thread ( thread -- )
60     check-registered id>> threads delete-at ;
61
62 : set-self ( thread -- ) 63 setenv ; inline
63
64 PRIVATE>
65
66 : new-thread ( quot name class -- thread )
67     new
68         swap >>name
69         swap >>quot
70         \ thread counter >>id
71         <box> >>continuation ; inline
72
73 : <thread> ( quot name -- thread )
74     \ thread new-thread ;
75
76 : run-queue ( -- dlist ) 65 getenv ;
77
78 : sleep-queue ( -- heap ) 66 getenv ;
79
80 : resume ( thread -- )
81     f >>state
82     check-registered run-queue push-front ;
83
84 : resume-now ( thread -- )
85     f >>state
86     check-registered run-queue push-back ;
87
88 : resume-with ( obj thread -- )
89     f >>state
90     check-registered 2array run-queue push-front ;
91
92 : sleep-time ( -- us/f )
93     {
94         { [ run-queue deque-empty? not ] [ 0 ] }
95         { [ sleep-queue heap-empty? ] [ f ] }
96         [ sleep-queue heap-peek nip micros [-] ]
97     } cond ;
98
99 DEFER: stop
100
101 <PRIVATE
102
103 : schedule-sleep ( thread dt -- )
104     [ check-registered dup ] dip sleep-queue heap-push*
105     >>sleep-entry drop ;
106
107 : expire-sleep? ( heap -- ? )
108     dup heap-empty?
109     [ drop f ] [ heap-peek nip micros <= ] if ;
110
111 : expire-sleep ( thread -- )
112     f >>sleep-entry resume ;
113
114 : expire-sleep-loop ( -- )
115     sleep-queue
116     [ dup expire-sleep? ]
117     [ dup heap-pop drop expire-sleep ]
118     while
119     drop ;
120
121 : start ( namestack thread -- )
122     [
123         set-self
124         set-namestack
125         V{ } set-catchstack
126         { } set-retainstack
127         { } set-datastack
128         self quot>> [ call stop ] call-clear
129     ] 2 (throw) ;
130
131 DEFER: next
132
133 : no-runnable-threads ( -- * )
134     ! We should never be in a state where the only threads
135     ! are sleeping; the I/O wait thread is always runnable.
136     ! However, if it dies, we handle this case
137     ! semi-gracefully.
138     !
139     ! And if sleep-time outputs f, there are no sleeping
140     ! threads either... so WTF.
141     sleep-time [ die 0 ] unless* (sleep) next ;
142
143 : (next) ( arg thread -- * )
144     f >>state
145     dup set-self
146     dup runnable>> [
147         continuation>> box> continue-with
148     ] [
149         t >>runnable start
150     ] if ;
151
152 : next ( -- * )
153     expire-sleep-loop
154     run-queue dup deque-empty? [
155         drop no-runnable-threads
156     ] [
157         pop-back dup array? [ first2 ] [ f swap ] if (next)
158     ] if ;
159
160 PRIVATE>
161
162 : stop ( -- )
163     self [ exit-handler>> call ] [ unregister-thread ] bi next ;
164
165 : suspend ( quot state -- obj )
166     [
167         [ [ self swap call ] dip self (>>state) ] dip
168         self continuation>> >box
169         next
170     ] callcc1 2nip ; inline
171
172 : yield ( -- ) [ resume ] f suspend drop ;
173
174 GENERIC: sleep-until ( time/f -- )
175
176 M: integer sleep-until
177     '[ _ schedule-sleep ] "sleep" suspend drop ;
178
179 M: f sleep-until
180     drop [ drop ] "interrupt" suspend drop ;
181
182 GENERIC: sleep ( dt -- )
183
184 M: real sleep
185     micros + >integer sleep-until ;
186
187 : interrupt ( thread -- )
188     dup state>> [
189         dup sleep-entry>> [ sleep-queue heap-delete ] when*
190         f >>sleep-entry
191         dup resume
192     ] when drop ;
193
194 : (spawn) ( thread -- )
195     [ register-thread ] [ namestack swap resume-with ] bi ;
196
197 : spawn ( quot name -- thread )
198     <thread> [ (spawn) ] keep ;
199
200 : spawn-server ( quot name -- thread )
201     [ '[ _ loop ] ] dip spawn ;
202
203 : in-thread ( quot -- )
204     [ datastack ] dip
205     '[ _ set-datastack _ call ]
206     "Thread" spawn drop ;
207
208 GENERIC: error-in-thread ( error thread -- )
209
210 <PRIVATE
211
212 : init-threads ( -- )
213     H{ } clone 64 setenv
214     <dlist> 65 setenv
215     <min-heap> 66 setenv
216     initial-thread global
217     [ drop [ ] "Initial" <thread> ] cache
218     <box> >>continuation
219     t >>runnable
220     f >>state
221     dup register-thread
222     set-self ;
223
224 PRIVATE>
225
226 [ init-threads ] "threads" add-init-hook