Unix I/O re-working; nested multiplexer support, use kqueue on *bsd to wait for process completion, start Linux epoll support
							parent
							
								
									7db1b072f8
								
							
						
					
					
						commit
						eb5644ad5f
					
				| 
						 | 
				
			
			@ -7,19 +7,46 @@ continuations system libc qualified namespaces ;
 | 
			
		|||
QUALIFIED: io
 | 
			
		||||
IN: io.unix.backend
 | 
			
		||||
 | 
			
		||||
! Multiplexer protocol
 | 
			
		||||
SYMBOL: unix-io-backend
 | 
			
		||||
MIXIN: unix-io
 | 
			
		||||
 | 
			
		||||
HOOK: init-unix-io unix-io-backend ( -- )
 | 
			
		||||
HOOK: register-io-task unix-io-backend ( task -- )
 | 
			
		||||
HOOK: unregister-io-task unix-io-backend ( task -- )
 | 
			
		||||
HOOK: unix-io-multiplex unix-io-backend ( timeval -- )
 | 
			
		||||
! I/O tasks
 | 
			
		||||
TUPLE: io-task port callbacks ;
 | 
			
		||||
 | 
			
		||||
TUPLE: unix-io ;
 | 
			
		||||
: io-task-fd io-task-port port-handle ;
 | 
			
		||||
 | 
			
		||||
! Global variables
 | 
			
		||||
SYMBOL: read-tasks
 | 
			
		||||
SYMBOL: write-tasks
 | 
			
		||||
: <io-task> ( port continuation class -- task )
 | 
			
		||||
    >r 1vector io-task construct-boa r> construct-delegate ;
 | 
			
		||||
    inline
 | 
			
		||||
 | 
			
		||||
GENERIC: do-io-task ( task -- ? )
 | 
			
		||||
GENERIC: io-task-container ( mx task -- hashtable )
 | 
			
		||||
 | 
			
		||||
! I/O multiplexers
 | 
			
		||||
TUPLE: mx fd reads writes ;
 | 
			
		||||
 | 
			
		||||
: <mx> ( -- mx ) f H{ } clone H{ } clone mx construct-boa ;
 | 
			
		||||
 | 
			
		||||
: construct-mx ( class -- obj ) <mx> swap construct-delegate ;
 | 
			
		||||
 | 
			
		||||
GENERIC: register-io-task ( task mx -- )
 | 
			
		||||
GENERIC: unregister-io-task ( task mx -- )
 | 
			
		||||
GENERIC: unix-io-multiplex ( ms mx -- )
 | 
			
		||||
 | 
			
		||||
: fd/container ( task mx -- task fd container )
 | 
			
		||||
    over io-task-container >r dup io-task-fd r> ; inline
 | 
			
		||||
 | 
			
		||||
: check-io-task ( task mx -- )
 | 
			
		||||
    fd/container key? nip [
 | 
			
		||||
        "Cannot perform multiple reads from the same port" throw
 | 
			
		||||
    ] when ;
 | 
			
		||||
 | 
			
		||||
M: mx register-io-task ( task mx -- )
 | 
			
		||||
    2dup check-io-task fd/container set-at ;
 | 
			
		||||
 | 
			
		||||
: add-io-task ( task -- ) mx get-global register-io-task ;
 | 
			
		||||
 | 
			
		||||
M: mx unregister-io-task ( task mx -- )
 | 
			
		||||
    fd/container delete-at drop ;
 | 
			
		||||
 | 
			
		||||
! Some general stuff
 | 
			
		||||
: file-mode OCT: 0666 ;
 | 
			
		||||
| 
						 | 
				
			
			@ -52,43 +79,15 @@ M: integer close-handle ( fd -- )
 | 
			
		|||
    err_no dup ignorable-error?
 | 
			
		||||
    [ 2drop f ] [ strerror swap report-error t ] if ;
 | 
			
		||||
 | 
			
		||||
! Associates a port with a list of continuations waiting on the
 | 
			
		||||
! port to finish I/O
 | 
			
		||||
TUPLE: io-task port callbacks ;
 | 
			
		||||
 | 
			
		||||
: <io-task> ( port continuation class -- task )
 | 
			
		||||
    >r 1vector io-task construct-boa r> construct-delegate ;
 | 
			
		||||
    inline
 | 
			
		||||
 | 
			
		||||
! Multiplexer
 | 
			
		||||
GENERIC: do-io-task ( task -- ? )
 | 
			
		||||
GENERIC: task-container ( task -- vector )
 | 
			
		||||
 | 
			
		||||
: io-task-fd io-task-port port-handle ;
 | 
			
		||||
 | 
			
		||||
: check-io-task ( task -- )
 | 
			
		||||
    dup io-task-fd swap task-container at [
 | 
			
		||||
        "Cannot perform multiple reads from the same port" throw
 | 
			
		||||
    ] when ;
 | 
			
		||||
 | 
			
		||||
: add-io-task ( task -- )
 | 
			
		||||
    dup check-io-task
 | 
			
		||||
    dup register-io-task
 | 
			
		||||
    dup io-task-fd over task-container set-at ;
 | 
			
		||||
 | 
			
		||||
: remove-io-task ( task -- )
 | 
			
		||||
    dup io-task-fd over task-container delete-at
 | 
			
		||||
    unregister-io-task ;
 | 
			
		||||
 | 
			
		||||
: pop-callbacks ( task -- )
 | 
			
		||||
    dup remove-io-task
 | 
			
		||||
: pop-callbacks ( mx task -- )
 | 
			
		||||
    dup rot unregister-io-task
 | 
			
		||||
    io-task-callbacks [ schedule-thread ] each ;
 | 
			
		||||
 | 
			
		||||
: handle-fd ( task -- )
 | 
			
		||||
: handle-io-task ( mx task -- )
 | 
			
		||||
    dup io-task-port touch-port
 | 
			
		||||
    dup do-io-task [ pop-callbacks ] [ drop ] if ;
 | 
			
		||||
    dup do-io-task [ pop-callbacks ] [ 2drop ] if ;
 | 
			
		||||
 | 
			
		||||
: handle-timeout ( task -- )
 | 
			
		||||
: handle-timeout ( mx task -- )
 | 
			
		||||
    "Timeout" over io-task-port report-error pop-callbacks ;
 | 
			
		||||
 | 
			
		||||
! Readers
 | 
			
		||||
| 
						 | 
				
			
			@ -119,8 +118,7 @@ M: read-task do-io-task
 | 
			
		|||
    io-task-port dup refill
 | 
			
		||||
    [ [ reader-eof ] [ drop ] if ] keep ;
 | 
			
		||||
 | 
			
		||||
M: read-task task-container
 | 
			
		||||
    drop read-tasks get-global ;
 | 
			
		||||
M: read-task io-task-container drop mx-reads ;
 | 
			
		||||
 | 
			
		||||
M: input-port (wait-to-read)
 | 
			
		||||
    [ <read-task> add-io-task stop ] callcc0 pending-error ;
 | 
			
		||||
| 
						 | 
				
			
			@ -139,13 +137,12 @@ M: write-task do-io-task
 | 
			
		|||
    io-task-port dup buffer-empty? over port-error or
 | 
			
		||||
    [ 0 swap buffer-reset t ] [ write-step ] if ;
 | 
			
		||||
 | 
			
		||||
M: write-task task-container
 | 
			
		||||
    drop write-tasks get-global ;
 | 
			
		||||
M: write-task io-task-container drop mx-writes ;
 | 
			
		||||
 | 
			
		||||
: add-write-io-task ( port continuation -- )
 | 
			
		||||
    over port-handle write-tasks get-global at
 | 
			
		||||
    over port-handle mx get-global mx-writes at*
 | 
			
		||||
    [ io-task-callbacks push drop ]
 | 
			
		||||
    [ <write-task> add-io-task ] if* ;
 | 
			
		||||
    [ drop <write-task> add-io-task ] if ;
 | 
			
		||||
 | 
			
		||||
: (wait-to-write) ( port -- )
 | 
			
		||||
    [ add-write-io-task stop ] callcc0 drop ;
 | 
			
		||||
| 
						 | 
				
			
			@ -154,16 +151,27 @@ M: port port-flush ( port -- )
 | 
			
		|||
    dup buffer-empty? [ drop ] [ (wait-to-write) ] if ;
 | 
			
		||||
 | 
			
		||||
M: unix-io io-multiplex ( ms -- )
 | 
			
		||||
    unix-io-multiplex ;
 | 
			
		||||
 | 
			
		||||
M: unix-io init-io ( -- )
 | 
			
		||||
    H{ } clone read-tasks set-global
 | 
			
		||||
    H{ } clone write-tasks set-global
 | 
			
		||||
    init-unix-io ;
 | 
			
		||||
    mx get-global unix-io-multiplex ;
 | 
			
		||||
 | 
			
		||||
M: unix-io init-stdio ( -- )
 | 
			
		||||
    0 1 handle>duplex-stream io:stdio set-global
 | 
			
		||||
    2 <writer> io:stderr set-global ;
 | 
			
		||||
 | 
			
		||||
! mx io-task for embedding an fd-based mx inside another mx
 | 
			
		||||
TUPLE: mx-port mx ;
 | 
			
		||||
 | 
			
		||||
: <mx-port> ( mx -- port )
 | 
			
		||||
    dup mx-fd f <port>
 | 
			
		||||
    mx-port over set-port-type
 | 
			
		||||
    { set-mx-port-mx set-delegate } mx-port construct ;
 | 
			
		||||
 | 
			
		||||
TUPLE: mx-task ;
 | 
			
		||||
 | 
			
		||||
: <mx-task> ( port -- task )
 | 
			
		||||
    f io-task construct-boa mx-task construct-delegate ;
 | 
			
		||||
 | 
			
		||||
M: mx-task do-io-task
 | 
			
		||||
    io-task-port mx-port-mx 0 swap unix-io-multiplex f ;
 | 
			
		||||
 | 
			
		||||
: multiplexer-error ( n -- )
 | 
			
		||||
    0 < [ err_no ignorable-error? [ (io-error) ] unless ] when ;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,76 +0,0 @@
 | 
			
		|||
! Copyright (C) 2008 Slava Pestov.
 | 
			
		||||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
USING: alien.c-types kernel io.nonblocking io.unix.backend
 | 
			
		||||
io.unix.sockets sequences assocs unix unix.kqueue math
 | 
			
		||||
namespaces classes combinators ;
 | 
			
		||||
IN: io.unix.backend.kqueue
 | 
			
		||||
 | 
			
		||||
TUPLE: unix-kqueue-io ;
 | 
			
		||||
 | 
			
		||||
! Global variables
 | 
			
		||||
SYMBOL: kqueue-fd
 | 
			
		||||
SYMBOL: kqueue-events
 | 
			
		||||
 | 
			
		||||
: max-events ( -- n )
 | 
			
		||||
    #! We read up to 256 events at a time. This is an arbitrary
 | 
			
		||||
    #! constant...
 | 
			
		||||
    256 ; inline
 | 
			
		||||
 | 
			
		||||
M: unix-kqueue-io init-unix-io ( -- )
 | 
			
		||||
    max-events "kevent" <c-array> kqueue-events set-global
 | 
			
		||||
    kqueue dup io-error kqueue-fd set-global ;
 | 
			
		||||
 | 
			
		||||
: io-task-filter ( task -- n )
 | 
			
		||||
    class {
 | 
			
		||||
        { read-task    [ EVFILT_READ  ] }
 | 
			
		||||
        { accept-task  [ EVFILT_READ  ] }
 | 
			
		||||
        { receive-task [ EVFILT_READ  ] }
 | 
			
		||||
        { write-task   [ EVFILT_WRITE ] }
 | 
			
		||||
        { connect-task [ EVFILT_WRITE ] }
 | 
			
		||||
        { send-task    [ EVFILT_WRITE ] }
 | 
			
		||||
    } case ;
 | 
			
		||||
 | 
			
		||||
: make-kevent ( task -- event )
 | 
			
		||||
    "kevent" <c-object>
 | 
			
		||||
    over io-task-fd over set-kevent-ident
 | 
			
		||||
    swap io-task-filter over set-kevent-filter ;
 | 
			
		||||
 | 
			
		||||
: register-kevent ( task flags -- )
 | 
			
		||||
    >r make-kevent r> over set-kevent-flags
 | 
			
		||||
    kqueue-fd get-global swap 1 f 0 f kevent io-error ;
 | 
			
		||||
 | 
			
		||||
: make-add-kevent ( task -- event )
 | 
			
		||||
    make-kevent
 | 
			
		||||
    EV_ADD over set-kevent-flags ;
 | 
			
		||||
 | 
			
		||||
: make-delete-kevent ( task -- event )
 | 
			
		||||
    make-kevent
 | 
			
		||||
    EV_DELETE over set-kevent-flags ;
 | 
			
		||||
 | 
			
		||||
M: unix-kqueue-io register-io-task ( task -- )
 | 
			
		||||
    EV_ADD EV_ENABLE bitor register-kevent ;
 | 
			
		||||
 | 
			
		||||
M: unix-kqueue-io unregister-io-task ( task -- )
 | 
			
		||||
    EV_DELETE EV_DISABLE bitor register-kevent ;
 | 
			
		||||
 | 
			
		||||
: wait-kevent ( timespec -- n )
 | 
			
		||||
    >r
 | 
			
		||||
    kqueue-fd get-global
 | 
			
		||||
    f 0 kqueue-events get-global max-events
 | 
			
		||||
    r> kevent dup multiplexer-error ;
 | 
			
		||||
 | 
			
		||||
: kevent-task ( kevent -- task )
 | 
			
		||||
    dup kevent-ident swap kevent-filter {
 | 
			
		||||
        { [ dup EVFILT_READ = ] [ read-tasks ] }
 | 
			
		||||
        { [ dup EVFILT_WRITE = ] [ write-tasks ] }
 | 
			
		||||
    } cond nip get at ;
 | 
			
		||||
 | 
			
		||||
: handle-kevents ( n eventlist -- )
 | 
			
		||||
    [ kevent-nth kevent-task handle-fd ] curry each ;
 | 
			
		||||
 | 
			
		||||
M: unix-kqueue-io unix-io-multiplex ( ms -- )
 | 
			
		||||
    make-timespec
 | 
			
		||||
    wait-kevent
 | 
			
		||||
    kqueue-events get-global handle-kevents ;
 | 
			
		||||
 | 
			
		||||
T{ unix-kqueue-io } unix-io-backend set-global
 | 
			
		||||
| 
						 | 
				
			
			@ -1,52 +0,0 @@
 | 
			
		|||
! Copyright (C) 2004, 2008 Slava Pestov.
 | 
			
		||||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
USING: alien.syntax kernel io.nonblocking io.unix.backend
 | 
			
		||||
bit-arrays sequences assocs unix math namespaces structs ;
 | 
			
		||||
IN: io.unix.backend.select
 | 
			
		||||
 | 
			
		||||
TUPLE: unix-select-io ;
 | 
			
		||||
 | 
			
		||||
! Global variables
 | 
			
		||||
SYMBOL: read-fdset
 | 
			
		||||
SYMBOL: write-fdset
 | 
			
		||||
 | 
			
		||||
M: unix-select-io init-unix-io ( -- )
 | 
			
		||||
    FD_SETSIZE 8 * <bit-array> read-fdset set-global
 | 
			
		||||
    FD_SETSIZE 8 * <bit-array> write-fdset set-global ;
 | 
			
		||||
 | 
			
		||||
: handle-fdset ( fdset tasks -- )
 | 
			
		||||
    swap [
 | 
			
		||||
        swap dup io-task-port timeout? [
 | 
			
		||||
            nip handle-timeout
 | 
			
		||||
        ] [
 | 
			
		||||
            tuck io-task-fd swap nth
 | 
			
		||||
            [ handle-fd ] [ drop ] if
 | 
			
		||||
        ] if drop
 | 
			
		||||
    ] curry assoc-each ;
 | 
			
		||||
 | 
			
		||||
: init-fdset ( fdset tasks -- )
 | 
			
		||||
    swap dup clear-bits
 | 
			
		||||
    [ >r drop t swap r> set-nth ] curry assoc-each ;
 | 
			
		||||
 | 
			
		||||
: read-fdset/tasks
 | 
			
		||||
    read-fdset get-global read-tasks get-global ;
 | 
			
		||||
 | 
			
		||||
: write-fdset/tasks
 | 
			
		||||
    write-fdset get-global write-tasks get-global ;
 | 
			
		||||
 | 
			
		||||
: init-fdsets ( -- read write except )
 | 
			
		||||
    read-fdset/tasks dupd init-fdset
 | 
			
		||||
    write-fdset/tasks dupd init-fdset
 | 
			
		||||
    f ;
 | 
			
		||||
 | 
			
		||||
M: unix-select-io register-io-task ( task -- ) drop ;
 | 
			
		||||
 | 
			
		||||
M: unix-select-io unregister-io-task ( task -- ) drop ;
 | 
			
		||||
 | 
			
		||||
M: unix-select-io unix-io-multiplex ( timeval -- )
 | 
			
		||||
    make-timeval >r FD_SETSIZE init-fdsets r>
 | 
			
		||||
    select multiplexer-error
 | 
			
		||||
    read-fdset/tasks handle-fdset
 | 
			
		||||
    write-fdset/tasks handle-fdset ;
 | 
			
		||||
 | 
			
		||||
T{ unix-select-io } unix-io-backend set-global
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,29 @@
 | 
			
		|||
! Copyright (C) 2008 Slava Pestov.
 | 
			
		||||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
IN: io.unix.bsd
 | 
			
		||||
USING: io.backend io.unix.backend io.unix.kqueue io.unix.select
 | 
			
		||||
io.unix.launcher namespaces kernel assocs threads continuations
 | 
			
		||||
;
 | 
			
		||||
 | 
			
		||||
! On *BSD and Mac OS X, we use select() for the top-level
 | 
			
		||||
! multiplexer, and we hang a kqueue off of it but file change
 | 
			
		||||
! notification and process exit notification.
 | 
			
		||||
 | 
			
		||||
! kqueue is buggy with files and ptys so we can't use it as the
 | 
			
		||||
! main multiplexer.
 | 
			
		||||
 | 
			
		||||
TUPLE: bsd-io ;
 | 
			
		||||
 | 
			
		||||
INSTANCE: bsd-io unix-io
 | 
			
		||||
 | 
			
		||||
M: bsd-io init-io ( -- )
 | 
			
		||||
    <select-mx> mx set-global
 | 
			
		||||
    <kqueue-mx> kqueue-mx set-global
 | 
			
		||||
    kqueue-mx get-global <mx-port> <mx-task> dup io-task-fd
 | 
			
		||||
    2dup mx get-global mx-reads set-at
 | 
			
		||||
    mx get-global mx-writes set-at ;
 | 
			
		||||
 | 
			
		||||
M: bsd-io wait-for-process ( pid -- status )
 | 
			
		||||
    [ kqueue-mx get-global add-pid-task stop ] curry callcc1 ;
 | 
			
		||||
 | 
			
		||||
T{ bsd-io } io-backend set-global
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,61 @@
 | 
			
		|||
! Copyright (C) 2008 Slava Pestov.
 | 
			
		||||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
USING: alien.c-types kernel io.nonblocking io.unix.backend
 | 
			
		||||
bit-arrays sequences assocs unix math namespaces structs ;
 | 
			
		||||
IN: io.unix.epoll
 | 
			
		||||
 | 
			
		||||
TUPLE: epoll-mx events ;
 | 
			
		||||
 | 
			
		||||
: max-events ( -- n )
 | 
			
		||||
    #! We read up to 256 events at a time. This is an arbitrary
 | 
			
		||||
    #! constant...
 | 
			
		||||
    256 ; inline
 | 
			
		||||
 | 
			
		||||
: <epoll-mx> ( -- mx )
 | 
			
		||||
    epoll-mx construct-mx
 | 
			
		||||
    max-events epoll_create dup io-error over set-mx-fd
 | 
			
		||||
    max-events "epoll-event" <c-array> over set-epoll-mx-events ;
 | 
			
		||||
 | 
			
		||||
: io-task-filter ( task -- n )
 | 
			
		||||
    class {
 | 
			
		||||
        { read-task    [ EVFILT_READ  ] }
 | 
			
		||||
        { accept-task  [ EVFILT_READ  ] }
 | 
			
		||||
        { receive-task [ EVFILT_READ  ] }
 | 
			
		||||
        { write-task   [ EVFILT_WRITE ] }
 | 
			
		||||
        { connect-task [ EVFILT_WRITE ] }
 | 
			
		||||
        { send-task    [ EVFILT_WRITE ] }
 | 
			
		||||
    } case ;
 | 
			
		||||
 | 
			
		||||
: make-event ( task -- event )
 | 
			
		||||
    "epoll-event" <c-object>
 | 
			
		||||
    tuck set-epoll-event-events
 | 
			
		||||
    over io-task-fd over set-epoll-fd ;
 | 
			
		||||
 | 
			
		||||
: do-epoll-ctl ( task mx what -- )
 | 
			
		||||
    >r >r make-event r> mx-fd r> pick event-data *int roll
 | 
			
		||||
    epoll_ctl io-error ;
 | 
			
		||||
 | 
			
		||||
M: epoll-mx register-io-task ( task mx -- )
 | 
			
		||||
    EPOLL_CTL_ADD do-epoll-ctl ;
 | 
			
		||||
 | 
			
		||||
M: epoll-mx unregister-io-task ( task mx -- )
 | 
			
		||||
    EPOLL_CTL_DEL do-epoll-ctl ;
 | 
			
		||||
 | 
			
		||||
: wait-kevent ( mx timeout -- n )
 | 
			
		||||
    >r mx-fd epoll-mx-events max-events r> epoll_wait
 | 
			
		||||
    dup multiplexer-error ;
 | 
			
		||||
 | 
			
		||||
: epoll-read-task ( mx fd -- )
 | 
			
		||||
    over mx-reads at* [ handle-io-task ] [ 2drop ] if ;
 | 
			
		||||
 | 
			
		||||
: epoll-write-task ( mx fd -- )
 | 
			
		||||
    over mx-reads at* [ handle-io-task ] [ 2drop ] if ;
 | 
			
		||||
 | 
			
		||||
: handle-event ( mx kevent -- )
 | 
			
		||||
    epoll-event-fd 2dup epoll-read-task epoll-write-task ;
 | 
			
		||||
 | 
			
		||||
: handle-events ( mx n -- )
 | 
			
		||||
    [ over epoll-mx-events kevent-nth handle-kevent ] with each ;
 | 
			
		||||
 | 
			
		||||
M: epoll-mx unix-io-multiplex ( ms mx -- )
 | 
			
		||||
    dup rot wait-kevent handle-kevents ;
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,90 @@
 | 
			
		|||
! Copyright (C) 2008 Slava Pestov.
 | 
			
		||||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
USING: alien.c-types kernel io.nonblocking io.unix.backend
 | 
			
		||||
io.unix.sockets sequences assocs unix unix.kqueue unix.process
 | 
			
		||||
math namespaces classes combinators threads vectors ;
 | 
			
		||||
IN: io.unix.kqueue
 | 
			
		||||
 | 
			
		||||
TUPLE: kqueue-mx events processes ;
 | 
			
		||||
 | 
			
		||||
: max-events ( -- n )
 | 
			
		||||
    #! We read up to 256 events at a time. This is an arbitrary
 | 
			
		||||
    #! constant...
 | 
			
		||||
    256 ; inline
 | 
			
		||||
 | 
			
		||||
: <kqueue-mx> ( -- mx )
 | 
			
		||||
    kqueue-mx construct-mx
 | 
			
		||||
    kqueue dup io-error over set-mx-fd
 | 
			
		||||
    H{ } clone over set-kqueue-mx-processes
 | 
			
		||||
    max-events "kevent" <c-array> over set-kqueue-mx-events ;
 | 
			
		||||
 | 
			
		||||
: io-task-filter ( task -- n )
 | 
			
		||||
    class {
 | 
			
		||||
        { read-task    [ EVFILT_READ  ] }
 | 
			
		||||
        { accept-task  [ EVFILT_READ  ] }
 | 
			
		||||
        { receive-task [ EVFILT_READ  ] }
 | 
			
		||||
        { write-task   [ EVFILT_WRITE ] }
 | 
			
		||||
        { connect-task [ EVFILT_WRITE ] }
 | 
			
		||||
        { send-task    [ EVFILT_WRITE ] }
 | 
			
		||||
    } case ;
 | 
			
		||||
 | 
			
		||||
: make-kevent ( task flags -- event )
 | 
			
		||||
    "kevent" <c-object>
 | 
			
		||||
    tuck set-kevent-flags
 | 
			
		||||
    over io-task-fd over set-kevent-ident
 | 
			
		||||
    swap io-task-filter over set-kevent-filter ;
 | 
			
		||||
 | 
			
		||||
: register-kevent ( kevent mx -- )
 | 
			
		||||
    mx-fd swap 1 f 0 f kevent io-error ;
 | 
			
		||||
 | 
			
		||||
M: kqueue-mx register-io-task ( task mx -- )
 | 
			
		||||
    over EV_ADD make-kevent over register-kevent
 | 
			
		||||
    delegate register-io-task ;
 | 
			
		||||
 | 
			
		||||
M: kqueue-mx unregister-io-task ( task mx -- )
 | 
			
		||||
    2dup delegate unregister-io-task
 | 
			
		||||
    swap EV_DELETE make-kevent swap register-kevent ;
 | 
			
		||||
 | 
			
		||||
: wait-kevent ( mx timespec -- n )
 | 
			
		||||
    >r dup mx-fd f 0 roll kqueue-mx-events max-events r> kevent
 | 
			
		||||
    dup multiplexer-error ;
 | 
			
		||||
 | 
			
		||||
: kevent-read-task ( mx fd -- )
 | 
			
		||||
    over mx-reads at handle-io-task ;
 | 
			
		||||
 | 
			
		||||
: kevent-write-task ( mx fd -- )
 | 
			
		||||
    over mx-reads at handle-io-task ;
 | 
			
		||||
 | 
			
		||||
: kevent-proc-task ( mx pid -- )
 | 
			
		||||
    dup (wait-for-pid) spin kqueue-mx-processes delete-at* [
 | 
			
		||||
        [ schedule-thread-with ] with each
 | 
			
		||||
    ] [ 2drop ] if ;
 | 
			
		||||
 | 
			
		||||
: handle-kevent ( mx kevent -- )
 | 
			
		||||
    dup kevent-ident swap kevent-filter {
 | 
			
		||||
        { [ dup EVFILT_READ = ] [ drop kevent-read-task ] }
 | 
			
		||||
        { [ dup EVFILT_WRITE = ] [ drop kevent-write-task ] }
 | 
			
		||||
        { [ dup EVFILT_PROC = ] [ drop kevent-proc-task ] }
 | 
			
		||||
    } cond ;
 | 
			
		||||
 | 
			
		||||
: handle-kevents ( mx n -- )
 | 
			
		||||
    [ over kqueue-mx-events kevent-nth handle-kevent ] with each ;
 | 
			
		||||
 | 
			
		||||
M: kqueue-mx unix-io-multiplex ( ms mx -- )
 | 
			
		||||
    swap make-timespec dupd wait-kevent handle-kevents ;
 | 
			
		||||
 | 
			
		||||
: make-proc-kevent ( pid -- kevent )
 | 
			
		||||
    "kevent" <c-object>
 | 
			
		||||
    tuck set-kevent-ident
 | 
			
		||||
    EV_ADD over set-kevent-flags
 | 
			
		||||
    EVFILT_PROC over set-kevent-filter
 | 
			
		||||
    NOTE_EXIT over set-kevent-fflags ;
 | 
			
		||||
 | 
			
		||||
: add-pid-task ( continuation pid mx -- )
 | 
			
		||||
    2dup kqueue-mx-processes at* [
 | 
			
		||||
        2nip push
 | 
			
		||||
    ] [
 | 
			
		||||
        drop
 | 
			
		||||
        over make-proc-kevent over register-kevent
 | 
			
		||||
        >r >r 1vector r> r> kqueue-mx-processes set-at
 | 
			
		||||
    ] if ;
 | 
			
		||||
| 
						 | 
				
			
			@ -1,14 +1,18 @@
 | 
			
		|||
! Copyright (C) 2007 Slava Pestov.
 | 
			
		||||
! Copyright (C) 2007, 2008 Slava Pestov.
 | 
			
		||||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
USING: io io.launcher io.unix.backend io.nonblocking
 | 
			
		||||
sequences kernel namespaces math system alien.c-types
 | 
			
		||||
debugger continuations arrays assocs combinators unix.process
 | 
			
		||||
USING: io io.backend io.launcher io.unix.backend io.nonblocking
 | 
			
		||||
sequences kernel namespaces math system alien.c-types debugger
 | 
			
		||||
continuations arrays assocs combinators unix.process
 | 
			
		||||
parser-combinators memoize promises strings ;
 | 
			
		||||
IN: io.unix.launcher
 | 
			
		||||
 | 
			
		||||
! Search unix first
 | 
			
		||||
USE: unix
 | 
			
		||||
 | 
			
		||||
HOOK: wait-for-process io-backend ( pid -- status )
 | 
			
		||||
 | 
			
		||||
M: unix-io wait-for-process ( pid -- status ) wait-for-pid ;
 | 
			
		||||
 | 
			
		||||
! Our command line parser. Supported syntax:
 | 
			
		||||
! foo bar baz -- simple tokens
 | 
			
		||||
! foo\ bar -- escaping the space
 | 
			
		||||
| 
						 | 
				
			
			@ -44,28 +48,26 @@ MEMO: 'arguments' ( -- parser )
 | 
			
		|||
 | 
			
		||||
: (spawn-process) ( -- )
 | 
			
		||||
    [
 | 
			
		||||
        pass-environment? [
 | 
			
		||||
	    get-arguments get-environment assoc>env exec-args-with-env
 | 
			
		||||
        ] [
 | 
			
		||||
	    get-arguments exec-args-with-path
 | 
			
		||||
        ] if io-error
 | 
			
		||||
        get-arguments
 | 
			
		||||
        pass-environment?
 | 
			
		||||
        [ get-environment assoc>env exec-args-with-env ]
 | 
			
		||||
        [ exec-args-with-path ] if
 | 
			
		||||
        io-error
 | 
			
		||||
    ] [ error. :c flush ] recover 1 exit ;
 | 
			
		||||
 | 
			
		||||
: wait-for-process ( pid -- )
 | 
			
		||||
    0 <int> 0 waitpid drop ;
 | 
			
		||||
 | 
			
		||||
: spawn-process ( -- pid )
 | 
			
		||||
    [ (spawn-process) ] [ ] with-fork ;
 | 
			
		||||
 | 
			
		||||
: spawn-detached ( -- )
 | 
			
		||||
    [ spawn-process 0 exit ] [ ] with-fork wait-for-process ;
 | 
			
		||||
    [ spawn-process 0 exit ] [ ] with-fork
 | 
			
		||||
    wait-for-process drop ;
 | 
			
		||||
 | 
			
		||||
M: unix-io run-process* ( desc -- )
 | 
			
		||||
    [
 | 
			
		||||
        +detached+ get [
 | 
			
		||||
            spawn-detached
 | 
			
		||||
        ] [
 | 
			
		||||
            spawn-process wait-for-process
 | 
			
		||||
            spawn-process wait-for-process drop
 | 
			
		||||
        ] if
 | 
			
		||||
    ] with-descriptor ;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -85,15 +87,16 @@ M: unix-io run-process* ( desc -- )
 | 
			
		|||
        -rot 2dup second close first close
 | 
			
		||||
    ] with-fork first swap second rot ;
 | 
			
		||||
 | 
			
		||||
TUPLE: pipe-stream pid ;
 | 
			
		||||
TUPLE: pipe-stream pid status ;
 | 
			
		||||
 | 
			
		||||
: <pipe-stream> ( in out pid -- stream )
 | 
			
		||||
    pipe-stream construct-boa
 | 
			
		||||
    f pipe-stream construct-boa
 | 
			
		||||
    -rot handle>duplex-stream over set-delegate ;
 | 
			
		||||
 | 
			
		||||
M: pipe-stream stream-close
 | 
			
		||||
    dup delegate stream-close
 | 
			
		||||
    pipe-stream-pid wait-for-process ;
 | 
			
		||||
    dup pipe-stream-pid wait-for-process
 | 
			
		||||
    swap set-pipe-stream-status ;
 | 
			
		||||
 | 
			
		||||
M: unix-io process-stream*
 | 
			
		||||
    [ spawn-process-stream <pipe-stream> ] with-descriptor ;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,17 @@
 | 
			
		|||
! Copyright (C) 2008 Slava Pestov.
 | 
			
		||||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
IN: io.unix.linux
 | 
			
		||||
USING: io.unix.backend io.unix.select namespaces kernel assocs ;
 | 
			
		||||
 | 
			
		||||
TUPLE: linux-io ;
 | 
			
		||||
 | 
			
		||||
INSTANCE: linux-io unix-io
 | 
			
		||||
 | 
			
		||||
M: linux-io init-io ( -- )
 | 
			
		||||
    start-wait-loop
 | 
			
		||||
    <epoll-mx> mx set-global ;
 | 
			
		||||
 | 
			
		||||
M: linux-io wait-for-pid ( pid -- status )
 | 
			
		||||
    [ kqueue-mx get-global add-pid-task stop ] curry callcc1 ;
 | 
			
		||||
 | 
			
		||||
T{ linux-io } io-backend set-global
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,47 @@
 | 
			
		|||
! Copyright (C) 2004, 2008 Slava Pestov.
 | 
			
		||||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
USING: alien.c-types kernel io.nonblocking io.unix.backend
 | 
			
		||||
bit-arrays sequences assocs unix math namespaces structs ;
 | 
			
		||||
IN: io.unix.select
 | 
			
		||||
 | 
			
		||||
TUPLE: select-mx read-fdset write-fdset ;
 | 
			
		||||
 | 
			
		||||
! Factor's bit-arrays are an array of bytes, OS X expects
 | 
			
		||||
! FD_SET to be an array of cells, so we have to account for
 | 
			
		||||
! byte order differences on big endian platforms
 | 
			
		||||
: little-endian? 1 <int> *char 1 = ; foldable
 | 
			
		||||
 | 
			
		||||
: munge ( i -- i' )
 | 
			
		||||
    little-endian? [ BIN: 11000 bitxor ] unless ; inline
 | 
			
		||||
 | 
			
		||||
: <select-mx> ( -- mx )
 | 
			
		||||
    select-mx construct-mx
 | 
			
		||||
    FD_SETSIZE 8 * <bit-array> over set-select-mx-read-fdset
 | 
			
		||||
    FD_SETSIZE 8 * <bit-array> over set-select-mx-write-fdset ;
 | 
			
		||||
 | 
			
		||||
: handle-fd ( fd task fdset mx -- )
 | 
			
		||||
    roll munge rot nth [ swap handle-io-task ] [ 2drop ] if ;
 | 
			
		||||
 | 
			
		||||
: handle-fdset ( tasks fdset mx -- )
 | 
			
		||||
    [ handle-fd ] 2curry assoc-each ;
 | 
			
		||||
 | 
			
		||||
: init-fdset ( tasks fdset -- )
 | 
			
		||||
    dup clear-bits
 | 
			
		||||
    [ >r drop t swap munge r> set-nth ] curry assoc-each ;
 | 
			
		||||
 | 
			
		||||
: read-fdset/tasks
 | 
			
		||||
    { mx-reads select-mx-read-fdset } get-slots ;
 | 
			
		||||
 | 
			
		||||
: write-fdset/tasks
 | 
			
		||||
    { mx-writes select-mx-write-fdset } get-slots ;
 | 
			
		||||
 | 
			
		||||
: init-fdsets ( mx -- read write except )
 | 
			
		||||
    [ read-fdset/tasks tuck init-fdset ] keep
 | 
			
		||||
    write-fdset/tasks tuck init-fdset
 | 
			
		||||
    f ;
 | 
			
		||||
 | 
			
		||||
M: select-mx unix-io-multiplex ( ms mx -- )
 | 
			
		||||
    swap >r FD_SETSIZE over init-fdsets r> make-timeval
 | 
			
		||||
    select multiplexer-error
 | 
			
		||||
    dup read-fdset/tasks pick handle-fdset
 | 
			
		||||
    dup write-fdset/tasks rot handle-fdset ;
 | 
			
		||||
| 
						 | 
				
			
			@ -40,7 +40,7 @@ M: connect-task do-io-task
 | 
			
		|||
    io-task-port dup port-handle f 0 write
 | 
			
		||||
    0 < [ defer-error ] [ drop t ] if ;
 | 
			
		||||
 | 
			
		||||
M: connect-task task-container drop write-tasks get-global ;
 | 
			
		||||
M: connect-task io-task-container drop mx-writes ;
 | 
			
		||||
 | 
			
		||||
: wait-to-connect ( port -- )
 | 
			
		||||
    [ <connect-task> add-io-task stop ] callcc0 drop ;
 | 
			
		||||
| 
						 | 
				
			
			@ -70,7 +70,7 @@ TUPLE: accept-task ;
 | 
			
		|||
: <accept-task> ( port continuation  -- task )
 | 
			
		||||
    accept-task <io-task> ;
 | 
			
		||||
 | 
			
		||||
M: accept-task task-container drop read-tasks get ;
 | 
			
		||||
M: accept-task io-task-container drop mx-reads ;
 | 
			
		||||
 | 
			
		||||
: accept-sockaddr ( port -- fd sockaddr )
 | 
			
		||||
    dup port-handle swap server-port-addr sockaddr-type
 | 
			
		||||
| 
						 | 
				
			
			@ -152,7 +152,7 @@ M: receive-task do-io-task
 | 
			
		|||
        2drop defer-error
 | 
			
		||||
    ] if ;
 | 
			
		||||
 | 
			
		||||
M: receive-task task-container drop read-tasks get ;
 | 
			
		||||
M: receive-task io-task-container drop mx-reads ;
 | 
			
		||||
 | 
			
		||||
: wait-receive ( stream -- )
 | 
			
		||||
    [ <receive-task> add-io-task stop ] callcc0 drop ;
 | 
			
		||||
| 
						 | 
				
			
			@ -185,7 +185,7 @@ M: send-task do-io-task
 | 
			
		|||
    [ send-task-len do-send ] keep
 | 
			
		||||
    swap 0 < [ io-task-port defer-error ] [ drop t ] if ;
 | 
			
		||||
 | 
			
		||||
M: send-task task-container drop write-tasks get ;
 | 
			
		||||
M: send-task io-task-container drop mx-writes ;
 | 
			
		||||
 | 
			
		||||
: wait-send ( packet sockaddr len stream -- )
 | 
			
		||||
    [ <send-task> add-io-task stop ] callcc0 2drop 2drop ;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,9 +3,8 @@ io.unix.launcher io.unix.mmap io.backend combinators namespaces
 | 
			
		|||
system vocabs.loader ;
 | 
			
		||||
 | 
			
		||||
{
 | 
			
		||||
    { [ macosx? ] [ "io.unix.backend.kqueue" ] }
 | 
			
		||||
    { [ bsd? ] [ "io.unix.backend.kqueue" ] }
 | 
			
		||||
    { [ unix? ] [ "io.unix.backend.select" ] }
 | 
			
		||||
    { [ bsd? ] [ "io.unix.bsd" ] }
 | 
			
		||||
    { [ macosx? ] [ "io.unix.bsd" ] }
 | 
			
		||||
    { [ linux? ] [ "io.unix.backend.linux" ] }
 | 
			
		||||
    { [ solaris? ] [ "io.unix.backend.solaris" ] }
 | 
			
		||||
} cond require
 | 
			
		||||
 | 
			
		||||
T{ unix-io } io-backend set-global
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,53 +1,55 @@
 | 
			
		|||
 | 
			
		||||
USING: kernel alien.c-types sequences math unix combinators.cleave ;
 | 
			
		||||
USING: kernel alien.c-types sequences math unix
 | 
			
		||||
combinators.cleave vectors kernel namespaces continuations
 | 
			
		||||
threads assocs vectors ;
 | 
			
		||||
 | 
			
		||||
IN: unix.process
 | 
			
		||||
 | 
			
		||||
! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 | 
			
		||||
! Low-level Unix process launching utilities. These are used
 | 
			
		||||
! to implement io.launcher on Unix. User code should use
 | 
			
		||||
! io.launcher instead.
 | 
			
		||||
 | 
			
		||||
: >argv ( seq -- alien ) [ malloc-char-string ] map f add >c-void*-array ;
 | 
			
		||||
 | 
			
		||||
! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 | 
			
		||||
 | 
			
		||||
: exec ( pathname argv -- int )
 | 
			
		||||
  [ malloc-char-string ] [ >argv ] bi* execv ;
 | 
			
		||||
    [ malloc-char-string ] [ >argv ] bi* execv ;
 | 
			
		||||
 | 
			
		||||
: exec-with-path ( filename argv -- int )
 | 
			
		||||
  [ malloc-char-string ] [ >argv ] bi* execvp ;
 | 
			
		||||
    [ malloc-char-string ] [ >argv ] bi* execvp ;
 | 
			
		||||
 | 
			
		||||
: exec-with-env ( filename argv envp -- int )
 | 
			
		||||
  [ malloc-char-string ] [ >argv ] [ >argv ] tri* execve ;
 | 
			
		||||
    [ malloc-char-string ] [ >argv ] [ >argv ] tri* execve ;
 | 
			
		||||
 | 
			
		||||
! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 | 
			
		||||
: exec-args ( seq -- int )
 | 
			
		||||
    [ first ] [ ] bi exec ;
 | 
			
		||||
 | 
			
		||||
: exec-args           ( seq -- int ) [ first ] [ ] bi exec ;
 | 
			
		||||
: exec-args-with-path ( seq -- int ) [ first ] [ ] bi exec-with-path ;
 | 
			
		||||
: exec-args-with-path ( seq -- int )
 | 
			
		||||
    [ first ] [ ] bi exec-with-path ;
 | 
			
		||||
 | 
			
		||||
: exec-args-with-env  ( seq seq -- int ) >r [ first ] [ ] bi r> exec-with-env ;
 | 
			
		||||
: exec-args-with-env  ( seq seq -- int )
 | 
			
		||||
    >r [ first ] [ ] bi r> exec-with-env ;
 | 
			
		||||
 | 
			
		||||
! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 | 
			
		||||
: with-fork ( child parent -- )
 | 
			
		||||
    fork dup zero? -roll swap curry if ; inline
 | 
			
		||||
 | 
			
		||||
: with-fork ( child parent -- ) fork dup zero? -roll swap curry if ; inline
 | 
			
		||||
 | 
			
		||||
! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 | 
			
		||||
 | 
			
		||||
USING: kernel alien.c-types namespaces continuations threads assocs unix
 | 
			
		||||
       combinators.cleave ;
 | 
			
		||||
! Lame polling strategy for getting process exit codes. On
 | 
			
		||||
! BSD, we use kqueue which is more efficient.
 | 
			
		||||
 | 
			
		||||
SYMBOL: pid-wait
 | 
			
		||||
 | 
			
		||||
! KEY | VALUE
 | 
			
		||||
! -----------
 | 
			
		||||
! pid | continuation
 | 
			
		||||
: (wait-for-pid) ( pid -- status )
 | 
			
		||||
    0 <int> [ 0 waitpid drop ] keep *int ;
 | 
			
		||||
 | 
			
		||||
: init-pid-wait ( -- ) H{ } clone pid-wait set-global ;
 | 
			
		||||
 | 
			
		||||
: wait-for-pid ( pid -- status ) [ pid-wait get set-at stop ] curry callcc1 ;
 | 
			
		||||
: wait-for-pid ( pid -- status )
 | 
			
		||||
    [ pid-wait get-global [ ?push ] change-at stop ] curry
 | 
			
		||||
    callcc1 ;
 | 
			
		||||
 | 
			
		||||
: wait-loop ( -- )
 | 
			
		||||
  -1 0 <int> tuck WNOHANG waitpid               ! &status return
 | 
			
		||||
  [ *int ] [ pid-wait get delete-at* drop ] bi* ! status ?
 | 
			
		||||
  dup [ schedule-thread-with ] [ 2drop ] if
 | 
			
		||||
  250 sleep wait-loop ;
 | 
			
		||||
    -1 0 <int> tuck WNOHANG waitpid               ! &status return
 | 
			
		||||
    [ *int ] [ pid-wait get delete-at* drop ] bi* ! status ?
 | 
			
		||||
    [ schedule-thread-with ] with each
 | 
			
		||||
    250 sleep
 | 
			
		||||
    wait-loop ;
 | 
			
		||||
 | 
			
		||||
: start-wait-loop ( -- ) init-pid-wait [ wait-loop ] in-thread ;
 | 
			
		||||
: start-wait-loop ( -- )
 | 
			
		||||
    H{ } clone pid-wait set-global
 | 
			
		||||
    [ wait-loop ] in-thread ;
 | 
			
		||||
		Loading…
	
		Reference in New Issue