Cleaning up Unix I/O

db4
Slava Pestov 2008-01-18 18:18:54 -05:00
parent 28332bcdf4
commit 588253dfe3
4 changed files with 188 additions and 85 deletions

View File

@ -1,21 +1,24 @@
! Copyright (C) 2004, 2007 Slava Pestov. ! Copyright (C) 2004, 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license. ! See http://factorcode.org/license.txt for BSD license.
USING: alien bit-arrays generic assocs io kernel USING: alien generic assocs kernel kernel.private math
kernel.private math io.nonblocking sequences strings structs io.nonblocking sequences strings structs sbufs threads unix
sbufs threads unix vectors io.buffers io.backend vectors io.buffers io.backend io.streams.duplex math.parser
io.streams.duplex math.parser continuations system libc ; continuations system libc qualified namespaces ;
QUALIFIED: io
IN: io.unix.backend IN: io.unix.backend
! Multiplexer protocol
SYMBOL: unix-io-backend
HOOK: init-unix-io unix-io-backend ( -- )
HOOK: register-io-task unix-io-backend ( task -- )
HOOK: unregister-io-task unix-io-backend ( task -- )
HOOK: unix-io-multiplex unix-io-backend ( timeval -- )
TUPLE: unix-io ; TUPLE: unix-io ;
! We want namespaces::bind to shadow the bind system call from
! unix
USING: namespaces ;
! Global variables ! Global variables
SYMBOL: read-fdset
SYMBOL: read-tasks SYMBOL: read-tasks
SYMBOL: write-fdset
SYMBOL: write-tasks SYMBOL: write-tasks
! Some general stuff ! Some general stuff
@ -53,9 +56,9 @@ M: integer close-handle ( fd -- )
! port to finish I/O ! port to finish I/O
TUPLE: io-task port callbacks ; TUPLE: io-task port callbacks ;
: <io-task> ( port class -- task ) : <io-task> ( port continuation class -- task )
>r V{ } clone io-task construct-boa >r 1vector io-task construct-boa r> construct-delegate ;
{ set-delegate } r> construct ; inline inline
! Multiplexer ! Multiplexer
GENERIC: do-io-task ( task -- ? ) GENERIC: do-io-task ( task -- ? )
@ -63,58 +66,30 @@ GENERIC: task-container ( task -- vector )
: io-task-fd io-task-port port-handle ; : io-task-fd io-task-port port-handle ;
: add-io-task ( callback task -- ) : check-io-task ( task -- )
[ io-task-callbacks push ] keep dup io-task-fd swap task-container at [
dup io-task-fd over task-container 2dup at [
"Cannot perform multiple reads from the same port" throw "Cannot perform multiple reads from the same port" throw
] when set-at ; ] when ;
: add-io-task ( task -- )
dup check-io-task
dup register-io-task
dup io-task-fd over task-container set-at ;
: remove-io-task ( task -- ) : remove-io-task ( task -- )
dup io-task-fd swap task-container delete-at ; dup io-task-fd over task-container delete-at
unregister-io-task ;
: pop-callbacks ( task -- ) : pop-callbacks ( task -- )
dup io-task-callbacks swap remove-io-task dup remove-io-task
[ schedule-thread ] each ; io-task-callbacks [ schedule-thread ] each ;
: handle-fd ( task -- ) : handle-fd ( task -- )
dup io-task-port touch-port dup io-task-port touch-port
dup do-io-task [ pop-callbacks ] [ drop ] if ; dup do-io-task [ pop-callbacks ] [ drop ] if ;
: handle-fdset ( fdset tasks -- ) : handle-timeout ( task -- )
swap [ "Timeout" over io-task-port report-error pop-callbacks ;
swap dup io-task-port timeout? [
dup io-task-port "Timeout" swap report-error
nip pop-callbacks
] [
tuck io-task-fd swap nth
[ handle-fd ] [ drop ] if
] if drop
] curry assoc-each ;
: init-fdset ( fdset tasks -- )
swap dup clear-bits
[ >r drop t swap r> set-nth ] curry assoc-each ;
: read-fdset/tasks
read-fdset get-global read-tasks get-global ;
: write-fdset/tasks
write-fdset get-global write-tasks get-global ;
: init-fdsets ( -- read write except )
read-fdset/tasks dupd init-fdset
write-fdset/tasks dupd init-fdset
f ;
: (io-multiplex) ( ms -- )
>r FD_SETSIZE init-fdsets r> make-timeval select 0 < [
err_no ignorable-error? [ (io-error) ] unless
] when ;
M: unix-io io-multiplex ( ms -- )
(io-multiplex)
read-fdset/tasks handle-fdset
write-fdset/tasks handle-fdset ;
! Readers ! Readers
: reader-eof ( reader -- ) : reader-eof ( reader -- )
@ -137,17 +112,18 @@ M: unix-io io-multiplex ( ms -- )
TUPLE: read-task ; TUPLE: read-task ;
: <read-task> ( port -- task ) read-task <io-task> ; : <read-task> ( port continuation -- task )
read-task <io-task> ;
M: read-task do-io-task M: read-task do-io-task
io-task-port dup refill io-task-port dup refill
[ [ reader-eof ] [ drop ] if ] keep ; [ [ reader-eof ] [ drop ] if ] keep ;
M: read-task task-container drop read-tasks get-global ; M: read-task task-container
drop read-tasks get-global ;
M: input-port (wait-to-read) M: input-port (wait-to-read)
[ swap <read-task> add-io-task stop ] callcc0 [ <read-task> add-io-task stop ] callcc0 pending-error ;
pending-error ;
! Writers ! Writers
: write-step ( port -- ? ) : write-step ( port -- ? )
@ -156,35 +132,34 @@ M: input-port (wait-to-read)
TUPLE: write-task ; TUPLE: write-task ;
: <write-task> ( port -- task ) write-task <io-task> ; : <write-task> ( port continuation -- task )
write-task <io-task> ;
M: write-task do-io-task M: write-task do-io-task
io-task-port dup buffer-empty? over port-error or io-task-port dup buffer-empty? over port-error or
[ 0 swap buffer-reset t ] [ write-step ] if ; [ 0 swap buffer-reset t ] [ write-step ] if ;
M: write-task task-container drop write-tasks get-global ; M: write-task task-container
drop write-tasks get-global ;
: add-write-io-task ( callback task -- ) : add-write-io-task ( port continuation -- )
dup io-task-fd write-tasks get-global at over port-handle write-tasks get-global at
[ io-task-callbacks push ] [ add-io-task ] ?if ; [ io-task-callbacks push drop ]
[ <write-task> add-io-task ] if* ;
: (wait-to-write) ( port -- ) : (wait-to-write) ( port -- )
[ swap <write-task> add-write-io-task stop ] callcc0 drop ; [ add-write-io-task stop ] callcc0 drop ;
M: port port-flush ( port -- ) M: port port-flush ( port -- )
dup buffer-empty? [ drop ] [ (wait-to-write) ] if ; dup buffer-empty? [ drop ] [ (wait-to-write) ] if ;
USE: io M: unix-io io-multiplex ( ms -- )
make-timeval unix-io-multiplex ;
M: unix-io init-io ( -- ) M: unix-io init-io ( -- )
#! Should only be called on startup. Calling this at any H{ } clone read-tasks set-global
#! other time can have unintended consequences. H{ } clone write-tasks set-global
global [ init-unix-io ;
H{ } clone read-tasks set
FD_SETSIZE 8 * <bit-array> read-fdset set
H{ } clone write-tasks set
FD_SETSIZE 8 * <bit-array> write-fdset set
] bind ;
M: unix-io init-stdio ( -- ) M: unix-io init-stdio ( -- )
0 1 handle>duplex-stream stdio set-global ; 0 1 handle>duplex-stream io:stdio set-global ;

View File

@ -0,0 +1,53 @@
! Copyright (C) 2004, 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: alien.syntax kernel io.nonblocking io.unix.backend
bit-arrays sequences assocs unix math namespaces ;
IN: io.unix.backend.select
TUPLE: unix-select-io ;
! Global variables
SYMBOL: read-fdset
SYMBOL: write-fdset
M: unix-select-io init-unix-io ( -- )
FD_SETSIZE 8 * <bit-array> read-fdset set-global
FD_SETSIZE 8 * <bit-array> write-fdset set-global ;
: handle-fdset ( fdset tasks -- )
swap [
swap dup io-task-port timeout? [
nip handle-timeout
] [
tuck io-task-fd swap nth
[ handle-fd ] [ drop ] if
] if drop
] curry assoc-each ;
: init-fdset ( fdset tasks -- )
swap dup clear-bits
[ >r drop t swap r> set-nth ] curry assoc-each ;
: read-fdset/tasks
read-fdset get-global read-tasks get-global ;
: write-fdset/tasks
write-fdset get-global write-tasks get-global ;
: init-fdsets ( -- read write except )
read-fdset/tasks dupd init-fdset
write-fdset/tasks dupd init-fdset
f ;
M: unix-select-io register-io-task ( task -- ) drop ;
M: unix-select-io unregister-io-task ( task -- ) drop ;
M: unix-select-io unix-io-multiplex ( timeval -- )
>r FD_SETSIZE init-fdsets r> select 0 < [
err_no ignorable-error? [ (io-error) ] unless
] when
read-fdset/tasks handle-fdset
write-fdset/tasks handle-fdset ;
T{ unix-select-io } unix-io-backend set-global

View File

@ -33,7 +33,8 @@ M: unix-io addrinfo-error ( n -- )
TUPLE: connect-task ; TUPLE: connect-task ;
: <connect-task> ( port -- task ) connect-task <io-task> ; : <connect-task> ( port continuation -- task )
connect-task <io-task> ;
M: connect-task do-io-task M: connect-task do-io-task
io-task-port dup port-handle f 0 write io-task-port dup port-handle f 0 write
@ -42,7 +43,7 @@ M: connect-task do-io-task
M: connect-task task-container drop write-tasks get-global ; M: connect-task task-container drop write-tasks get-global ;
: wait-to-connect ( port -- ) : wait-to-connect ( port -- )
[ swap <connect-task> add-io-task stop ] callcc0 drop ; [ <connect-task> add-io-task stop ] callcc0 drop ;
M: unix-io (client) ( addrspec -- stream ) M: unix-io (client) ( addrspec -- stream )
dup make-sockaddr/size >r >r dup make-sockaddr/size >r >r
@ -66,7 +67,8 @@ USE: unix
TUPLE: accept-task ; TUPLE: accept-task ;
: <accept-task> ( port -- task ) accept-task <io-task> ; : <accept-task> ( port continuation -- task )
accept-task <io-task> ;
M: accept-task task-container drop read-tasks get ; M: accept-task task-container drop read-tasks get ;
@ -85,7 +87,7 @@ M: accept-task do-io-task
over 0 >= [ do-accept t ] [ 2drop defer-error ] if ; over 0 >= [ do-accept t ] [ 2drop defer-error ] if ;
: wait-to-accept ( server -- ) : wait-to-accept ( server -- )
[ swap <accept-task> add-io-task stop ] callcc0 drop ; [ <accept-task> add-io-task stop ] callcc0 drop ;
USE: io.sockets USE: io.sockets
@ -136,7 +138,8 @@ packet-size <byte-array> receive-buffer set-global
TUPLE: receive-task ; TUPLE: receive-task ;
: <receive-task> ( stream -- task ) receive-task <io-task> ; : <receive-task> ( stream continuation -- task )
receive-task <io-task> ;
M: receive-task do-io-task M: receive-task do-io-task
io-task-port io-task-port
@ -152,7 +155,7 @@ M: receive-task do-io-task
M: receive-task task-container drop read-tasks get ; M: receive-task task-container drop read-tasks get ;
: wait-receive ( stream -- ) : wait-receive ( stream -- )
[ swap <receive-task> add-io-task stop ] callcc0 drop ; [ <receive-task> add-io-task stop ] callcc0 drop ;
M: unix-io receive ( datagram -- packet addrspec ) M: unix-io receive ( datagram -- packet addrspec )
dup check-datagram-port dup check-datagram-port
@ -166,7 +169,7 @@ M: unix-io receive ( datagram -- packet addrspec )
TUPLE: send-task packet sockaddr len ; TUPLE: send-task packet sockaddr len ;
: <send-task> ( packet sockaddr len port -- task ) : <send-task> ( packet sockaddr len stream continuation -- task )
send-task <io-task> [ send-task <io-task> [
{ {
set-send-task-packet set-send-task-packet
@ -185,8 +188,7 @@ M: send-task do-io-task
M: send-task task-container drop write-tasks get ; M: send-task task-container drop write-tasks get ;
: wait-send ( packet sockaddr len stream -- ) : wait-send ( packet sockaddr len stream -- )
[ >r <send-task> r> swap add-io-task stop ] callcc0 [ <send-task> add-io-task stop ] callcc0 2drop 2drop ;
2drop 2drop ;
M: unix-io send ( packet addrspec datagram -- ) M: unix-io send ( packet addrspec datagram -- )
3dup check-datagram-send 3dup check-datagram-send

View File

@ -0,0 +1,73 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: alien.syntax ;
IN: unix.kqueue
FUNCTION: int kqueue ( ) ;
FUNCTION: int kevent ( int kq, kevent* changelist, int nchanges, kevent* eventlist, int nevents, timespec* timeout ) ;
C-STRUCT: kevent
{ "ulong" "ident" } ! identifier for this event
{ "short" "filter" } ! filter for event
{ "ushort" "flags" } ! action flags for kqueue
{ "uint" "fflags" } ! filter flag value
{ "long" "data" } ! filter data value
{ "void*" "udata" } ! opaque user data identifier
;
: EVFILT_READ -1 ; inline
: EVFILT_WRITE -2 ; inline
: EVFILT_AIO -3 ; inline ! attached to aio requests
: EVFILT_VNODE -4 ; inline ! attached to vnodes
: EVFILT_PROC -5 ; inline ! attached to struct proc
: EVFILT_SIGNAL -6 ; inline ! attached to struct proc
: EVFILT_TIMER -7 ; inline ! timers
: EVFILT_MACHPORT -8 ; inline ! Mach ports
: EVFILT_FS -9 ; inline ! Filesystem events
! actions
: EV_ADD HEX: 1 ; inline ! add event to kq (implies enable)
: EV_DELETE HEX: 2 ; inline ! delete event from kq
: EV_ENABLE HEX: 4 ; inline ! enable event
: EV_DISABLE HEX: 8 ; inline ! disable event (not reported)
! flags
: EV_ONESHOT HEX: 10 ; inline ! only report one occurrence
: EV_CLEAR HEX: 20 ; inline ! clear event state after reporting
: EV_SYSFLAGS HEX: f000 ; inline ! reserved by system
: EV_FLAG0 HEX: 1000 ; inline ! filter-specific flag
: EV_FLAG1 HEX: 2000 ; inline ! filter-specific flag
! returned values
: EV_EOF HEX: 8000 ; inline ! EOF detected
: EV_ERROR HEX: 4000 ; inline ! error, data contains errno
: EV_POLL EV_FLAG0 ; inline
: EV_OOBAND EV_FLAG1 ; inline
: NOTE_LOWAT HEX: 00000001 ; inline ! low water mark
: NOTE_DELETE HEX: 00000001 ; inline ! vnode was removed
: NOTE_WRITE HEX: 00000002 ; inline ! data contents changed
: NOTE_EXTEND HEX: 00000004 ; inline ! size increased
: NOTE_ATTRIB HEX: 00000008 ; inline ! attributes changed
: NOTE_LINK HEX: 00000010 ; inline ! link count changed
: NOTE_RENAME HEX: 00000020 ; inline ! vnode was renamed
: NOTE_REVOKE HEX: 00000040 ; inline ! vnode access was revoked
: NOTE_EXIT HEX: 80000000 ; inline ! process exited
: NOTE_FORK HEX: 40000000 ; inline ! process forked
: NOTE_EXEC HEX: 20000000 ; inline ! process exec'd
: NOTE_PCTRLMASK HEX: f0000000 ; inline ! mask for hint bits
: NOTE_PDATAMASK HEX: 000fffff ; inline ! mask for pid
: NOTE_SECONDS HEX: 00000001 ; inline ! data is seconds
: NOTE_USECONDS HEX: 00000002 ; inline ! data is microseconds
: NOTE_NSECONDS HEX: 00000004 ; inline ! data is nanoseconds
: NOTE_ABSOLUTE HEX: 00000008 ; inline ! absolute timeout
: NOTE_TRACK HEX: 00000001 ; inline ! follow across forks
: NOTE_TRACKERR HEX: 00000002 ; inline ! could not track child
: NOTE_CHILD HEX: 00000004 ; inline ! am a child process