Clean up non-blocking wait-for-process support, implement on Unix (untested)

db4
Slava Pestov 2008-01-24 04:19:15 -04:00
parent d621b9852e
commit 6afa4119c8
8 changed files with 72 additions and 97 deletions

View File

@ -1,12 +1,25 @@
! Copyright (C) 2008 Slava Pestov. ! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license. ! See http://factorcode.org/license.txt for BSD license.
USING: io io.backend system kernel namespaces strings hashtables USING: io io.backend system kernel namespaces strings hashtables
sequences assocs combinators vocabs.loader ; sequences assocs combinators vocabs.loader init threads
continuations ;
IN: io.launcher IN: io.launcher
! Non-blocking process exit notification facility
SYMBOL: processes
[ H{ } clone processes set-global ] "io.launcher" add-init-hook
TUPLE: process handle status ; TUPLE: process handle status ;
: <process> ( handle -- process ) f process construct-boa ; HOOK: register-process io-backend ( process -- )
M: object register-process drop ;
: <process> ( handle -- process )
f process construct-boa
V{ } clone over processes get set-at
dup register-process ;
M: process equal? 2drop f ; M: process equal? 2drop f ;
@ -54,11 +67,10 @@ M: assoc >descriptor ;
HOOK: run-process* io-backend ( desc -- handle ) HOOK: run-process* io-backend ( desc -- handle )
HOOK: wait-for-process* io-backend ( process -- )
: wait-for-process ( process -- status ) : wait-for-process ( process -- status )
dup process-handle [ dup wait-for-process* ] when dup process-handle [
process-status ; dup [ processes get at push stop ] curry callcc0
] when process-status ;
: run-process ( obj -- process ) : run-process ( obj -- process )
>descriptor >descriptor
@ -81,3 +93,8 @@ TUPLE: process-stream process ;
swap <process-stream> swap <process-stream>
[ swap with-stream ] keep [ swap with-stream ] keep
process-stream-process ; inline process-stream-process ; inline
: notify-exit ( status process -- )
[ set-process-status ] keep
[ processes get delete-at* drop [ schedule-thread ] each ] keep
f swap set-process-handle ;

View File

@ -23,7 +23,7 @@ M: bsd-io init-io ( -- )
2dup mx get-global mx-reads set-at 2dup mx get-global mx-reads set-at
mx get-global mx-writes set-at ; mx get-global mx-writes set-at ;
M: bsd-io wait-for-process ( pid -- status ) M: bsd-io register-process ( process -- )
[ kqueue-mx get-global add-pid-task stop ] curry callcc1 ; process-handle kqueue-mx get-global add-pid-task ;
T{ bsd-io } set-io-backend T{ bsd-io } set-io-backend

18
extra/io/unix/kqueue/kqueue.factor Normal file → Executable file
View File

@ -5,7 +5,7 @@ sequences assocs unix unix.kqueue unix.process math namespaces
combinators threads vectors ; combinators threads vectors ;
IN: io.unix.kqueue IN: io.unix.kqueue
TUPLE: kqueue-mx events processes ; TUPLE: kqueue-mx events ;
: max-events ( -- n ) : max-events ( -- n )
#! We read up to 256 events at a time. This is an arbitrary #! We read up to 256 events at a time. This is an arbitrary
@ -15,7 +15,6 @@ TUPLE: kqueue-mx events processes ;
: <kqueue-mx> ( -- mx ) : <kqueue-mx> ( -- mx )
kqueue-mx construct-mx kqueue-mx construct-mx
kqueue dup io-error over set-mx-fd kqueue dup io-error over set-mx-fd
H{ } clone over set-kqueue-mx-processes
max-events "kevent" <c-array> over set-kqueue-mx-events ; max-events "kevent" <c-array> over set-kqueue-mx-events ;
GENERIC: io-task-filter ( task -- n ) GENERIC: io-task-filter ( task -- n )
@ -52,9 +51,8 @@ M: kqueue-mx unregister-io-task ( task mx -- )
over mx-reads at handle-io-task ; over mx-reads at handle-io-task ;
: kevent-proc-task ( mx pid -- ) : kevent-proc-task ( mx pid -- )
dup (wait-for-pid) spin kqueue-mx-processes delete-at* [ dup (wait-for-pid) swap find-process
[ schedule-thread-with ] with each dup [ notify-exit ] [ 2drop ] if ;
] [ 2drop ] if ;
: handle-kevent ( mx kevent -- ) : handle-kevent ( mx kevent -- )
dup kevent-ident swap kevent-filter { dup kevent-ident swap kevent-filter {
@ -76,11 +74,5 @@ M: kqueue-mx wait-for-events ( ms mx -- )
EVFILT_PROC over set-kevent-filter EVFILT_PROC over set-kevent-filter
NOTE_EXIT over set-kevent-fflags ; NOTE_EXIT over set-kevent-fflags ;
: add-pid-task ( continuation pid mx -- ) : add-pid-task ( pid mx -- )
2dup kqueue-mx-processes at* [ swap make-proc-kevent swap register-kevent ;
2nip push
] [
drop
over make-proc-kevent over register-kevent
>r >r 1vector r> r> kqueue-mx-processes set-at
] if ;

View File

@ -9,10 +9,6 @@ IN: io.unix.launcher
! Search unix first ! Search unix first
USE: unix USE: unix
HOOK: wait-for-process io-backend ( pid -- status )
M: unix-io wait-for-process ( pid -- status ) wait-for-pid ;
! Our command line parser. Supported syntax: ! Our command line parser. Supported syntax:
! foo bar baz -- simple tokens ! foo bar baz -- simple tokens
! foo\ bar -- escaping the space ! foo\ bar -- escaping the space
@ -46,7 +42,7 @@ MEMO: 'arguments' ( -- parser )
: assoc>env ( assoc -- env ) : assoc>env ( assoc -- env )
[ "=" swap 3append ] { } assoc>map ; [ "=" swap 3append ] { } assoc>map ;
: (spawn-process) ( -- ) : spawn-process ( -- )
[ [
get-arguments get-arguments
pass-environment? pass-environment?
@ -55,20 +51,9 @@ MEMO: 'arguments' ( -- parser )
io-error io-error
] [ error. :c flush ] recover 1 exit ; ] [ error. :c flush ] recover 1 exit ;
: spawn-process ( -- pid ) M: unix-io run-process* ( desc -- pid )
[ (spawn-process) ] [ ] with-fork ;
: spawn-detached ( -- )
[ spawn-process 0 exit ] [ ] with-fork
wait-for-process drop ;
M: unix-io run-process* ( desc -- )
[ [
+detached+ get [ [ spawn-process ] [ ] with-fork <process>
spawn-detached
] [
spawn-process wait-for-process drop
] if
] with-descriptor ; ] with-descriptor ;
: open-pipe ( -- pair ) : open-pipe ( -- pair )
@ -82,21 +67,35 @@ M: unix-io run-process* ( desc -- )
: spawn-process-stream ( -- in out pid ) : spawn-process-stream ( -- in out pid )
open-pipe open-pipe [ open-pipe open-pipe [
setup-stdio-pipe setup-stdio-pipe
(spawn-process) spawn-process
] [ ] [
-rot 2dup second close first close -rot 2dup second close first close
] with-fork first swap second rot ; ] with-fork first swap second rot <process> ;
TUPLE: pipe-stream pid status ;
: <pipe-stream> ( in out pid -- stream )
f pipe-stream construct-boa
-rot handle>duplex-stream over set-delegate ;
M: pipe-stream stream-close
dup delegate stream-close
dup pipe-stream-pid wait-for-process
swap set-pipe-stream-status ;
M: unix-io process-stream* M: unix-io process-stream*
[ spawn-process-stream <pipe-stream> ] with-descriptor ; [
spawn-process-stream >r handle>duplex-stream r>
] with-descriptor ;
: find-process ( handle -- process )
f process construct-boa processes get at ;
! Inefficient process wait polling, used on Linux and Solaris.
! On BSD and Mac OS X, we use kqueue() which scales better.
: wait-for-processes ( -- ? )
-1 0 <int> tuck WNOHANG waitpid
dup zero? [
2drop t
] [
find-process dup [
>r *uint r> notify-exit f
] [
2drop f
] if
] if ;
: wait-loop ( -- )
wait-for-processes [ 250 sleep ] when wait-loop ;
: start-wait-thread ( -- )
[ wait-loop ] in-thread ;

View File

@ -10,9 +10,6 @@ INSTANCE: linux-io unix-io
M: linux-io init-io ( -- ) M: linux-io init-io ( -- )
<select-mx> mx set-global <select-mx> mx set-global
start-wait-loop ; start-wait-thread ;
M: linux-io wait-for-process ( pid -- status )
wait-for-pid ;
T{ linux-io } set-io-backend T{ linux-io } set-io-backend

View File

@ -6,14 +6,6 @@ math windows.kernel32 windows namespaces io.launcher kernel
sequences windows.errors assocs splitting system threads init ; sequences windows.errors assocs splitting system threads init ;
IN: io.windows.launcher IN: io.windows.launcher
SYMBOL: processes
[ H{ } clone processes set-global ]
"io.windows.launcher" add-init-hook
: <win32-process> ( handle -- process )
<process> V{ } clone over processes get set-at ;
TUPLE: CreateProcess-args TUPLE: CreateProcess-args
lpApplicationName lpApplicationName
lpCommandLine lpCommandLine
@ -104,12 +96,9 @@ M: windows-io run-process* ( desc -- handle )
[ [
make-CreateProcess-args make-CreateProcess-args
dup call-CreateProcess dup call-CreateProcess
CreateProcess-args-lpProcessInformation <win32-process> CreateProcess-args-lpProcessInformation <process>
] with-descriptor ; ] with-descriptor ;
M: windows-io wait-for-process*
[ processes get at push stop ] curry callcc0 ;
: dispose-process ( process-information -- ) : dispose-process ( process-information -- )
#! From MSDN: "Handles in PROCESS_INFORMATION must be closed #! From MSDN: "Handles in PROCESS_INFORMATION must be closed
#! with CloseHandle when they are no longer needed." #! with CloseHandle when they are no longer needed."
@ -121,11 +110,10 @@ M: windows-io wait-for-process*
0 <ulong> [ GetExitCodeProcess ] keep *ulong 0 <ulong> [ GetExitCodeProcess ] keep *ulong
swap win32-error=0/f ; swap win32-error=0/f ;
: notify-exit ( process -- ) : process-exited ( process -- )
dup process-handle exit-code over set-process-status dup process-handle exit-code
dup process-handle dispose-process over process-handle dispose-process
dup processes get delete-at* drop [ schedule-thread ] each swap notify-exit ;
f swap set-process-handle ;
: wait-for-processes ( processes -- ? ) : wait-for-processes ( processes -- ? )
keys dup keys dup
@ -133,7 +121,7 @@ M: windows-io wait-for-process*
dup length swap >c-void*-array 0 0 dup length swap >c-void*-array 0 0
WaitForMultipleObjects WaitForMultipleObjects
dup HEX: ffffffff = [ win32-error ] when dup HEX: ffffffff = [ win32-error ] when
dup WAIT_TIMEOUT = [ 2drop t ] [ swap nth notify-exit f ] if ; dup WAIT_TIMEOUT = [ 2drop t ] [ swap nth process-exited f ] if ;
: wait-loop ( -- ) : wait-loop ( -- )
processes get dup assoc-empty? processes get dup assoc-empty?
@ -143,3 +131,5 @@ M: windows-io wait-for-process*
: start-wait-thread ( -- ) : start-wait-thread ( -- )
[ wait-loop ] in-thread ; [ wait-loop ] in-thread ;
[ start-wait-thread ] "io.windows.launcher" add-init-hook

View File

@ -59,6 +59,6 @@ M: windows-io process-stream*
dup CreateProcess-args-stdout-pipe pipe-in dup CreateProcess-args-stdout-pipe pipe-in
over CreateProcess-args-stdin-pipe pipe-out <win32-duplex-stream> over CreateProcess-args-stdin-pipe pipe-out <win32-duplex-stream>
swap CreateProcess-args-lpProcessInformation <win32-process> swap CreateProcess-args-lpProcessInformation <process>
] with-destructors ] with-destructors
] with-descriptor ; ] with-descriptor ;

22
extra/unix/process/process.factor Normal file → Executable file
View File

@ -31,25 +31,5 @@ IN: unix.process
: with-fork ( child parent -- ) : with-fork ( child parent -- )
fork dup zero? -roll swap curry if ; inline fork dup zero? -roll swap curry if ; inline
! Lame polling strategy for getting process exit codes. On
! BSD, we use kqueue which is more efficient.
SYMBOL: pid-wait
: (wait-for-pid) ( pid -- status )
0 <int> [ 0 waitpid drop ] keep *int ;
: wait-for-pid ( pid -- status ) : wait-for-pid ( pid -- status )
[ pid-wait get-global [ ?push ] change-at stop ] curry 0 <int> [ 0 waitpid drop ] keep *int ;
callcc1 ;
: wait-loop ( -- )
-1 0 <int> tuck WNOHANG waitpid ! &status return
[ *int ] [ pid-wait get delete-at* drop ] bi* ! status ?
[ schedule-thread-with ] with each
250 sleep
wait-loop ;
: start-wait-loop ( -- )
H{ } clone pid-wait set-global
[ wait-loop ] in-thread ;