I/O cleanups

db4
Slava Pestov 2008-01-21 15:33:43 -05:00
parent feb4e8df9e
commit c1963dd4ab
10 changed files with 62 additions and 72 deletions

View File

@ -55,7 +55,7 @@ HELP: init-handle
{ $contract "Prepares a native handle for use by the port; called by " { $link <port> } "." } ;
HELP: <port>
{ $values { "handle" "a native handle identifying an I/O resource" } { "buffer" "a " { $link buffer } " or " { $link f } } { "port" "a new " { $link port } } }
{ $values { "handle" "a native handle identifying an I/O resource" } { "buffer" "a " { $link buffer } " or " { $link f } } { "type" symbol } { "port" "a new " { $link port } } }
{ $description "Creates a new " { $link port } " using the specified native handle and I/O buffer." }
$low-level-note ;

View File

@ -12,38 +12,36 @@ SYMBOL: default-buffer-size
! Common delegate of native stream readers and writers
TUPLE: port handle error timeout cutoff type eof? ;
SYMBOL: input
SYMBOL: output
SYMBOL: closed
PREDICATE: port input-port port-type input eq? ;
PREDICATE: port output-port port-type output eq? ;
PREDICATE: port input-port port-type input-port eq? ;
PREDICATE: port output-port port-type output-port eq? ;
GENERIC: init-handle ( handle -- )
GENERIC: close-handle ( handle -- )
: <port> ( handle buffer -- port )
over init-handle
: <port> ( handle buffer type -- port )
pick init-handle
0 0 {
set-port-handle
set-delegate
set-port-type
set-port-timeout
set-port-cutoff
} port construct ;
: <buffered-port> ( handle -- port )
default-buffer-size get <buffer> <port> ;
: <buffered-port> ( handle type -- port )
default-buffer-size get <buffer> swap <port> ;
: <reader> ( handle -- stream )
<buffered-port> input over set-port-type <line-reader> ;
input-port <buffered-port> <line-reader> ;
: <writer> ( handle -- stream )
<buffered-port> output over set-port-type <plain-writer> ;
output-port <buffered-port> <plain-writer> ;
: handle>duplex-stream ( in-handle out-handle -- stream )
<writer>
[ >r <reader> r> <duplex-stream> ]
[ ] [ stream-close ]
[ >r <reader> r> <duplex-stream> ] [ ] [ stream-close ]
cleanup ;
: touch-port ( port -- )
@ -170,8 +168,8 @@ M: port stream-close
TUPLE: server-port addr client ;
: <server-port> ( port addr -- server )
server-port pick set-port-type
: <server-port> ( handle addr -- server )
>r f server-port <port> r>
{ set-delegate set-server-port-addr }
server-port construct ;
@ -180,8 +178,8 @@ TUPLE: server-port addr client ;
TUPLE: datagram-port addr packet packet-addr ;
: <datagram-port> ( port addr -- datagram )
datagram-port pick set-port-type
: <datagram-port> ( handle addr -- datagram )
>r f datagram-port <port> r>
{ set-delegate set-datagram-port-addr }
datagram-port construct ;

View File

@ -83,7 +83,7 @@ M: unix-io <sniffer> ( obj -- sniffer )
] keep
dupd sniffer-spec-ifname ioctl-sniffer-fd
dup make-ioctl-buffer
<port> input over set-port-type <line-reader>
input-port <port> <line-reader>
\ sniffer construct-delegate
] with-destructors ;

View File

@ -18,19 +18,33 @@ TUPLE: io-task port callbacks ;
>r 1vector io-task construct-boa r> construct-delegate ;
inline
TUPLE: input-task ;
: <input-task> ( port continuation class -- task )
>r input-task <io-task> r> construct-delegate ; inline
TUPLE: output-task ;
: <output-task> ( port continuation class -- task )
>r output-task <io-task> r> construct-delegate ; inline
GENERIC: do-io-task ( task -- ? )
GENERIC: io-task-container ( mx task -- hashtable )
! I/O multiplexers
TUPLE: mx fd reads writes ;
M: input-task io-task-container drop mx-reads ;
M: output-task io-task-container drop mx-writes ;
: <mx> ( -- mx ) f H{ } clone H{ } clone mx construct-boa ;
: construct-mx ( class -- obj ) <mx> swap construct-delegate ;
GENERIC: register-io-task ( task mx -- )
GENERIC: unregister-io-task ( task mx -- )
GENERIC: unix-io-multiplex ( ms mx -- )
GENERIC: wait-for-events ( ms mx -- )
: fd/container ( task mx -- task fd container )
over io-task-container >r dup io-task-fd r> ; inline
@ -112,14 +126,12 @@ M: integer close-handle ( fd -- )
TUPLE: read-task ;
: <read-task> ( port continuation -- task )
read-task <io-task> ;
read-task <input-task> ;
M: read-task do-io-task
io-task-port dup refill
[ [ reader-eof ] [ drop ] if ] keep ;
M: read-task io-task-container drop mx-reads ;
M: input-port (wait-to-read)
[ <read-task> add-io-task stop ] callcc0 pending-error ;
@ -131,14 +143,12 @@ M: input-port (wait-to-read)
TUPLE: write-task ;
: <write-task> ( port continuation -- task )
write-task <io-task> ;
write-task <output-task> ;
M: write-task do-io-task
io-task-port dup buffer-empty? over port-error or
[ 0 swap buffer-reset t ] [ write-step ] if ;
M: write-task io-task-container drop mx-writes ;
: add-write-io-task ( port continuation -- )
over port-handle mx get-global mx-writes at*
[ io-task-callbacks push drop ]
@ -151,7 +161,7 @@ M: port port-flush ( port -- )
dup buffer-empty? [ drop ] [ (wait-to-write) ] if ;
M: unix-io io-multiplex ( ms -- )
mx get-global unix-io-multiplex ;
mx get-global wait-for-events ;
M: unix-io init-stdio ( -- )
0 1 handle>duplex-stream io:stdio set-global
@ -161,8 +171,7 @@ M: unix-io init-stdio ( -- )
TUPLE: mx-port mx ;
: <mx-port> ( mx -- port )
dup mx-fd f <port>
mx-port over set-port-type
dup mx-fd f mx-port <port>
{ set-mx-port-mx set-delegate } mx-port construct ;
TUPLE: mx-task ;
@ -171,7 +180,7 @@ TUPLE: mx-task ;
f io-task construct-boa mx-task construct-delegate ;
M: mx-task do-io-task
io-task-port mx-port-mx 0 swap unix-io-multiplex f ;
io-task-port mx-port-mx 0 swap wait-for-events f ;
: multiplexer-error ( n -- )
0 < [ err_no ignorable-error? [ (io-error) ] unless ] when ;

View File

@ -16,19 +16,15 @@ TUPLE: epoll-mx events ;
max-events epoll_create dup io-error over set-mx-fd
max-events "epoll-event" <c-array> over set-epoll-mx-events ;
: io-task-filter ( task -- n )
class {
{ read-task [ EVFILT_READ ] }
{ accept-task [ EVFILT_READ ] }
{ receive-task [ EVFILT_READ ] }
{ write-task [ EVFILT_WRITE ] }
{ connect-task [ EVFILT_WRITE ] }
{ send-task [ EVFILT_WRITE ] }
} case ;
GENERIC: io-task-events ( task -- n )
M: input-task drop EPOLLIN ;
M: output-task drop EPOLLOUT ;
: make-event ( task -- event )
"epoll-event" <c-object>
tuck set-epoll-event-events
over io-task-events over set-epoll-event-events
over io-task-fd over set-epoll-fd ;
: do-epoll-ctl ( task mx what -- )
@ -57,5 +53,5 @@ M: epoll-mx unregister-io-task ( task mx -- )
: handle-events ( mx n -- )
[ over epoll-mx-events kevent-nth handle-kevent ] with each ;
M: epoll-mx unix-io-multiplex ( ms mx -- )
M: epoll-mx wait-for-events ( ms mx -- )
dup rot wait-kevent handle-kevents ;

View File

@ -1,8 +1,8 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: alien.c-types kernel io.nonblocking io.unix.backend
io.unix.sockets sequences assocs unix unix.kqueue unix.process
math namespaces classes combinators threads vectors ;
sequences assocs unix unix.kqueue unix.process math namespaces
combinators threads vectors ;
IN: io.unix.kqueue
TUPLE: kqueue-mx events processes ;
@ -18,15 +18,11 @@ TUPLE: kqueue-mx events processes ;
H{ } clone over set-kqueue-mx-processes
max-events "kevent" <c-array> over set-kqueue-mx-events ;
: io-task-filter ( task -- n )
class {
{ read-task [ EVFILT_READ ] }
{ accept-task [ EVFILT_READ ] }
{ receive-task [ EVFILT_READ ] }
{ write-task [ EVFILT_WRITE ] }
{ connect-task [ EVFILT_WRITE ] }
{ send-task [ EVFILT_WRITE ] }
} case ;
GENERIC: io-task-filter ( task -- n )
M: input-task io-task-filter drop EVFILT_READ ;
M: output-task io-task-filter drop EVFILT_WRITE ;
: make-kevent ( task flags -- event )
"kevent" <c-object>
@ -70,7 +66,7 @@ M: kqueue-mx unregister-io-task ( task mx -- )
: handle-kevents ( mx n -- )
[ over kqueue-mx-events kevent-nth handle-kevent ] with each ;
M: kqueue-mx unix-io-multiplex ( ms mx -- )
M: kqueue-mx wait-for-events ( ms mx -- )
swap make-timespec dupd wait-kevent handle-kevents ;
: make-proc-kevent ( pid -- kevent )

View File

@ -40,7 +40,7 @@ TUPLE: select-mx read-fdset write-fdset ;
write-fdset/tasks tuck init-fdset
f ;
M: select-mx unix-io-multiplex ( ms mx -- )
M: select-mx wait-for-events ( ms mx -- )
swap >r FD_SETSIZE over init-fdsets r> make-timeval
select multiplexer-error
dup read-fdset/tasks pick handle-fdset

View File

@ -1,4 +1,4 @@
! Copyright (C) 2004, 2007 Slava Pestov, Ivan Tikhonov.
! Copyright (C) 2004, 2008 Slava Pestov, Ivan Tikhonov.
! See http://factorcode.org/license.txt for BSD license.
! We need to fiddle with the exact search order here, since
@ -34,14 +34,12 @@ M: unix-io addrinfo-error ( n -- )
TUPLE: connect-task ;
: <connect-task> ( port continuation -- task )
connect-task <io-task> ;
connect-task <output-task> ;
M: connect-task do-io-task
io-task-port dup port-handle f 0 write
0 < [ defer-error ] [ drop t ] if ;
M: connect-task io-task-container drop mx-writes ;
: wait-to-connect ( port -- )
[ <connect-task> add-io-task stop ] callcc0 drop ;
@ -68,9 +66,7 @@ USE: unix
TUPLE: accept-task ;
: <accept-task> ( port continuation -- task )
accept-task <io-task> ;
M: accept-task io-task-container drop mx-reads ;
accept-task <input-task> ;
: accept-sockaddr ( port -- fd sockaddr )
dup port-handle swap server-port-addr sockaddr-type
@ -101,7 +97,6 @@ M: unix-io <server> ( addrspec -- stream )
[
SOCK_STREAM server-fd
dup 10 listen zero? [ dup close (io-error) ] unless
f <port>
] keep <server-port> ;
M: unix-io accept ( server -- client )
@ -113,7 +108,7 @@ M: unix-io accept ( server -- client )
! Datagram sockets - UDP and Unix domain
M: unix-io <datagram>
[ SOCK_DGRAM server-fd f <port> ] keep <datagram-port> ;
[ SOCK_DGRAM server-fd ] keep <datagram-port> ;
SYMBOL: receive-buffer
@ -139,7 +134,7 @@ packet-size <byte-array> receive-buffer set-global
TUPLE: receive-task ;
: <receive-task> ( stream continuation -- task )
receive-task <io-task> ;
receive-task <input-task> ;
M: receive-task do-io-task
io-task-port
@ -152,8 +147,6 @@ M: receive-task do-io-task
2drop defer-error
] if ;
M: receive-task io-task-container drop mx-reads ;
: wait-receive ( stream -- )
[ <receive-task> add-io-task stop ] callcc0 drop ;
@ -170,7 +163,7 @@ M: unix-io receive ( datagram -- packet addrspec )
TUPLE: send-task packet sockaddr len ;
: <send-task> ( packet sockaddr len stream continuation -- task )
send-task <io-task> [
send-task <output-task> [
{
set-send-task-packet
set-send-task-sockaddr
@ -185,8 +178,6 @@ M: send-task do-io-task
[ send-task-len do-send ] keep
swap 0 < [ io-task-port defer-error ] [ drop t ] if ;
M: send-task io-task-container drop mx-writes ;
: wait-send ( packet sockaddr len stream -- )
[ <send-task> add-io-task stop ] callcc0 2drop 2drop ;

View File

@ -38,7 +38,7 @@ M: windows-ce-io <server> ( addrspec -- duplex-stream )
[
windows.winsock:SOCK_STREAM server-fd
dup listen-on-socket
<win32-socket> f <port>
<win32-socket>
] keep <server-port> ;
M: windows-ce-io accept ( server -- client )
@ -58,7 +58,7 @@ M: windows-ce-io accept ( server -- client )
M: windows-ce-io <datagram> ( addrspec -- datagram )
[
windows.winsock:SOCK_DGRAM server-fd <win32-socket> f <port>
windows.winsock:SOCK_DGRAM server-fd <win32-socket>
] keep <datagram-port> ;
: packet-size 65536 ; inline

View File

@ -149,7 +149,7 @@ M: windows-nt-io <server> ( addrspec -- server )
[
SOCK_STREAM server-fd dup listen-on-socket
dup add-completion
<win32-socket> f <port>
<win32-socket>
] keep <server-port>
] with-destructors ;
@ -158,7 +158,7 @@ M: windows-nt-io <datagram> ( addrspec -- datagram )
[
SOCK_DGRAM server-fd
dup add-completion
<win32-socket> f <port>
<win32-socket>
] keep <datagram-port>
] with-destructors ;