! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
-USING: io io.backend io.timeouts io.pipes system kernel
-namespaces strings hashtables sequences assocs combinators
-vocabs.loader init threads continuations math io.encodings
-io.streams.duplex io.nonblocking io.streams.duplex accessors
-concurrency.flags destructors ;
+USING: system kernel namespaces strings hashtables sequences
+assocs combinators vocabs.loader init threads continuations
+math accessors concurrency.flags destructors
+io io.backend io.timeouts io.pipes io.pipes.private io.encodings
+io.streams.duplex io.nonblocking ;
IN: io.launcher
TUPLE: process < identity-tuple
M: process timed-out kill-process ;
-M: object pipeline-element-quot
- [
- >process
- swap >>stdout
- swap >>stdin
- run-detached
- ] curry ;
-
-M: process wait-for-pipeline-element wait-for-process ;
+M: object run-pipeline-element
+ [ >process swap >>stdout swap >>stdin run-detached ]
+ [ drop [ [ close-handle ] when* ] bi@ ]
+ 3bi
+ wait-for-process ;
: <process-reader*> ( process encoding -- process stream )
[
r> <encoder-duplex>
] with-destructors ;
-: with-fds ( input-fd output-fd quot -- )
- >r >r [ <reader> dup add-always-destructor ] [ input-stream get ] if* r> r> [
- >r [ <writer> dup add-always-destructor ] [ output-stream get ] if* r>
- with-output-stream*
- ] 2curry with-input-stream* ; inline
+<PRIVATE
-: <pipes> ( n -- pipes )
- [ (pipe) dup add-always-destructor ] replicate
- f f pipe boa [ prefix ] [ suffix ] bi
- 2 <clumps> ;
+: ?reader [ <reader> dup add-always-destructor ] [ input-stream get ] if* ;
+: ?writer [ <writer> dup add-always-destructor ] [ output-stream get ] if* ;
+
+GENERIC: run-pipeline-element ( input-fd output-fd obj -- quot )
-: with-pipe-fds ( seq -- results )
+M: callable run-pipeline-element
[
- [ length dup zero? [ drop { } ] [ 1- <pipes> ] if ] keep
- [ >r [ first in>> ] [ second out>> ] bi r> 2curry ] 2map
- [ call ] parallel-map
+ >r [ ?reader ] [ ?writer ] bi*
+ r> with-streams*
] with-destructors ;
-GENERIC: pipeline-element-quot ( obj -- quot )
-
-M: callable pipeline-element-quot
- [ with-fds ] curry ;
-
-GENERIC: wait-for-pipeline-element ( obj -- result )
+: <pipes> ( n -- pipes )
+ [
+ [ (pipe) dup add-error-destructor ] replicate
+ T{ pipe } [ prefix ] [ suffix ] bi
+ 2 <clumps>
+ ] with-destructors ;
-M: object wait-for-pipeline-element ;
+PRIVATE>
: run-pipeline ( seq -- results )
- [ pipeline-element-quot ] map
- with-pipe-fds
- [ wait-for-pipeline-element ] map ;
+ [ length dup zero? [ drop { } ] [ 1- <pipes> ] if ] keep
+ [
+ >r [ first in>> ] [ second out>> ] bi
+ r> run-pipeline-element
+ ] 2parallel-map ;