USING: io io.pipes io.streams.string io.encodings.utf8
io.encodings.binary
io.streams.duplex io.encodings io.timeouts namespaces
continuations tools.test kernel calendar destructors
accessors debugger math sequences threads
concurrency.count-downs fry ;
IN: io.pipes.tests

! Write a line into a pipe and read it back
{ "Hello" } [
    utf8 <pipe> [
        "Hello" print flush
        readln
    ] with-stream
] unit-test

! Test run-pipeline
{ { } } [ { } run-pipeline ] unit-test
{ { f } } [ { [ f ] } run-pipeline ] unit-test
{ { "Hello" } } [
    "Hello" [
        { [ input-stream [ utf8 <decoder> ] change readln ] } run-pipeline
    ] with-string-reader
] unit-test

{ { f "Hello" } } [
    {
        [ output-stream [ utf8 <encoder> ] change "Hello" print flush f ]
        [ input-stream [ utf8 <decoder> ] change readln ]
    } run-pipeline
] unit-test

! Test timeout
[
    utf8 <pipe> [
        1 seconds over set-timeout
        stream-readln
    ] with-disposal
] must-fail

! Test writing to a half-open pipe
{ } [
    1000 [
        utf8 <pipe> [
            [ in>> dispose ]
            [ out>> "hi" over stream-write dispose ]
            bi
        ] curry ignore-errors
    ] times
] unit-test

! Test non-blocking operation
{ } [
    [
        2 <count-down> "count-down" set

        utf8 <pipe> &dispose
        utf8 <pipe> &dispose
        [
            [
                '[
                    _ stream-read1 drop
                    "count-down" get count-down
                ] in-thread
            ] bi@

            ! Give the threads enough time to start blocking on
            ! read
            1 seconds sleep
        ]
        ! At this point, two threads are blocking on read
        [ [ "Hi" over stream-write stream-flush ] bi@ ]
        ! At this point, both threads should wake up
        2bi

        "count-down" get await
    ] with-destructors
] unit-test

! 0 read should not block
{ f } [
    [
        binary <pipe> &dispose
        in>>
        [ 0 read ] with-input-stream
    ] with-destructors
] unit-test