factor/extra/io/nonblocking/nonblocking.factor

224 lines
5.5 KiB
Factor
Raw Normal View History

! Copyright (C) 2005, 2008 Slava Pestov, Doug Coleman
2007-09-20 18:09:08 -04:00
! See http://factorcode.org/license.txt for BSD license.
IN: io.nonblocking
USING: math kernel io sequences io.buffers generic sbufs system
io.streams.lines io.streams.plain io.streams.duplex io.backend
continuations debugger classes byte-arrays namespaces splitting
2008-01-31 02:16:10 -05:00
dlists assocs ;
2007-09-20 18:09:08 -04:00
2007-11-07 14:01:45 -05:00
SYMBOL: default-buffer-size
64 1024 * default-buffer-size set-global
2007-09-20 18:09:08 -04:00
! Common delegate of native stream readers and writers
TUPLE: port
handle
error
timeout-entry timeout cutoff
type eof? ;
2007-09-20 18:09:08 -04:00
SYMBOL: closed
2008-01-21 15:33:43 -05:00
PREDICATE: port input-port port-type input-port eq? ;
PREDICATE: port output-port port-type output-port eq? ;
2007-09-20 18:09:08 -04:00
GENERIC: init-handle ( handle -- )
2007-11-07 14:01:45 -05:00
GENERIC: close-handle ( handle -- )
2007-09-20 18:09:08 -04:00
2008-01-21 15:33:43 -05:00
: <port> ( handle buffer type -- port )
pick init-handle
2007-09-20 18:09:08 -04:00
0 0 {
set-port-handle
set-delegate
2008-01-21 15:33:43 -05:00
set-port-type
2007-09-20 18:09:08 -04:00
set-port-timeout
set-port-cutoff
} port construct ;
2008-01-21 15:33:43 -05:00
: <buffered-port> ( handle type -- port )
default-buffer-size get <buffer> swap <port> ;
2007-09-20 18:09:08 -04:00
: <reader> ( handle -- stream )
2008-01-21 15:33:43 -05:00
input-port <buffered-port> <line-reader> ;
2007-09-20 18:09:08 -04:00
: <writer> ( handle -- stream )
2008-01-21 15:33:43 -05:00
output-port <buffered-port> <plain-writer> ;
2007-09-20 18:09:08 -04:00
: handle>duplex-stream ( in-handle out-handle -- stream )
<writer>
[ >r <reader> r> <duplex-stream> ] [ ] [ dispose ]
2007-09-20 18:09:08 -04:00
cleanup ;
: timeout? ( port -- ? )
port-cutoff dup zero? not swap millis < and ;
: pending-error ( port -- )
dup port-error f rot set-port-error [ throw ] when* ;
SYMBOL: timeout-queue
2008-01-31 02:16:10 -05:00
timeout-queue global [ [ <dlist> ] unless* ] change-at
: unqueue-timeout ( port -- )
port-timeout-entry [
timeout-queue get-global swap delete-node
] when* ;
: queue-timeout ( port -- )
dup timeout-queue get-global push-front*
swap set-port-timeout-entry ;
HOOK: expire-port io-backend ( port -- )
M: object expire-port drop ;
: expire-timeouts ( -- )
timeout-queue get-global dup dlist-empty? [ drop ] [
dup peek-back timeout?
[ pop-back expire-port expire-timeouts ] [ drop ] if
] if ;
: touch-port ( port -- )
dup port-timeout dup zero? [
2drop
] [
millis + over set-port-cutoff
dup unqueue-timeout queue-timeout
] if ;
2007-09-20 18:09:08 -04:00
M: port set-timeout
[ set-port-timeout ] keep touch-port ;
GENERIC: (wait-to-read) ( port -- )
: wait-to-read ( count port -- )
tuck buffer-length > [ (wait-to-read) ] [ drop ] if ;
: wait-to-read1 ( port -- )
1 swap wait-to-read ;
: unless-eof ( port quot -- value )
>r dup buffer-empty? over port-eof? and
[ f swap set-port-eof? f ] r> if ; inline
M: input-port stream-read1
dup wait-to-read1 [ buffer-pop ] unless-eof ;
: read-step ( count port -- string/f )
[ wait-to-read ] 2keep
[ dupd buffer> ] unless-eof nip ;
: read-loop ( count port sbuf -- )
pick over length - dup 0 > [
pick read-step dup [
over push-all read-loop
] [
2drop 2drop
] if
] [
2drop 2drop
] if ;
M: input-port stream-read
>r 0 max >fixnum r>
2dup read-step dup [
pick over length > [
pick <sbuf>
[ push-all ] keep
[ read-loop ] keep
"" like
] [
2nip
] if
] [
2nip
] if ;
: read-until-step ( separators port -- string/f separator/f )
dup wait-to-read1
dup port-eof? [
f swap set-port-eof? drop f f
] [
buffer-until
] if ;
: read-until-loop ( seps port sbuf -- separator/f )
2008-01-11 17:02:44 -05:00
2over read-until-step over [
2007-09-20 18:09:08 -04:00
>r over push-all r> dup [
>r 3drop r>
] [
drop read-until-loop
] if
] [
>r 2drop 2drop r>
] if ;
M: input-port stream-read-until ( seps port -- str/f sep/f )
2dup read-until-step dup [
>r 2nip r>
] [
over [
drop >sbuf [ read-until-loop ] keep "" like swap
] [
>r 2nip r>
] if
] if ;
M: input-port stream-read-partial ( max stream -- string/f )
>r 0 max >fixnum r> read-step ;
: can-write? ( len writer -- ? )
2007-11-09 03:01:45 -05:00
[ buffer-fill + ] keep buffer-capacity <= ;
2007-09-20 18:09:08 -04:00
: wait-to-write ( len port -- )
tuck can-write? [ drop ] [ stream-flush ] if ;
M: output-port stream-write1
1 over wait-to-write ch>buffer ;
M: output-port stream-write
2007-11-09 03:01:45 -05:00
over length over buffer-size > [
[ buffer-size <groups> ] keep
[ stream-write ] curry each
] [
over length over wait-to-write >buffer
] if ;
2007-09-20 18:09:08 -04:00
2007-11-07 14:01:45 -05:00
GENERIC: port-flush ( port -- )
M: output-port stream-flush ( port -- )
dup port-flush pending-error ;
M: port dispose
2007-11-07 14:01:45 -05:00
dup port-type closed eq? [
dup port-type >r closed over set-port-type r>
2008-01-21 17:29:54 -05:00
output-port eq? [ dup port-flush ] when
2007-11-07 14:01:45 -05:00
dup port-handle close-handle
dup delegate [ buffer-free ] when*
f over set-delegate
] unless drop ;
2007-09-20 18:09:08 -04:00
TUPLE: server-port addr client ;
2008-01-21 15:33:43 -05:00
: <server-port> ( handle addr -- server )
>r f server-port <port> r>
2007-09-20 18:09:08 -04:00
{ set-delegate set-server-port-addr }
server-port construct ;
: check-server-port ( port -- )
port-type server-port assert= ;
TUPLE: datagram-port addr packet packet-addr ;
2008-01-21 15:33:43 -05:00
: <datagram-port> ( handle addr -- datagram )
>r f datagram-port <port> r>
2007-09-20 18:09:08 -04:00
{ set-delegate set-datagram-port-addr }
datagram-port construct ;
: check-datagram-port ( port -- )
port-type datagram-port assert= ;
: check-datagram-send ( packet addrspec port -- )
dup check-datagram-port
datagram-port-addr [ class ] 2apply assert=
class byte-array assert= ;