]> gitweb.factorcode.org Git - factor.git/blob - core/threads/threads.factor
Merge branch 'master' of git://factorcode.org/git/factor
[factor.git] / core / threads / threads.factor
1 ! Copyright (C) 2004, 2008 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: arrays hashtables heaps kernel kernel.private math
5 namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators init boxes accessors
7 math.order dequeues ;
8 IN: threads
9
10 SYMBOL: initial-thread
11
12 TUPLE: thread
13 name quot exit-handler
14 id
15 continuation state runnable
16 mailbox variables sleep-entry ;
17
18 : self ( -- thread ) 63 getenv ; inline
19
20 ! Thread-local storage
21 : tnamespace ( -- assoc )
22     self variables>> [ H{ } clone dup self (>>variables) ] unless* ;
23
24 : tget ( key -- value )
25     self variables>> at ;
26
27 : tset ( value key -- )
28     tnamespace set-at ;
29
30 : tchange ( key quot -- )
31     tnamespace swap change-at ; inline
32
33 : threads 64 getenv ;
34
35 : thread ( id -- thread ) threads at ;
36
37 : thread-registered? ( thread -- ? )
38     id>> threads key? ;
39
40 : check-unregistered ( thread -- thread )
41     dup thread-registered?
42     [ "Thread already stopped" throw ] when ;
43
44 : check-registered ( thread -- thread )
45     dup thread-registered?
46     [ "Thread is not running" throw ] unless ;
47
48 <PRIVATE
49
50 : register-thread ( thread -- )
51     check-unregistered dup id>> threads set-at ;
52
53 : unregister-thread ( thread -- )
54     check-registered id>> threads delete-at ;
55
56 : set-self ( thread -- ) 63 setenv ; inline
57
58 PRIVATE>
59
60 : new-thread ( quot name class -- thread )
61     new
62         swap >>name
63         swap >>quot
64         \ thread counter >>id
65         <box> >>continuation
66         [ ] >>exit-handler ; inline
67
68 : <thread> ( quot name -- thread )
69     \ thread new-thread ;
70
71 : run-queue 65 getenv ;
72
73 : sleep-queue 66 getenv ;
74
75 : resume ( thread -- )
76     f >>state
77     check-registered run-queue push-front ;
78
79 : resume-now ( thread -- )
80     f >>state
81     check-registered run-queue push-back ;
82
83 : resume-with ( obj thread -- )
84     f >>state
85     check-registered 2array run-queue push-front ;
86
87 : sleep-time ( -- ms/f )
88     {
89         { [ run-queue dequeue-empty? not ] [ 0 ] }
90         { [ sleep-queue heap-empty? ] [ f ] }
91         [ sleep-queue heap-peek nip millis [-] ]
92     } cond ;
93
94 DEFER: stop
95
96 <PRIVATE
97
98 : schedule-sleep ( thread dt -- )
99     >r check-registered dup r> sleep-queue heap-push*
100     >>sleep-entry drop ;
101
102 : expire-sleep? ( heap -- ? )
103     dup heap-empty?
104     [ drop f ] [ heap-peek nip millis <= ] if ;
105
106 : expire-sleep ( thread -- )
107     f >>sleep-entry resume ;
108
109 : expire-sleep-loop ( -- )
110     sleep-queue
111     [ dup expire-sleep? ]
112     [ dup heap-pop drop expire-sleep ]
113     [ ] while
114     drop ;
115
116 : start ( namestack thread -- )
117     [
118         set-self
119         set-namestack
120         V{ } set-catchstack
121         { } set-retainstack
122         { } set-datastack
123         self quot>> [ call stop ] call-clear
124     ] 2 (throw) ;
125
126 DEFER: next
127
128 : no-runnable-threads ( -- * )
129     ! We should never be in a state where the only threads
130     ! are sleeping; the I/O wait thread is always runnable.
131     ! However, if it dies, we handle this case
132     ! semi-gracefully.
133     !
134     ! And if sleep-time outputs f, there are no sleeping
135     ! threads either... so WTF.
136     sleep-time [ die 0 ] unless* (sleep) next ;
137
138 : (next) ( arg thread -- * )
139     f >>state
140     dup set-self
141     dup runnable>> [
142         continuation>> box> continue-with
143     ] [
144         t >>runnable start
145     ] if ;
146
147 : next ( -- * )
148     expire-sleep-loop
149     run-queue dup dequeue-empty? [
150         drop no-runnable-threads
151     ] [
152         pop-back dup array? [ first2 ] [ f swap ] if (next)
153     ] if ;
154
155 PRIVATE>
156
157 : stop ( -- )
158     self [ exit-handler>> call ] [ unregister-thread ] bi next ;
159
160 : suspend ( quot state -- obj )
161     [
162         >r
163         >r self swap call
164         r> self (>>state)
165         r> self continuation>> >box
166         next
167     ] callcc1 2nip ; inline
168
169 : yield ( -- ) [ resume ] f suspend drop ;
170
171 GENERIC: sleep-until ( time/f -- )
172
173 M: integer sleep-until
174     [ schedule-sleep ] curry "sleep" suspend drop ;
175
176 M: f sleep-until
177     drop [ drop ] "interrupt" suspend drop ;
178
179 GENERIC: sleep ( dt -- )
180
181 M: real sleep
182     millis + >integer sleep-until ;
183
184 : interrupt ( thread -- )
185     dup state>> [
186         dup sleep-entry>> [ sleep-queue heap-delete ] when*
187         f >>sleep-entry
188         dup resume
189     ] when drop ;
190
191 : (spawn) ( thread -- )
192     [ register-thread ] [ namestack swap resume-with ] bi ;
193
194 : spawn ( quot name -- thread )
195     <thread> [ (spawn) ] keep ;
196
197 : spawn-server ( quot name -- thread )
198     >r [ loop ] curry r> spawn ;
199
200 : in-thread ( quot -- )
201     >r datastack r>
202     [ >r set-datastack r> call ] 2curry
203     "Thread" spawn drop ;
204
205 GENERIC: error-in-thread ( error thread -- )
206
207 <PRIVATE
208
209 : init-threads ( -- )
210     H{ } clone 64 setenv
211     <dlist> 65 setenv
212     <min-heap> 66 setenv
213     initial-thread global
214     [ drop f "Initial" <thread> ] cache
215     <box> >>continuation
216     t >>runnable
217     f >>state
218     dup register-thread
219     set-self ;
220
221 [ self error-in-thread stop ]
222 thread-error-hook set-global
223
224 PRIVATE>
225
226 [ init-threads ] "threads" add-init-hook