fix multiplexer bug

cvs
Slava Pestov 2005-06-19 01:15:07 +00:00
parent e74577120b
commit 4bbc5c41f4
4 changed files with 79 additions and 48 deletions

View File

@ -38,7 +38,7 @@ stdio streams strings unparser ;
: serve-directory ( filename -- ) : serve-directory ( filename -- )
"/" ?tail [ "/" ?tail [
dup "/index.html" append dup exists? [ dup "/index.html" append dup exists? [
serve-file nip serve-file
] [ ] [
drop list-directory drop list-directory
] ifte ] ifte

View File

@ -50,12 +50,10 @@ stdio streams strings threads http sequences ;
] ifte ; ] ifte ;
: httpd-client ( socket -- ) : httpd-client ( socket -- )
[
dup log-client [ dup log-client [
60000 stdio get set-timeout 60000 stdio get set-timeout
read-line [ parse-request ] when* read-line [ parse-request ] when*
] with-stream ] with-stream ;
] try ;
: httpd-connection ( socket -- ) : httpd-connection ( socket -- )
"http-server" get accept [ httpd-client ] in-thread drop ; "http-server" get accept [ httpd-client ] in-thread drop ;

View File

@ -3,7 +3,7 @@
IN: io-internals IN: io-internals
USING: alien assembler errors generic hashtables kernel USING: alien assembler errors generic hashtables kernel
kernel-internals lists math sequences streams strings threads kernel-internals lists math sequences streams strings threads
unix-internals vectors ; unix-internals unparser vectors ;
! We want namespaces::bind to shadow the bind system call from ! We want namespaces::bind to shadow the bind system call from
! unix-internals ! unix-internals
@ -13,8 +13,10 @@ USING: namespaces ;
: byte-bit ( n alien -- byte bit ) : byte-bit ( n alien -- byte bit )
over -5 shift alien-unsigned-4 swap 31 bitand ; over -5 shift alien-unsigned-4 swap 31 bitand ;
: bit-length ( n -- n ) cell / ceiling ;
: <bit-array> ( n -- array ) : <bit-array> ( n -- array )
cell / ceiling <byte-array> ; bit-length <byte-array> ;
: bit-nth ( n alien -- ? ) : bit-nth ( n alien -- ? )
byte-bit 1 swap shift bitand 0 > ; byte-bit 1 swap shift bitand 0 > ;
@ -26,6 +28,11 @@ USING: namespaces ;
[ byte-bit set-bit ] 2keep [ byte-bit set-bit ] 2keep
swap -5 shift set-alien-unsigned-4 ; swap -5 shift set-alien-unsigned-4 ;
: clear-bits ( alien len -- )
bit-length [
0 pick pick set-alien-unsigned-cell
] repeat drop ;
! Global variables ! Global variables
SYMBOL: read-fdset SYMBOL: read-fdset
SYMBOL: read-tasks SYMBOL: read-tasks
@ -73,14 +80,22 @@ M: port set-timeout ( timeout port -- )
: >port< dup port-handle swap delegate ; : >port< dup port-handle swap delegate ;
: pending-error ( reader -- ) port-error throw ; : pending-error ( port -- )
dup port-error f rot set-port-error throw ;
: EAGAIN 35 ; : EAGAIN 11 ;
: EINTR 4 ; : EINTR 4 ;
: postpone-error ( port -- ) : defer-error ( port -- ? )
err_no dup EAGAIN = over EINTR = or #! Return t if it is an unrecoverable error.
[ 2drop ] [ strerror swap set-port-error ] ifte ; err_no dup EAGAIN = over EINTR = or [
2drop f
] [
[
"Error on fd " % over port-handle unparse %
": " % strerror %
] make-string swap set-port-error t
] ifte ;
! Associates a port with a list of continuations waiting on the ! Associates a port with a list of continuations waiting on the
! port to finish I/O ! port to finish I/O
@ -123,18 +138,16 @@ GENERIC: task-container ( task -- vector )
dup io-task-port timeout? [ dup io-task-port timeout? [
2drop t 2drop t
] [ ] [
io-task-fd swap 2dup bit-nth io-task-fd swap bit-nth
>r f -rot set-bit-nth r>
] ifte ; ] ifte ;
: debug-out 14 getenv fwrite ;
: handle-fdset ( fdset tasks -- ) : handle-fdset ( fdset tasks -- )
[ [
cdr tuck handle-fd? [ handle-fd ] [ drop ] ifte cdr tuck handle-fd? [ handle-fd ] [ drop ] ifte
] hash-each-with ; ] hash-each-with ;
: init-fdset ( fdset tasks -- ) : init-fdset ( fdset tasks -- )
>r dup FD_SETSIZE clear-bits r>
[ car t swap rot set-bit-nth ] hash-each-with ; [ car t swap rot set-bit-nth ] hash-each-with ;
: init-fdsets ( -- read write except ) : init-fdsets ( -- read write except )
@ -218,14 +231,18 @@ C: reader ( handle -- reader )
] ifte t swap set-reader-ready? ; ] ifte t swap set-reader-ready? ;
: (refill) ( port -- n ) : (refill) ( port -- n )
>port< tuck dup buffer-end swap buffer-capacity read ; >port< dup buffer-end swap buffer-capacity read ;
: refill ( port -- ) : refill ( port -- ? )
#! Return f if there is a recoverable error
dup buffer-length 0 = [ dup buffer-length 0 = [
(refill) dup (refill) dup 0 >= [
dup 0 >= [ swap n>buffer ] [ drop postpone-error ] ifte swap n>buffer t
] [ ] [
drop drop defer-error
] ifte
] [
drop t
] ifte ; ] ifte ;
TUPLE: read-line-task ; TUPLE: read-line-task ;
@ -234,10 +251,14 @@ C: read-line-task ( port -- task )
[ >r <io-task> r> set-delegate ] keep ; [ >r <io-task> r> set-delegate ] keep ;
M: read-line-task do-io-task ( task -- ? ) M: read-line-task do-io-task ( task -- ? )
io-task-port dup refill dup eof? [ io-task-port dup refill [
dup eof? [
reader-eof t reader-eof t
] [ ] [
read-line-step read-line-step
] ifte
] [
drop f
] ifte ; ] ifte ;
M: read-line-task task-container drop read-tasks get ; M: read-line-task task-container drop read-tasks get ;
@ -255,7 +276,8 @@ M: reader stream-readln ( stream -- line )
#! read and the line ends with \r\n, the reader stopped #! read and the line ends with \r\n, the reader stopped
#! reading at \r and set the reader-cr flag to true. But we #! reading at \r and set the reader-cr flag to true. But we
#! must ignore the \n. #! must ignore the \n.
dup buffer-length 1 >= over reader-cr and [ dup buffer-length 1 >= [
dup reader-cr [
dup buffer-peek CHAR: \n = [ dup buffer-peek CHAR: \n = [
1 swap buffer-consume 1 swap buffer-consume
] [ ] [
@ -263,6 +285,9 @@ M: reader stream-readln ( stream -- line )
] ifte ] ifte
] [ ] [
drop drop
] ifte
] [
drop
] ifte ; ] ifte ;
! Reading character counts ! Reading character counts
@ -296,10 +321,14 @@ C: read-task ( count port -- task )
: >read-task< dup read-task-count swap io-task-port ; : >read-task< dup read-task-count swap io-task-port ;
M: read-task do-io-task ( task -- ? ) M: read-task do-io-task ( task -- ? )
>read-task< dup refill dup eof? [ >read-task< dup refill [
dup eof? [
nip reader-eof t nip reader-eof t
] [ ] [
read-step read-step
] ifte
] [
2drop f
] ifte ; ] ifte ;
M: read-task task-container drop read-tasks get ; M: read-task task-container drop read-tasks get ;
@ -323,11 +352,11 @@ TUPLE: writer ;
C: writer ( fd -- writer ) C: writer ( fd -- writer )
[ >r buffered-port r> set-delegate ] keep ; [ >r buffered-port r> set-delegate ] keep ;
: write-step ( fd buffer -- ) : write-step ( port -- )
tuck dup buffer@ swap buffer-length write dup 0 >= [ dup >port< dup buffer@ swap buffer-length write dup 0 >= [
swap buffer-consume swap buffer-consume
] [ ] [
drop postpone-error drop defer-error drop
] ifte ; ] ifte ;
: can-write? ( len writer -- ? ) : can-write? ( len writer -- ? )
@ -349,7 +378,7 @@ M: write-task do-io-task
io-task-port dup buffer-length 0 = over port-error or [ io-task-port dup buffer-length 0 = over port-error or [
0 swap buffer-reset t 0 swap buffer-reset t
] [ ] [
>port< write-step f write-step f
] ifte ; ] ifte ;
M: write-task task-container drop write-tasks get ; M: write-task task-container drop write-tasks get ;

View File

@ -4,8 +4,8 @@
! We need to fiddle with the exact search order here, since ! We need to fiddle with the exact search order here, since
! unix-internals::accept shadows streams::accept. ! unix-internals::accept shadows streams::accept.
IN: io-internals IN: io-internals
USING: errors namespaces streams threads unparser ; USING: errors namespaces streams threads unparser alien generic
USING: alien generic kernel math unix-internals ; kernel math unix-internals ;
: init-sockaddr ( port -- sockaddr ) : init-sockaddr ( port -- sockaddr )
<sockaddr-in> <sockaddr-in>
@ -76,6 +76,14 @@ C: accept-task ( port -- task )
: init-socket ( fd -- ) SOL_SOCKET SO_OOBINLINE sockopt ; : init-socket ( fd -- ) SOL_SOCKET SO_OOBINLINE sockopt ;
: inet-ntoa ( n -- str )
ntohl [
dup -24 shift HEX: ff bitand unparse % CHAR: . ,
dup -16 shift HEX: ff bitand unparse % CHAR: . ,
dup -8 shift HEX: ff bitand unparse % CHAR: . ,
HEX: ff bitand unparse %
] make-string ;
: do-accept ( port sockaddr fd -- ) : do-accept ( port sockaddr fd -- )
[ [
init-socket init-socket
@ -86,21 +94,17 @@ C: accept-task ( port -- task )
M: accept-task do-io-task ( task -- ? ) M: accept-task do-io-task ( task -- ? )
io-task-port <sockaddr-in> io-task-port <sockaddr-in>
over port-handle over "sockaddr-in" c-size <int> accept over port-handle over "sockaddr-in" c-size <int> accept
dup 0 >= [ do-accept t ] [ 2drop postpone-error f ] ifte ; dup 0 >= [
do-accept t
] [
2drop defer-error
] ifte ;
M: accept-task task-container drop read-tasks get ; M: accept-task task-container drop read-tasks get ;
: wait-to-accept ( server -- ) : wait-to-accept ( server -- )
[ swap <accept-task> add-io-task stop ] callcc0 drop ; [ swap <accept-task> add-io-task stop ] callcc0 drop ;
: inet-ntoa ( n -- str )
ntohl [
dup -24 shift HEX: ff bitand unparse % CHAR: . ,
dup -16 shift HEX: ff bitand unparse % CHAR: . ,
dup -8 shift HEX: ff bitand unparse % CHAR: . ,
HEX: ff bitand unparse %
] make-string ;
: <socket-stream> ( fd -- stream ) : <socket-stream> ( fd -- stream )
dup f <fd-stream> ; dup f <fd-stream> ;
@ -111,4 +115,4 @@ IN: streams
: accept ( server -- client ) : accept ( server -- client )
#! Wait for a client connection. #! Wait for a client connection.
dup wait-to-accept server-client ; dup wait-to-accept dup pending-error server-client ;