Debugging pipelines

db4
Slava Pestov 2008-05-05 03:15:24 -05:00
parent 8397b45110
commit 7c7a1f4974
8 changed files with 78 additions and 28 deletions

View File

@ -146,6 +146,16 @@ M: process set-timeout set-process-timeout ;
M: process timed-out kill-process ; M: process timed-out kill-process ;
M: object pipeline-element-quot
[
>process
swap >>stdout
swap >>stdin
run-detached
] curry ;
M: process wait-for-pipeline-element wait-for-process ;
: <process-reader*> ( process encoding -- process stream ) : <process-reader*> ( process encoding -- process stream )
[ [
>r (pipe) { >r (pipe) {

View File

@ -1,19 +1,26 @@
USING: io io.pipes io.streams.string io.encodings.utf8 USING: io io.pipes io.streams.string io.encodings.utf8
continuations tools.test kernel ; io.streams.duplex io.encodings namespaces continuations
tools.test kernel ;
IN: io.pipes.tests IN: io.pipes.tests
[ "Hello" ] [ [ "Hello" ] [
utf8 <pipe> "Hello" over stream-write dispose utf8 <pipe> [
dup stream-readln swap dispose "Hello" print flush
readln
] with-stream
] unit-test ] unit-test
[ { } ] [ { } utf8 with-pipes ] unit-test [ { } ] [ { } with-pipeline ] unit-test
[ { f } ] [ { [ f ] } utf8 with-pipes ] unit-test [ { f } ] [ { [ f ] } with-pipeline ] unit-test
[ { "Hello" } ] [ "Hello" [ { [ readln ] } utf8 with-pipes ] with-string-reader ] unit-test [ { "Hello" } ] [
"Hello" [
{ [ input-stream [ utf8 <decoder> ] change readln ] } with-pipeline
] with-string-reader
] unit-test
[ { f "Hello" } ] [ [ { f "Hello" } ] [
{ {
[ "Hello" print flush f ] [ output-stream [ utf8 <encoder> ] change "Hello" print flush f ]
[ readln ] [ input-stream [ utf8 <decoder> ] change readln ]
} utf8 with-pipes } with-pipeline
] unit-test ] unit-test

View File

@ -2,8 +2,8 @@
! See http://factorcode.org/license.txt for BSD license. ! See http://factorcode.org/license.txt for BSD license.
USING: io.encodings io.backend io.nonblocking io.streams.duplex USING: io.encodings io.backend io.nonblocking io.streams.duplex
io splitting sequences sequences.lib namespaces kernel io splitting sequences sequences.lib namespaces kernel
destructors math concurrency.combinators locals accessors destructors math concurrency.combinators accessors
arrays continuations ; arrays continuations quotations ;
IN: io.pipes IN: io.pipes
TUPLE: pipe in out ; TUPLE: pipe in out ;
@ -13,23 +13,24 @@ M: pipe dispose ( pipe -- )
HOOK: (pipe) io-backend ( -- pipe ) HOOK: (pipe) io-backend ( -- pipe )
:: <pipe> ( encoding -- input-stream output-stream ) : <pipe> ( encoding -- stream )
[ [
(pipe) >r (pipe)
[ add-error-destructor ] [ add-error-destructor ]
[ in>> <reader> encoding <decoder> ] [ in>> <reader> ]
[ out>> <writer> encoding <encoder> ] [ out>> <writer> ]
tri tri
r> <encoder-duplex>
] with-destructors ; ] with-destructors ;
:: with-fds ( input-fd output-fd quot encoding -- ) : with-fds ( input-fd output-fd quot -- )
input-fd [ <reader> encoding <decoder> dup add-always-destructor ] [ input-stream get ] if* [ >r >r [ <reader> dup add-always-destructor ] [ input-stream get ] if* r> r> [
output-fd [ <writer> encoding <encoder> dup add-always-destructor ] [ output-stream get ] if* >r [ <writer> dup add-always-destructor ] [ output-stream get ] if* r>
quot with-output-stream* with-output-stream*
] with-input-stream* ; inline ] 2curry with-input-stream* ; inline
: <pipes> ( n -- pipes ) : <pipes> ( n -- pipes )
[ (pipe) dup add-error-destructor ] replicate [ (pipe) dup add-always-destructor ] replicate
f f pipe boa [ prefix ] [ suffix ] bi f f pipe boa [ prefix ] [ suffix ] bi
2 <sliding-groups> ; 2 <sliding-groups> ;
@ -40,5 +41,16 @@ HOOK: (pipe) io-backend ( -- pipe )
[ call ] parallel-map [ call ] parallel-map
] with-destructors ; ] with-destructors ;
: with-pipes ( seq encoding -- results ) GENERIC: pipeline-element-quot ( obj -- quot )
[ [ with-fds ] 2curry ] curry map with-pipe-fds ;
M: callable pipeline-element-quot
[ with-fds ] curry ;
GENERIC: wait-for-pipeline-element ( obj -- result )
M: object wait-for-pipeline-element ;
: with-pipeline ( seq -- results )
[ pipeline-element-quot ] map
with-pipe-fds
[ wait-for-pipeline-element ] map ;

View File

@ -78,7 +78,8 @@ M: integer init-handle ( fd -- )
#! since on OS X 10.3, this operation fails from init-io #! since on OS X 10.3, this operation fails from init-io
#! when running the Factor.app (presumably because fd 0 and #! when running the Factor.app (presumably because fd 0 and
#! 1 are closed). #! 1 are closed).
F_SETFL O_NONBLOCK fcntl drop ; [ F_SETFL O_NONBLOCK fcntl drop ]
[ F_SETFD FD_CLOEXEC fcntl drop ] bi ;
M: integer close-handle ( fd -- ) M: integer close-handle ( fd -- )
close ; close ;

View File

@ -36,7 +36,8 @@ USE: unix
: reset-fd ( fd -- ) : reset-fd ( fd -- )
#! We drop the error code because on *BSD, fcntl of #! We drop the error code because on *BSD, fcntl of
#! /dev/null fails. #! /dev/null fails.
F_SETFL 0 fcntl drop ; [ F_SETFL 0 fcntl drop ]
[ F_SETFD 0 fcntl drop ] bi ;
: redirect-inherit ( obj mode fd -- ) : redirect-inherit ( obj mode fd -- )
2nip reset-fd ; 2nip reset-fd ;

View File

@ -0,0 +1,16 @@
USING: tools.test io.pipes io.unix.pipes io.encodings.utf8 io
namespaces sequences ;
IN: io.unix.pipes.tests
[ { 0 0 } ] [ { "ls" "grep x" } with-pipeline ] unit-test
! [ ] [
! {
! "ls"
! [
! input-stream [ utf8 <decoder> ] change
! input-stream get lines reverse [ print ] each f
! ]
! "grep x"
! } with-pipeline
! ] unit-test

View File

@ -1,11 +1,12 @@
! Copyright (C) 2008 Slava Pestov. ! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license. ! See http://factorcode.org/license.txt for BSD license.
USING: system alien.c-types kernel unix math sequences USING: system alien.c-types kernel unix math sequences
qualified io.unix.backend ; qualified io.unix.backend io.nonblocking ;
IN: io.unix.pipes IN: io.unix.pipes
QUALIFIED: io.pipes QUALIFIED: io.pipes
M: unix io.pipes:(pipe) ( -- pair ) M: unix io.pipes:(pipe) ( -- pair )
2 "int" <c-array> 2 "int" <c-array>
dup pipe io-error dup pipe io-error
2 c-int-array> first2 io.pipes:pipe boa ; 2 c-int-array> first2
[ [ init-handle ] bi@ ] [ io.pipes:pipe boa ] 2bi ;

View File

@ -21,7 +21,9 @@ IN: unix
: SO_SNDTIMEO HEX: 1005 ; inline : SO_SNDTIMEO HEX: 1005 ; inline
: SO_RCVTIMEO HEX: 1006 ; inline : SO_RCVTIMEO HEX: 1006 ; inline
: F_SETFD 2 ; inline
: F_SETFL 4 ; inline : F_SETFL 4 ; inline
: FD_CLOEXEC 1 ; inline
: O_NONBLOCK 4 ; inline : O_NONBLOCK 4 ; inline
C-STRUCT: sockaddr-in C-STRUCT: sockaddr-in