]> gitweb.factorcode.org Git - factor.git/blob - core/threads/threads.factor
Thread refactoring work in progress
[factor.git] / core / threads / threads.factor
1 ! Copyright (C) 2004, 2008 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 IN: threads
5 USING: arrays hashtables heaps kernel kernel.private math
6 namespaces sequences vectors continuations continuations.private
7 dlists assocs system combinators init boxes accessors ;
8
9 SYMBOL: initial-thread
10
11 TUPLE: thread
12 name quot exit-handler
13 id
14 continuation state
15 mailbox variables sleep-entry ;
16
17 : self ( -- thread ) 40 getenv ; inline
18
19 ! Thread-local storage
20 : tnamespace ( -- assoc )
21     self variables>> [ H{ } clone dup self (>>variables) ] unless* ;
22
23 : tget ( key -- value )
24     self variables>> at ;
25
26 : tset ( value key -- )
27     tnamespace set-at ;
28
29 : tchange ( key quot -- )
30     tnamespace swap change-at ; inline
31
32 : threads 41 getenv ;
33
34 : thread ( id -- thread ) threads at ;
35
36 : thread-registered? ( thread -- ? )
37     id>> threads key? ;
38
39 : check-unregistered
40     dup thread-registered?
41     [ "Thread already stopped" throw ] when ;
42
43 : check-registered
44     dup thread-registered?
45     [ "Thread is not running" throw ] unless ;
46
47 <PRIVATE
48
49 : register-thread ( thread -- )
50     check-unregistered dup id>> threads set-at ;
51
52 : unregister-thread ( thread -- )
53     check-registered id>> threads delete-at ;
54
55 : set-self ( thread -- ) 40 setenv ; inline
56
57 PRIVATE>
58
59 : new-thread ( quot name class -- thread )
60     new
61         swap >>name
62         swap >>quot
63         \ thread counter >>id
64         <box> >>continuation
65         [ ] >>exit-handler ; inline
66
67 : <thread> ( quot name -- thread )
68     \ thread new-thread ;
69
70 : run-queue 42 getenv ;
71
72 : sleep-queue 43 getenv ;
73
74 : resume ( thread -- )
75     f >>state
76     check-registered run-queue push-front ;
77
78 : resume-now ( thread -- )
79     f >>state
80     check-registered run-queue push-back ;
81
82 : resume-with ( obj thread -- )
83     f >>state
84     check-registered 2array run-queue push-front ;
85
86 : sleep-time ( -- ms/f )
87     {
88         { [ run-queue dlist-empty? not ] [ 0 ] }
89         { [ sleep-queue heap-empty? ] [ f ] }
90         [ sleep-queue heap-peek nip millis [-] ]
91     } cond ;
92
93 DEFER: stop
94
95 <PRIVATE
96
97 : schedule-sleep ( thread ms -- )
98     >r check-registered dup r> sleep-queue heap-push*
99     >>sleep-entry drop ;
100
101 : expire-sleep? ( heap -- ? )
102     dup heap-empty?
103     [ drop f ] [ heap-peek nip millis <= ] if ;
104
105 : expire-sleep ( thread -- )
106     f >>sleep-entry resume ;
107
108 : expire-sleep-loop ( -- )
109     sleep-queue
110     [ dup expire-sleep? ]
111     [ dup heap-pop drop expire-sleep ]
112     [ ] while
113     drop ;
114
115 : start ( namestack thread -- )
116     [
117         set-self
118         set-namestack
119         V{ } set-catchstack
120         { } set-retainstack
121         { } set-datastack
122         self quot>> [ call stop ] call-clear
123     ] 2 (throw) ;
124
125 DEFER: next
126
127 : no-runnable-threads ( -- * )
128     ! We should never be in a state where the only threads
129     ! are sleeping; the I/O wait thread is always runnable.
130     ! However, if it dies, we handle this case
131     ! semi-gracefully.
132     !
133     ! And if sleep-time outputs f, there are no sleeping
134     ! threads either... so WTF.
135     sleep-time [ die 0 ] unless* (sleep) next ;
136
137 : (next) ( arg thread -- * )
138     f >>state
139     dup set-self
140     dup continuation>> ?box
141     [ nip continue-with ] [ drop start ] if ;
142
143 : next ( -- * )
144     expire-sleep-loop
145     run-queue dup dlist-empty? [
146         drop no-runnable-threads
147     ] [
148         pop-back dup array? [ first2 ] [ f swap ] if (next)
149     ] if ;
150
151 PRIVATE>
152
153 : stop ( -- )
154     self [ exit-handler>> call ] [ unregister-thread ] bi next ;
155
156 : suspend ( quot state -- obj )
157     [
158         >r
159         >r self swap call
160         r> self (>>state)
161         r> self continuation>> >box
162         next
163     ] callcc1 2nip ; inline
164
165 : yield ( -- ) [ resume ] f suspend drop ;
166
167 GENERIC: sleep-until ( time/f -- )
168
169 M: integer sleep-until
170     [ schedule-sleep ] curry "sleep" suspend drop ;
171
172 M: f sleep-until
173     drop [ drop ] "interrupt" suspend drop ;
174
175 GENERIC: sleep ( ms -- )
176
177 M: real sleep
178     millis + >integer sleep-until ;
179
180 : interrupt ( thread -- )
181     dup state>> [
182         dup sleep-entry>> [ sleep-queue heap-delete ] when*
183         f >>sleep-entry
184         dup resume
185     ] when drop ;
186
187 : (spawn) ( thread -- )
188     [ register-thread ] [ namestack swap resume-with ] bi ;
189
190 : spawn ( quot name -- thread )
191     <thread> [ (spawn) ] keep ;
192
193 : spawn-server ( quot name -- thread )
194     >r [ [ ] [ ] while ] curry r> spawn ;
195
196 : in-thread ( quot -- )
197     >r datastack r>
198     [ >r set-datastack r> call ] 2curry
199     "Thread" spawn drop ;
200
201 GENERIC: error-in-thread ( error thread -- )
202
203 <PRIVATE
204
205 : init-threads ( -- )
206     H{ } clone 41 setenv
207     <dlist> 42 setenv
208     <min-heap> 43 setenv
209     initial-thread global
210     [ drop f "Initial" <thread> ] cache
211     <box> >>continuation
212     f >>state
213     dup register-thread
214     set-self ;
215
216 [ self error-in-thread stop ]
217 thread-error-hook set-global
218
219 PRIVATE>
220
221 [ init-threads ] "threads" add-init-hook