]> gitweb.factorcode.org Git - factor.git/blob - core/threads/threads.factor
Cleanup io.pipes and fix io.unix.pipes hang
[factor.git] / core / threads / threads.factor
1 ! Copyright (C) 2004, 2008 Slava Pestov.
2 ! Copyright (C) 2005 Mackenzie Straight.
3 ! See http://factorcode.org/license.txt for BSD license.
4 USING: arrays hashtables heaps kernel kernel.private math
5 namespaces sequences vectors continuations continuations.private
6 dlists assocs system combinators init boxes accessors
7 math.order ;
8 IN: threads
9
10 SYMBOL: initial-thread
11
12 TUPLE: thread
13 name quot exit-handler
14 id
15 continuation state
16 mailbox variables sleep-entry ;
17
18 : self ( -- thread ) 40 getenv ; inline
19
20 ! Thread-local storage
21 : tnamespace ( -- assoc )
22     self variables>> [ H{ } clone dup self (>>variables) ] unless* ;
23
24 : tget ( key -- value )
25     self variables>> at ;
26
27 : tset ( value key -- )
28     tnamespace set-at ;
29
30 : tchange ( key quot -- )
31     tnamespace swap change-at ; inline
32
33 : threads 41 getenv ;
34
35 : thread ( id -- thread ) threads at ;
36
37 : thread-registered? ( thread -- ? )
38     id>> threads key? ;
39
40 : check-unregistered
41     dup thread-registered?
42     [ "Thread already stopped" throw ] when ;
43
44 : check-registered
45     dup thread-registered?
46     [ "Thread is not running" throw ] unless ;
47
48 <PRIVATE
49
50 : register-thread ( thread -- )
51     check-unregistered dup id>> threads set-at ;
52
53 : unregister-thread ( thread -- )
54     check-registered id>> threads delete-at ;
55
56 : set-self ( thread -- ) 40 setenv ; inline
57
58 PRIVATE>
59
60 : new-thread ( quot name class -- thread )
61     new
62         swap >>name
63         swap >>quot
64         \ thread counter >>id
65         <box> >>continuation
66         [ ] >>exit-handler ; inline
67
68 : <thread> ( quot name -- thread )
69     \ thread new-thread ;
70
71 : run-queue 42 getenv ;
72
73 : sleep-queue 43 getenv ;
74
75 : resume ( thread -- )
76     f >>state
77     check-registered run-queue push-front ;
78
79 : resume-now ( thread -- )
80     f >>state
81     check-registered run-queue push-back ;
82
83 : resume-with ( obj thread -- )
84     f >>state
85     check-registered 2array run-queue push-front ;
86
87 : sleep-time ( -- ms/f )
88     {
89         { [ run-queue dlist-empty? not ] [ 0 ] }
90         { [ sleep-queue heap-empty? ] [ f ] }
91         [ sleep-queue heap-peek nip millis [-] ]
92     } cond ;
93
94 <PRIVATE
95
96 : schedule-sleep ( thread dt -- )
97     >r check-registered dup r> sleep-queue heap-push*
98     >>sleep-entry drop ;
99
100 : expire-sleep? ( heap -- ? )
101     dup heap-empty?
102     [ drop f ] [ heap-peek nip millis <= ] if ;
103
104 : expire-sleep ( thread -- )
105     f >>sleep-entry resume ;
106
107 : expire-sleep-loop ( -- )
108     sleep-queue
109     [ dup expire-sleep? ]
110     [ dup heap-pop drop expire-sleep ]
111     [ ] while
112     drop ;
113
114 : next ( -- * )
115     expire-sleep-loop
116     run-queue dup dlist-empty? [
117         ! We should never be in a state where the only threads
118         ! are sleeping; the I/O wait thread is always runnable.
119         ! However, if it dies, we handle this case
120         ! semi-gracefully.
121         !
122         ! And if sleep-time outputs f, there are no sleeping
123         ! threads either... so WTF.
124         drop sleep-time [ die 0 ] unless* (sleep) next
125     ] [
126         pop-back
127         dup array? [ first2 ] [ f swap ] if dup set-self
128         f >>state
129         continuation>> box>
130         continue-with
131     ] if ;
132
133 PRIVATE>
134
135 : stop ( -- )
136     self dup exit-handler>> call
137     unregister-thread next ;
138
139 : suspend ( quot state -- obj )
140     [
141         self continuation>> >box
142         self (>>state)
143         self swap call next
144     ] callcc1 2nip ; inline
145
146 : yield ( -- ) [ resume ] f suspend drop ;
147
148 GENERIC: sleep-until ( time/f -- )
149
150 M: integer sleep-until
151     [ schedule-sleep ] curry "sleep" suspend drop ;
152
153 M: f sleep-until
154     drop [ drop ] "interrupt" suspend drop ;
155
156 GENERIC: sleep ( dt -- )
157
158 M: real sleep
159     millis + >integer sleep-until ;
160
161 : interrupt ( thread -- )
162     dup state>> [
163         dup sleep-entry>> [ sleep-queue heap-delete ] when*
164         f >>sleep-entry
165         dup resume
166     ] when drop ;
167
168 : (spawn) ( thread -- )
169     [
170         resume-now [
171             dup set-self
172             dup register-thread
173             V{ } set-catchstack
174             { } set-retainstack
175             >r { } set-datastack r>
176             quot>> [ call stop ] call-clear
177         ] 1 (throw)
178     ] "spawn" suspend 2drop ;
179
180 : spawn ( quot name -- thread )
181     <thread> [ (spawn) ] keep ;
182
183 : spawn-server ( quot name -- thread )
184     >r [ [ ] [ ] while ] curry r> spawn ;
185
186 : in-thread ( quot -- )
187     >r datastack namestack r>
188     [ >r set-namestack set-datastack r> call ] 3curry
189     "Thread" spawn drop ;
190
191 GENERIC: error-in-thread ( error thread -- )
192
193 <PRIVATE
194
195 : init-threads ( -- )
196     H{ } clone 41 setenv
197     <dlist> 42 setenv
198     <min-heap> 43 setenv
199     initial-thread global
200     [ drop f "Initial" <thread> ] cache
201     <box> >>continuation
202     f >>state
203     dup register-thread
204     set-self ;
205
206 [ self error-in-thread stop ]
207 thread-error-hook set-global
208
209 PRIVATE>
210
211 [ init-threads ] "threads" add-init-hook