GENERIC: stream-read ( count stream -- string )
GENERIC: stream-write-attr ( string style stream -- )
GENERIC: stream-close ( stream -- )
+GENERIC: set-timeout ( timeout stream -- )
: stream-read1 ( stream -- char/f )
1 swap stream-read dup empty? [ drop f ] [ 0 swap nth ] ifte ;
M: duplex-stream stream-close
duplex-stream-out stream-close ;
+M: duplex-stream set-timeout
+ 2dup
+ duplex-stream-in set-timeout
+ duplex-stream-out set-timeout ;
+
! Reading lines and counting line numbers.
SYMBOL: line-number
SYMBOL: parser-stream
F_SETFL O_NONBLOCK fcntl io-error ;
! Common delegate of native stream readers and writers
-TUPLE: port handle buffer error ;
+TUPLE: port handle buffer error timeout cutoff ;
+
+: make-buffer ( n -- buffer/f )
+ dup 0 > [ <buffer> ] [ drop f ] ifte ;
C: port ( handle buffer -- port )
- [
- >r dup 0 > [ <buffer> ] [ drop f ] ifte r> set-delegate
- ] keep
+ [ 0 swap set-port-timeout ] keep
+ [ 0 swap set-port-cutoff ] keep
+ [ >r make-buffer r> set-delegate ] keep
[ >r dup init-handle r> set-port-handle ] keep ;
M: port stream-close ( port -- )
dup port-handle close
delegate [ buffer-free ] when* ;
+: touch-port ( port -- )
+ dup port-timeout dup 0 = [
+ 2drop
+ ] [
+ millis + swap set-port-cutoff
+ ] ifte ;
+
+M: port set-timeout ( timeout port -- )
+ [ set-port-timeout ] keep touch-port ;
+
: buffered-port 8192 <port> ;
: >port< dup port-handle swap delegate ;
! this with the hash-size call.
SYMBOL: io-tasks
+: io-task ( pollfd -- io-task ) pollfd-fd io-tasks get hash ;
+
: io-task-fd io-task-port port-handle ;
: add-io-task ( callback task -- )
drop swap remove-io-task
] ifte ;
-: handle-fd ( fd -- quot )
- io-tasks get hash dup do-io-task [
- pop-callback
+: handle-fd ( pollfd -- quot )
+ io-task dup do-io-task [
+ dup io-task-port touch-port pop-callback
] [
drop f
] ifte ;
+: timeout? ( port -- ? )
+ port-cutoff dup 0 = not swap millis < and ;
+
+: handle-fd? ( pollfd -- ? )
+ dup pollfd-revents 0 = not >r
+ io-task io-task-port timeout? r> or ;
+
: do-io-tasks ( pollfds n -- )
[
- dup pick pollfd-nth dup pollfd-revents 0 = [
- drop
+ dup pick pollfd-nth dup handle-fd? [
+ handle-fd [ call ] when*
] [
- pollfd-fd handle-fd [ call ] when*
+ drop
] ifte
] repeat drop ;
+: io-task# io-tasks get hash-size ;
+
+: io-task-list io-tasks get hash-values ;
+
: init-pollfd ( task pollfd -- )
over io-task-fd over set-pollfd-fd
swap io-task-events swap set-pollfd-events ;
: make-pollfds ( -- pollfds n )
- io-tasks get dup hash-size [
- swap >r <pollfd-array> 0 swap r> hash-values [
- ( n pollfds iotask )
- pick pick pollfd-nth init-pollfd >r 1 + r>
- ] each nip
+ io-task# [
+ <pollfd-array> 0 io-task-list [
+ pick pick swap pollfd-nth init-pollfd 1 +
+ ] each drop
] keep ;
: io-multiplex ( timeout -- )