Unix I/O multiplexer hooked up

cvs
Slava Pestov 2005-04-14 23:37:13 +00:00
parent 3e9381d867
commit e1a6166079
7 changed files with 174 additions and 103 deletions

View File

@ -51,6 +51,7 @@ public class Tuple extends FactorParsingDefinition
String tupleName = (String)next;
reader.intern(tupleName,true);
reader.intern("<" + tupleName + ">",true);
reader.intern(tupleName + "?",true);
for(;;)
{

View File

@ -110,7 +110,7 @@ M: col-seq nth col-seq-matrix <col> ;
#! for being added or subtracted.
over matrix-rows over matrix-rows = >r
over matrix-cols over matrix-cols = r> and [
"Matrix dimensions do not match"
"Matrix dimensions do not match" throw
] unless ;
: +dimensions ( matrix -- rows cols )
@ -128,7 +128,7 @@ M: matrix v* ( m m -- m ) matrix+/- v* <matrix> ;
: *check ( matrix matrix -- matrix matrix )
over matrix-rows over matrix-cols = >r
over matrix-cols over matrix-rows = r> and [
"Matrix dimensions inappropriate for composition"
"Matrix dimensions inappropriate for composition" throw
] unless ;
: *dimensions ( m m -- rows cols )

View File

@ -87,7 +87,11 @@ BUILTIN: f 9 ; : f f swons ; parsing
#! recursive words.
CREATE drop ; parsing
: FORGET: scan-word forget ; parsing
: FORGET:
#! Followed by a word name. The word is removed from its
#! vocabulary. Note that specifying an undefined word is a
#! no-op.
scan "use" get search [ forget ] when* ; parsing
: USE:
#! Add vocabulary to search path.

View File

@ -8,26 +8,23 @@ IN: words
! no effect of compiled calls to that word.
USING: interpreter kernel lists prettyprint stdio strings test ;
: annotate ( word quot -- ) #! Quotation: ( word def -- def )
: annotate ( word quot -- | quot: word def -- def )
over >r >r dup word-def r> call r> swap (define-compound) ;
inline
: (watch) >r "==> " swap word-name cat2 \ print r> cons cons ;
: (watch) ( word def -- def )
>r "==> " swap word-name cat2 \ print \ .s r>
cons cons cons ;
: watch ( word -- )
#! Cause a message to be printed out when the word is
#! executed. To undo the effect of this, reload the
#! word with \ foo reload.
#! executed.
[ (watch) ] annotate ;
: break ( word -- )
#! Cause the word to start the code walker when executed.
[ nip [ walk ] cons ] annotate ;
: dump ( word -- )
#! Cause the word to print the stack when executed.
[ nip [ .s ] swap append ] annotate ;
: timer ( word -- )
#! Print the time taken to execute the word when it's called.
[ nip [ time ] cons ] annotate ;

View File

@ -1,8 +1,8 @@
! Copyright (C) 2005 Slava Pestov.
! Copyright (C) 2004, 2005 Slava Pestov.
! See http://factor.sf.net/license.txt for BSD license.
IN: io-internals
USING: errors generic hashtables kernel lists math namespaces
sequences strings threads vectors ;
sequences streams strings threads vectors ;
! These let us load the code into a CFactor instance using the
! old C-based I/O. They will be removed soon.
@ -10,8 +10,39 @@ FORGET: can-read-line?
FORGET: can-read-count?
FORGET: can-write?
FORGET: add-write-io-task
FORGET: blocking-read-line
FORGET: blocking-write
FORGET: wait-to-read
FORGET: wait-to-read-line
FORGET: wait-to-write
! Some general stuff
: file-mode OCT: 0600 ;
: io-error ( n -- ) 0 < [ errno strerror throw ] when ;
: init-handle ( fd -- )
F_SETFL O_NONBLOCK 1 sys-fcntl io-error ;
! Common delegate of native stream readers and writers
TUPLE: port handle buffer error ;
C: port ( handle buffer -- port )
[ >r <buffer> r> set-delegate ] keep
[ >r dup init-handle r> set-port-handle ] keep ;
: buffered-port 8192 <port> ;
: >port< dup port-handle swap delegate ;
: pending-error ( reader -- ) port-error throw ;
! Associates a port with a list of continuations waiting on the
! port to finish I/O
TUPLE: io-task port callbacks ;
C: io-task ( port -- ) [ set-io-task-port ] keep ;
! Multiplexer
GENERIC: do-io-task ( task -- ? )
GENERIC: io-task-events ( task -- events )
@ -21,29 +52,62 @@ GENERIC: io-task-events ( task -- events )
! this with the hash-size call.
SYMBOL: io-tasks
: file-mode OCT: 0600 ;
: init-io ( -- ) global [ <namespace> io-tasks set ] bind ;
: io-error ( n -- ) 0 < [ errno strerror throw ] when ;
: io-task-fd io-task-port port-handle ;
: add-io-task ( callback task -- )
[ >r unit r> set-io-task-callbacks ] keep
dup io-task-fd io-tasks get 2dup hash [
"Cannot perform multiple I/O ops on the same port" throw
] when set-hash ;
: remove-io-task ( task -- )
io-task-fd io-tasks get remove-hash ;
: pop-callback ( task -- callback )
dup io-task-callbacks uncons dup [
rot set-io-task-callbacks
] [
drop swap remove-io-task
] ifte ;
: handle-fd ( fd -- )
io-tasks get hash dup do-io-task [
pop-callback call
] [
drop
] ifte ;
: do-io-tasks ( pollfds n -- )
[
dup pick pollfd-nth dup pollfd-revents 0 = [
drop
] [
pollfd-fd handle-fd
] ifte
] repeat drop ;
: init-pollfd ( task pollfd -- )
over io-task-fd over set-pollfd-fd
swap io-task-events swap set-pollfd-events ;
: make-pollfds ( -- pollfds n )
io-tasks get dup hash-size [
swap >r <pollfd-array> 0 swap r> hash-values [
( n pollfds iotask )
pick pick pollfd-nth init-pollfd >r 1 + r>
] each nip
] keep ;
: io-multiplex ( -- )
make-pollfds 2dup 0 sys-poll drop do-io-tasks ;
! Readers
: open-read ( path -- fd )
O_RDONLY file-mode sys-open dup io-error ;
: open-write ( path -- fd )
O_WRONLY O_CREAT bitor O_TRUNC bitor file-mode sys-open
dup io-error ;
TUPLE: port handle buffer error ;
C: port ( handle buffer -- port )
[ >r <buffer> r> set-delegate ] keep
[ set-port-handle ] keep ;
: buffered-port 8192 <port> ;
: >port< dup port-handle swap delegate ;
: pending-error ( reader -- ) port-error throw ;
TUPLE: reader line ready? ;
C: reader ( handle -- reader )
@ -96,7 +160,7 @@ C: reader ( handle -- reader )
TUPLE: read-line-task ;
C: read-line-task ( port callbacks -- task )
C: read-line-task ( port -- task )
[ >r <io-task> r> set-delegate ] keep ;
M: read-line-task do-io-task
@ -127,7 +191,7 @@ M: read-line-task io-task-events ( task -- events )
TUPLE: read-task count ;
C: read-task ( port callbacks -- task )
C: read-task ( port -- task )
[ >r <io-task> r> set-delegate ] keep ;
M: read-task do-io-task
@ -152,6 +216,38 @@ M: read-task io-task-events ( task -- events )
"reader not ready" throw
] ifte ;
: wait-to-read-line ( port -- )
dup can-read-line? [
drop
] [
[
swap <read-line-task> add-io-task io-multiplex
] callcc0 drop
] ifte ;
M: reader stream-readln ( stream -- line )
dup wait-to-read-line read-fin ;
: wait-to-read ( count port -- )
2dup can-read-count? [
2drop
] [
[
swap <read-task> add-io-task io-multiplex
] callcc0 2drop
] ifte ;
M: reader stream-read ( count stream -- string )
2dup wait-to-read read-fin ;
M: reader stream-close ( stream -- ) port-handle sys-close ;
! Writers
: open-write ( path -- fd )
O_WRONLY O_CREAT bitor O_TRUNC bitor file-mode sys-open
dup io-error ;
TUPLE: writer ;
C: writer ( fd -- writer )
@ -173,7 +269,7 @@ C: writer ( fd -- writer )
TUPLE: write-task ;
C: write-task ( port callbacks -- task )
C: write-task ( port -- task )
[ >r <io-task> r> set-delegate ] keep ;
M: write-task do-io-task
@ -187,8 +283,41 @@ M: write-task io-task-events ( task -- events )
drop write-events ;
: write-fin ( str writer -- )
dup pending-error
>r dup string? [ ch>string ] unless r> >buffer ;
dup pending-error >buffer ;
: add-write-io-task ( callback task -- )
dup io-task-fd io-tasks get hash [
dup write-task? [
[
nip io-task-callbacks cons
] keep set-io-task-callbacks
] [
add-io-task
] ifte
] [
add-io-task
] ifte* ;
M: writer stream-flush ( stream -- )
[
swap <write-task> add-write-io-task io-multiplex
] callcc0 drop ;
M: writer stream-auto-flush ( stream -- ) drop ;
: wait-to-write ( len port -- )
tuck can-write? [ drop ] [ stream-flush ] ifte ;
: blocking-write ( str writer -- )
over length over wait-to-write write-fin ;
M: writer stream-write-attr ( string style writer -- )
nip >r dup string? [ ch>string ] unless r> blocking-write ;
M: writer stream-close ( stream -- )
dup stream-flush port-handle sys-close ;
! Copying from a reader to a writer
: can-copy? ( from -- ? )
dup eof? [ read-step ] [ drop t ] ifte ;
@ -207,69 +336,3 @@ M: write-task io-task-events ( task -- events )
] [
2drop f
] ifte ;
: io-task-fd io-task-port port-handle ;
: add-io-task ( task -- )
dup io-task-fd io-tasks get 2dup hash [
"Cannot perform multiple I/O ops on the same port" throw
] when set-hash ;
: add-write-io-task ( task -- )
dup io-task-fd io-tasks get hash [
dup write-task? [
[
>r io-task-callbacks r> io-task-callbacks append
] keep set-io-task-callbacks
] [
add-io-task
] ifte
] [
add-io-task
] ifte* ;
: remove-io-task ( task -- )
io-task-fd io-tasks get remove-hash ;
: pop-callback ( task -- callback )
dup io-task-callbacks uncons dup [
rot set-io-task-callbacks
] [
drop swap remove-io-task
] ifte ;
: handle-fd ( fd -- )
io-tasks get hash dup do-io-task [
pop-callback call
] [
drop
] ifte ;
: do-io-tasks ( pollfds n -- )
[
dup pick pollfd-nth dup pollfd-revents 0 = [
drop
] [
pollfd-fd handle-fd
] ifte
] repeat drop ;
: init-pollfd ( task pollfd -- )
over io-task-fd over set-pollfd-fd
swap io-task-events swap set-pollfd-events ;
: make-pollfds ( -- pollfds n )
io-tasks get dup hash-size [
<pollfd-array> swap hash-values [
dup io-task-fd pick pollfd-nth init-pollfd
] each
] keep ;
: io-multiplexer ( -- )
make-pollfds dupd 0 sys-poll do-io-tasks ;
: io-loop ( -- ) io-multiplexer yield io-loop ;
: init-io ( -- )
global [ <namespace> io-tasks set ] bind
[ io-loop ] in-thread ;

View File

@ -80,6 +80,12 @@ END-STRUCT
: sys-close ( fd -- )
"void" "libc" "close" [ "int" ] alien-invoke ;
: F_SETFL 4 ; ! set file status flags
: O_NONBLOCK 4 ; ! no delay
: sys-fcntl ( fd cmd key value -- n )
"int" "libc" "fcntl" [ "int" "int" "int" "int" ] alien-invoke ;
: sys-read ( fd buf nbytes -- n )
"ssize_t" "libc" "read" [ "int" "ulong" "size_t" ] alien-invoke ;

View File

@ -15,7 +15,7 @@ INLINE void* alien_offset(CELL object)
return alien->ptr;
case BYTE_ARRAY_TYPE:
array = untag_byte_array_fast(object);
return array + sizeof(F_ARRAY);
return array + 1;
case DISPLACED_ALIEN_TYPE:
d = untag_displaced_alien_fast(object);
return alien_offset(d->alien) + d->displacement;