Cleanup io.pipes and fix io.unix.pipes hang

db4
Slava Pestov 2008-05-06 21:23:18 -05:00
parent 5666cd78b9
commit 90299783d6
4 changed files with 32 additions and 38 deletions

View File

@ -1,10 +1,10 @@
! Copyright (C) 2008 Slava Pestov. ! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license. ! See http://factorcode.org/license.txt for BSD license.
USING: io io.backend io.timeouts io.pipes system kernel USING: system kernel namespaces strings hashtables sequences
namespaces strings hashtables sequences assocs combinators assocs combinators vocabs.loader init threads continuations
vocabs.loader init threads continuations math io.encodings math accessors concurrency.flags destructors
io.streams.duplex io.nonblocking io.streams.duplex accessors io io.backend io.timeouts io.pipes io.pipes.private io.encodings
concurrency.flags destructors ; io.streams.duplex io.nonblocking ;
IN: io.launcher IN: io.launcher
TUPLE: process < identity-tuple TUPLE: process < identity-tuple
@ -149,15 +149,11 @@ M: process set-timeout set-process-timeout ;
M: process timed-out kill-process ; M: process timed-out kill-process ;
M: object pipeline-element-quot M: object run-pipeline-element
[ [ >process swap >>stdout swap >>stdin run-detached ]
>process [ drop [ [ close-handle ] when* ] bi@ ]
swap >>stdout 3bi
swap >>stdin wait-for-process ;
run-detached
] curry ;
M: process wait-for-pipeline-element wait-for-process ;
: <process-reader*> ( process encoding -- process stream ) : <process-reader*> ( process encoding -- process stream )
[ [

View File

@ -23,34 +23,31 @@ HOOK: (pipe) io-backend ( -- pipe )
r> <encoder-duplex> r> <encoder-duplex>
] with-destructors ; ] with-destructors ;
: with-fds ( input-fd output-fd quot -- ) <PRIVATE
>r >r [ <reader> dup add-always-destructor ] [ input-stream get ] if* r> r> [
>r [ <writer> dup add-always-destructor ] [ output-stream get ] if* r>
with-output-stream*
] 2curry with-input-stream* ; inline
: <pipes> ( n -- pipes ) : ?reader [ <reader> dup add-always-destructor ] [ input-stream get ] if* ;
[ (pipe) dup add-always-destructor ] replicate : ?writer [ <writer> dup add-always-destructor ] [ output-stream get ] if* ;
f f pipe boa [ prefix ] [ suffix ] bi
2 <clumps> ;
: with-pipe-fds ( seq -- results ) GENERIC: run-pipeline-element ( input-fd output-fd obj -- quot )
M: callable run-pipeline-element
[ [
[ length dup zero? [ drop { } ] [ 1- <pipes> ] if ] keep >r [ ?reader ] [ ?writer ] bi*
[ >r [ first in>> ] [ second out>> ] bi r> 2curry ] 2map r> with-streams*
[ call ] parallel-map
] with-destructors ; ] with-destructors ;
GENERIC: pipeline-element-quot ( obj -- quot ) : <pipes> ( n -- pipes )
[
[ (pipe) dup add-error-destructor ] replicate
T{ pipe } [ prefix ] [ suffix ] bi
2 <clumps>
] with-destructors ;
M: callable pipeline-element-quot PRIVATE>
[ with-fds ] curry ;
GENERIC: wait-for-pipeline-element ( obj -- result )
M: object wait-for-pipeline-element ;
: run-pipeline ( seq -- results ) : run-pipeline ( seq -- results )
[ pipeline-element-quot ] map [ length dup zero? [ drop { } ] [ 1- <pipes> ] if ] keep
with-pipe-fds [
[ wait-for-pipeline-element ] map ; >r [ first in>> ] [ second out>> ] bi
r> run-pipeline-element
] 2parallel-map ;

View File

@ -99,7 +99,7 @@ accessors kernel sequences io.encodings.utf8 ;
utf8 file-contents utf8 file-contents
] unit-test ] unit-test
[ ] [ "append-test" temp-file delete-file ] unit-test [ "append-test" temp-file delete-file ] ignore-errors
[ "hi\nhi\n" ] [ [ "hi\nhi\n" ] [
2 [ 2 [

View File

@ -9,6 +9,7 @@ IN: io.unix.pipes.tests
"ls" "ls"
[ [
input-stream [ utf8 <decoder> ] change input-stream [ utf8 <decoder> ] change
output-stream [ utf8 <encoder> ] change
input-stream get lines reverse [ print ] each f input-stream get lines reverse [ print ] each f
] ]
"grep x" "grep x"