From 588253dfe3f9c042c1522b2b8174236b63f6ad97 Mon Sep 17 00:00:00 2001
From: Slava Pestov <slava@factorcode.org>
Date: Fri, 18 Jan 2008 18:18:54 -0500
Subject: [PATCH] Cleaning up Unix I/O

---
 extra/io/unix/backend/backend.factor       | 127 +++++++++------------
 extra/io/unix/backend/select/select.factor |  53 +++++++++
 extra/io/unix/sockets/sockets.factor       |  20 ++--
 extra/unix/kqueue/kqueue.factor            |  73 ++++++++++++
 4 files changed, 188 insertions(+), 85 deletions(-)
 create mode 100644 extra/io/unix/backend/select/select.factor
 create mode 100644 extra/unix/kqueue/kqueue.factor

diff --git a/extra/io/unix/backend/backend.factor b/extra/io/unix/backend/backend.factor
index 3522a2218b..ec73a5395e 100755
--- a/extra/io/unix/backend/backend.factor
+++ b/extra/io/unix/backend/backend.factor
@@ -1,21 +1,24 @@
-! Copyright (C) 2004, 2007 Slava Pestov.
+! Copyright (C) 2004, 2008 Slava Pestov.
 ! See http://factorcode.org/license.txt for BSD license.
-USING: alien bit-arrays generic assocs io kernel
-kernel.private math io.nonblocking sequences strings structs
-sbufs threads unix vectors io.buffers io.backend
-io.streams.duplex math.parser continuations system libc ;
+USING: alien generic assocs kernel kernel.private math
+io.nonblocking sequences strings structs sbufs threads unix
+vectors io.buffers io.backend io.streams.duplex math.parser
+continuations system libc qualified namespaces ;
+QUALIFIED: io
 IN: io.unix.backend
 
+! Multiplexer protocol
+SYMBOL: unix-io-backend
+
+HOOK: init-unix-io unix-io-backend ( -- )
+HOOK: register-io-task unix-io-backend ( task -- )
+HOOK: unregister-io-task unix-io-backend ( task -- )
+HOOK: unix-io-multiplex unix-io-backend ( timeval -- )
+
 TUPLE: unix-io ;
 
-! We want namespaces::bind to shadow the bind system call from
-! unix
-USING: namespaces ;
-
 ! Global variables
-SYMBOL: read-fdset
 SYMBOL: read-tasks
-SYMBOL: write-fdset
 SYMBOL: write-tasks
 
 ! Some general stuff
@@ -53,9 +56,9 @@ M: integer close-handle ( fd -- )
 ! port to finish I/O
 TUPLE: io-task port callbacks ;
 
-: <io-task> ( port class -- task )
-    >r V{ } clone io-task construct-boa
-    { set-delegate } r> construct ; inline
+: <io-task> ( port continuation class -- task )
+    >r 1vector io-task construct-boa r> construct-delegate ;
+    inline
 
 ! Multiplexer
 GENERIC: do-io-task ( task -- ? )
@@ -63,58 +66,30 @@ GENERIC: task-container ( task -- vector )
 
 : io-task-fd io-task-port port-handle ;
 
-: add-io-task ( callback task -- )
-    [ io-task-callbacks push ] keep
-    dup io-task-fd over task-container 2dup at [
+: check-io-task ( task -- )
+    dup io-task-fd swap task-container at [
         "Cannot perform multiple reads from the same port" throw
-    ] when set-at ;
+    ] when ;
+
+: add-io-task ( task -- )
+    dup check-io-task
+    dup register-io-task
+    dup io-task-fd over task-container set-at ;
 
 : remove-io-task ( task -- )
-    dup io-task-fd swap task-container delete-at ;
+    dup io-task-fd over task-container delete-at
+    unregister-io-task ;
 
 : pop-callbacks ( task -- )
-    dup io-task-callbacks swap remove-io-task
-    [ schedule-thread ] each ;
+    dup remove-io-task
+    io-task-callbacks [ schedule-thread ] each ;
 
 : handle-fd ( task -- )
     dup io-task-port touch-port
     dup do-io-task [ pop-callbacks ] [ drop ] if ;
 
-: handle-fdset ( fdset tasks -- )
-    swap [
-        swap dup io-task-port timeout? [
-            dup io-task-port "Timeout" swap report-error
-            nip pop-callbacks
-        ] [
-            tuck io-task-fd swap nth
-            [ handle-fd ] [ drop ] if
-        ] if drop
-    ] curry assoc-each ;
-
-: init-fdset ( fdset tasks -- )
-    swap dup clear-bits
-    [ >r drop t swap r> set-nth ] curry assoc-each ;
-
-: read-fdset/tasks
-    read-fdset get-global read-tasks get-global ;
-
-: write-fdset/tasks
-    write-fdset get-global write-tasks get-global ;
-
-: init-fdsets ( -- read write except )
-    read-fdset/tasks dupd init-fdset
-    write-fdset/tasks dupd init-fdset
-    f ;
-
-: (io-multiplex) ( ms -- )
-    >r FD_SETSIZE init-fdsets r> make-timeval select 0 < [
-        err_no ignorable-error? [ (io-error) ] unless
-    ] when ;
-
-M: unix-io io-multiplex ( ms -- )
-    (io-multiplex)
-    read-fdset/tasks handle-fdset
-    write-fdset/tasks handle-fdset ;
+: handle-timeout ( task -- )
+    "Timeout" over io-task-port report-error pop-callbacks ;
 
 ! Readers
 : reader-eof ( reader -- )
@@ -137,17 +112,18 @@ M: unix-io io-multiplex ( ms -- )
 
 TUPLE: read-task ;
 
-: <read-task> ( port -- task ) read-task <io-task> ;
+: <read-task> ( port continuation -- task )
+    read-task <io-task> ;
 
 M: read-task do-io-task
     io-task-port dup refill
     [ [ reader-eof ] [ drop ] if ] keep ;
 
-M: read-task task-container drop read-tasks get-global ;
+M: read-task task-container
+    drop read-tasks get-global ;
 
 M: input-port (wait-to-read)
-    [ swap <read-task> add-io-task stop ] callcc0
-    pending-error ;
+    [ <read-task> add-io-task stop ] callcc0 pending-error ;
 
 ! Writers
 : write-step ( port -- ? )
@@ -156,35 +132,34 @@ M: input-port (wait-to-read)
 
 TUPLE: write-task ;
 
-: <write-task> ( port -- task ) write-task <io-task> ;
+: <write-task> ( port continuation -- task )
+    write-task <io-task> ;
 
 M: write-task do-io-task
     io-task-port dup buffer-empty? over port-error or
     [ 0 swap buffer-reset t ] [ write-step ] if ;
 
-M: write-task task-container drop write-tasks get-global ;
+M: write-task task-container
+    drop write-tasks get-global ;
 
-: add-write-io-task ( callback task -- )
-    dup io-task-fd write-tasks get-global at
-    [ io-task-callbacks push ] [ add-io-task ] ?if ;
+: add-write-io-task ( port continuation -- )
+    over port-handle write-tasks get-global at
+    [ io-task-callbacks push drop ]
+    [ <write-task> add-io-task ] if* ;
 
 : (wait-to-write) ( port -- )
-    [ swap <write-task> add-write-io-task stop ] callcc0 drop ;
+    [ add-write-io-task stop ] callcc0 drop ;
 
 M: port port-flush ( port -- )
     dup buffer-empty? [ drop ] [ (wait-to-write) ] if ;
 
-USE: io
+M: unix-io io-multiplex ( ms -- )
+    make-timeval unix-io-multiplex ;
 
 M: unix-io init-io ( -- )
-    #! Should only be called on startup. Calling this at any
-    #! other time can have unintended consequences.
-    global [
-        H{ } clone read-tasks set
-        FD_SETSIZE 8 * <bit-array> read-fdset set
-        H{ } clone write-tasks set
-        FD_SETSIZE 8 * <bit-array> write-fdset set
-    ] bind ;
+    H{ } clone read-tasks set-global
+    H{ } clone write-tasks set-global
+    init-unix-io ;
 
 M: unix-io init-stdio ( -- )
-    0 1 handle>duplex-stream stdio set-global ;
+    0 1 handle>duplex-stream io:stdio set-global ;
diff --git a/extra/io/unix/backend/select/select.factor b/extra/io/unix/backend/select/select.factor
new file mode 100644
index 0000000000..255010bff6
--- /dev/null
+++ b/extra/io/unix/backend/select/select.factor
@@ -0,0 +1,53 @@
+! Copyright (C) 2004, 2008 Slava Pestov.
+! See http://factorcode.org/license.txt for BSD license.
+USING: alien.syntax kernel io.nonblocking io.unix.backend
+bit-arrays sequences assocs unix math namespaces ;
+IN: io.unix.backend.select
+
+TUPLE: unix-select-io ;
+
+! Global variables
+SYMBOL: read-fdset
+SYMBOL: write-fdset
+
+M: unix-select-io init-unix-io ( -- )
+    FD_SETSIZE 8 * <bit-array> read-fdset set-global
+    FD_SETSIZE 8 * <bit-array> write-fdset set-global ;
+
+: handle-fdset ( fdset tasks -- )
+    swap [
+        swap dup io-task-port timeout? [
+            nip handle-timeout
+        ] [
+            tuck io-task-fd swap nth
+            [ handle-fd ] [ drop ] if
+        ] if drop
+    ] curry assoc-each ;
+
+: init-fdset ( fdset tasks -- )
+    swap dup clear-bits
+    [ >r drop t swap r> set-nth ] curry assoc-each ;
+
+: read-fdset/tasks
+    read-fdset get-global read-tasks get-global ;
+
+: write-fdset/tasks
+    write-fdset get-global write-tasks get-global ;
+
+: init-fdsets ( -- read write except )
+    read-fdset/tasks dupd init-fdset
+    write-fdset/tasks dupd init-fdset
+    f ;
+
+M: unix-select-io register-io-task ( task -- ) drop ;
+
+M: unix-select-io unregister-io-task ( task -- ) drop ;
+
+M: unix-select-io unix-io-multiplex ( timeval -- )
+    >r FD_SETSIZE init-fdsets r> select 0 < [
+        err_no ignorable-error? [ (io-error) ] unless
+    ] when
+    read-fdset/tasks handle-fdset
+    write-fdset/tasks handle-fdset ;
+
+T{ unix-select-io } unix-io-backend set-global
diff --git a/extra/io/unix/sockets/sockets.factor b/extra/io/unix/sockets/sockets.factor
index 0787a1afde..30d3bbd94c 100644
--- a/extra/io/unix/sockets/sockets.factor
+++ b/extra/io/unix/sockets/sockets.factor
@@ -33,7 +33,8 @@ M: unix-io addrinfo-error ( n -- )
 
 TUPLE: connect-task ;
 
-: <connect-task> ( port -- task ) connect-task <io-task> ;
+: <connect-task> ( port continuation -- task )
+    connect-task <io-task> ;
 
 M: connect-task do-io-task
     io-task-port dup port-handle f 0 write
@@ -42,7 +43,7 @@ M: connect-task do-io-task
 M: connect-task task-container drop write-tasks get-global ;
 
 : wait-to-connect ( port -- )
-    [ swap <connect-task> add-io-task stop ] callcc0 drop ;
+    [ <connect-task> add-io-task stop ] callcc0 drop ;
 
 M: unix-io (client) ( addrspec -- stream )
     dup make-sockaddr/size >r >r
@@ -66,7 +67,8 @@ USE: unix
 
 TUPLE: accept-task ;
 
-: <accept-task> ( port -- task ) accept-task <io-task> ;
+: <accept-task> ( port continuation  -- task )
+    accept-task <io-task> ;
 
 M: accept-task task-container drop read-tasks get ;
 
@@ -85,7 +87,7 @@ M: accept-task do-io-task
     over 0 >= [ do-accept t ] [ 2drop defer-error ] if ;
 
 : wait-to-accept ( server -- )
-    [ swap <accept-task> add-io-task stop ] callcc0 drop ;
+    [ <accept-task> add-io-task stop ] callcc0 drop ;
 
 USE: io.sockets
 
@@ -136,7 +138,8 @@ packet-size <byte-array> receive-buffer set-global
 
 TUPLE: receive-task ;
 
-: <receive-task> ( stream -- task ) receive-task <io-task> ;
+: <receive-task> ( stream continuation  -- task )
+    receive-task <io-task> ;
 
 M: receive-task do-io-task
     io-task-port
@@ -152,7 +155,7 @@ M: receive-task do-io-task
 M: receive-task task-container drop read-tasks get ;
 
 : wait-receive ( stream -- )
-    [ swap <receive-task> add-io-task stop ] callcc0 drop ;
+    [ <receive-task> add-io-task stop ] callcc0 drop ;
 
 M: unix-io receive ( datagram -- packet addrspec )
     dup check-datagram-port
@@ -166,7 +169,7 @@ M: unix-io receive ( datagram -- packet addrspec )
 
 TUPLE: send-task packet sockaddr len ;
 
-: <send-task> ( packet sockaddr len port -- task )
+: <send-task> ( packet sockaddr len stream continuation -- task )
     send-task <io-task> [
         {
             set-send-task-packet
@@ -185,8 +188,7 @@ M: send-task do-io-task
 M: send-task task-container drop write-tasks get ;
 
 : wait-send ( packet sockaddr len stream -- )
-    [ >r <send-task> r> swap add-io-task stop ] callcc0
-    2drop 2drop ;
+    [ <send-task> add-io-task stop ] callcc0 2drop 2drop ;
 
 M: unix-io send ( packet addrspec datagram -- )
     3dup check-datagram-send
diff --git a/extra/unix/kqueue/kqueue.factor b/extra/unix/kqueue/kqueue.factor
new file mode 100644
index 0000000000..4e6504470d
--- /dev/null
+++ b/extra/unix/kqueue/kqueue.factor
@@ -0,0 +1,73 @@
+! Copyright (C) 2008 Slava Pestov.
+! See http://factorcode.org/license.txt for BSD license.
+USING: alien.syntax ;
+IN: unix.kqueue
+
+FUNCTION: int kqueue ( ) ;
+
+FUNCTION: int kevent ( int kq, kevent* changelist, int nchanges, kevent* eventlist, int nevents, timespec* timeout ) ;
+
+C-STRUCT: kevent
+    { "ulong"  "ident"  } ! identifier for this event
+    { "short"  "filter" } ! filter for event
+    { "ushort" "flags"  } ! action flags for kqueue
+    { "uint"   "fflags" } ! filter flag value
+    { "long"   "data"   } ! filter data value
+    { "void*"  "udata"  } ! opaque user data identifier
+;
+
+: EVFILT_READ     -1 ; inline
+: EVFILT_WRITE    -2 ; inline
+: EVFILT_AIO      -3 ; inline ! attached to aio requests
+: EVFILT_VNODE    -4 ; inline ! attached to vnodes
+: EVFILT_PROC     -5 ; inline ! attached to struct proc
+: EVFILT_SIGNAL   -6 ; inline ! attached to struct proc
+: EVFILT_TIMER    -7 ; inline ! timers
+: EVFILT_MACHPORT -8 ; inline ! Mach ports
+: EVFILT_FS       -9 ; inline ! Filesystem events
+
+! actions
+: EV_ADD     HEX: 1 ; inline ! add event to kq (implies enable)
+: EV_DELETE  HEX: 2 ; inline ! delete event from kq
+: EV_ENABLE  HEX: 4 ; inline ! enable event
+: EV_DISABLE HEX: 8 ; inline ! disable event (not reported)
+
+! flags
+: EV_ONESHOT HEX: 10 ; inline ! only report one occurrence
+: EV_CLEAR   HEX: 20 ; inline ! clear event state after reporting
+
+: EV_SYSFLAGS HEX: f000 ; inline ! reserved by system
+: EV_FLAG0    HEX: 1000 ; inline ! filter-specific flag
+: EV_FLAG1    HEX: 2000 ; inline ! filter-specific flag
+
+! returned values
+: EV_EOF          HEX: 8000 ; inline ! EOF detected
+: EV_ERROR        HEX: 4000 ; inline ! error, data contains errno
+
+: EV_POLL EV_FLAG0 ; inline
+: EV_OOBAND EV_FLAG1 ; inline
+
+: NOTE_LOWAT      HEX: 00000001 ; inline ! low water mark
+
+: NOTE_DELETE     HEX: 00000001 ; inline ! vnode was removed
+: NOTE_WRITE      HEX: 00000002 ; inline ! data contents changed
+: NOTE_EXTEND     HEX: 00000004 ; inline ! size increased
+: NOTE_ATTRIB     HEX: 00000008 ; inline ! attributes changed
+: NOTE_LINK       HEX: 00000010 ; inline ! link count changed
+: NOTE_RENAME     HEX: 00000020 ; inline ! vnode was renamed
+: NOTE_REVOKE     HEX: 00000040 ; inline ! vnode access was revoked
+
+: NOTE_EXIT       HEX: 80000000 ; inline ! process exited
+: NOTE_FORK       HEX: 40000000 ; inline ! process forked
+: NOTE_EXEC       HEX: 20000000 ; inline ! process exec'd
+: NOTE_PCTRLMASK  HEX: f0000000 ; inline ! mask for hint bits
+: NOTE_PDATAMASK  HEX: 000fffff ; inline ! mask for pid
+
+: NOTE_SECONDS    HEX: 00000001 ; inline ! data is seconds
+: NOTE_USECONDS   HEX: 00000002 ; inline ! data is microseconds
+: NOTE_NSECONDS   HEX: 00000004 ; inline ! data is nanoseconds
+: NOTE_ABSOLUTE   HEX: 00000008 ; inline ! absolute timeout
+
+: NOTE_TRACK      HEX: 00000001 ; inline ! follow across forks
+: NOTE_TRACKERR   HEX: 00000002 ; inline ! could not track child
+: NOTE_CHILD      HEX: 00000004 ; inline ! am a child process