factor/library/unix/io.factor

419 lines
10 KiB
Factor
Raw Normal View History

2005-04-14 19:37:13 -04:00
! Copyright (C) 2004, 2005 Slava Pestov.
! See http://factor.sf.net/license.txt for BSD license.
IN: io-internals
2005-06-13 01:42:16 -04:00
USING: alien assembler errors generic hashtables kernel
kernel-internals lists math sequences streams strings threads
2005-06-18 21:15:07 -04:00
unix-internals unparser vectors ;
2005-04-17 18:34:09 -04:00
! We want namespaces::bind to shadow the bind system call from
2005-04-17 18:34:09 -04:00
! unix-internals
USING: namespaces ;
2005-06-13 01:42:16 -04:00
! This will go elsewhere soon
: byte-bit ( n alien -- byte bit )
#! Split bit index n into the 4-byte word that holds it and the
#! bit position inside that word (n mod 32). Despite the name,
#! "byte" is the value read with alien-unsigned-4.
#! NOTE(review): n>>5 is passed unscaled as the offset to
#! alien-unsigned-4; confirm whether that primitive counts its
#! offset in bytes or in 4-byte words.
2005-06-15 16:34:16 -04:00
over -5 shift alien-unsigned-4 swap 31 bitand ;
2005-06-13 01:42:16 -04:00
2005-06-18 21:15:07 -04:00
! Divide n by the cell size and round up.
! NOTE(review): callers pass a bit count (FD_SETSIZE); confirm that
! n / cell is the intended size for <byte-array> and clear-bits.
: bit-length ( n -- n ) cell / ceiling ;
2005-06-13 01:42:16 -04:00
: <bit-array> ( n -- array )
#! Allocate a byte array for n bits; the size handed to
#! <byte-array> is computed by bit-length.
2005-06-18 21:15:07 -04:00
bit-length <byte-array> ;
2005-06-13 01:42:16 -04:00
: bit-nth ( n alien -- ? )
    #! True if bit n of the bit array is set.
    byte-bit            ! word holding the bit, bit position
    1 swap shift        ! mask with only that position set
    bitand 0 = not ;
: set-bit ( ? byte bit -- byte )
    #! Output byte with the given bit switched on when the flag
    #! is true, and switched off when the flag is f.
    1 swap shift rot not
    [ bitnot bitand ] [ bitor ] ifte ;
: set-bit-nth ( ? n alien -- )
#! Set (flag true) or clear (flag f) bit n of the bit array.
#! The containing word is read with byte-bit, modified with
#! set-bit and written back at the same n>>5 offset.
[ byte-bit set-bit ] 2keep
2005-06-15 16:34:16 -04:00
swap -5 shift set-alien-unsigned-4 ;
2005-06-13 01:42:16 -04:00
2005-06-18 21:15:07 -04:00
: clear-bits ( alien len -- )
#! Store a zero cell at each offset 0 .. (len bit-length)-1 of
#! the alien.
#! NOTE(review): the loop index is used unscaled as the offset to
#! set-alien-unsigned-cell; confirm the unit of that offset so the
#! writes stay inside the array.
bit-length [
0 pick pick set-alien-unsigned-cell
] repeat drop ;
2005-06-13 01:42:16 -04:00
! Global variables
! Bit array passed to select as the read set (allocated in init-io).
SYMBOL: read-fdset
! Table of pending read tasks, keyed by file descriptor.
SYMBOL: read-tasks
! Bit array passed to select as the write set (allocated in init-io).
SYMBOL: write-fdset
! Table of pending write tasks, keyed by file descriptor.
SYMBOL: write-tasks
2005-04-14 19:37:13 -04:00
! Some general stuff
! Mode argument (octal 0600) given to open by open-read and open-write.
: file-mode OCT: 0600 ;
2005-06-13 01:42:16 -04:00
! Throw the strerror message for the current err_no value.
: (io-error) err_no strerror throw ;
2005-04-14 19:37:13 -04:00
2005-06-12 21:20:00 -04:00
: check-null ( n -- )
    #! Throw the current system error when n is zero.
    0 = not [ (io-error) ] unless ;
: io-error ( n -- )
    #! Throw the current system error when n is negative.
    0 >= [ (io-error) ] unless ;
! Put the file descriptor into non-blocking mode; throws if fcntl fails.
! NOTE(review): F_SETFL is given O_NONBLOCK alone rather than the
! current flags OR-ed with it; confirm no other status flags on the
! descriptor need to be preserved.
: init-handle ( fd -- ) F_SETFL O_NONBLOCK fcntl io-error ;
2005-04-14 19:37:13 -04:00
! Common delegate of native stream readers and writers
2005-05-23 20:56:38 -04:00
! handle  - file descriptor; made non-blocking by the constructor
! buffer  - NOTE(review): never written by the code in this file; the
!           I/O buffer is installed as the delegate instead
! error   - message recorded by defer-error and thrown by pending-error
! timeout - value added to millis by touch-port; 0 disables the timeout
! cutoff  - millis value after which timeout? reports true; 0 means none
TUPLE: port handle buffer error timeout cutoff ;
: make-buffer ( n -- buffer/f )
    #! Output f unless n is positive; a positive n is handed on
    #! to <buffer>.
    dup 0 <= [ drop f ] [ <buffer> ] ifte ;
2005-04-14 19:37:13 -04:00
C: port ( handle buffer -- port )
#! Build a port for a file descriptor. The "buffer" input is a
#! size: it goes through make-buffer and the result (a buffer,
#! or f for a non-positive size) becomes the port's delegate.
#! Timeout and cutoff start at 0, and the descriptor is switched
#! to non-blocking mode by init-handle before it is stored.
2005-05-23 20:56:38 -04:00
[ 0 swap set-port-timeout ] keep
[ 0 swap set-port-cutoff ] keep
[ >r make-buffer r> set-delegate ] keep
2005-04-14 19:37:13 -04:00
[ >r dup init-handle r> set-port-handle ] keep ;
2005-04-24 23:02:19 -04:00
M: port stream-close ( port -- )
#! Close the file descriptor, then free the delegate buffer if
#! the port has one.
dup port-handle close
delegate [ buffer-free ] when* ;
2005-04-24 23:02:19 -04:00
2005-05-23 20:56:38 -04:00
: touch-port ( port -- )
    #! Push the port's cutoff forward to "now + timeout". A port
    #! whose timeout is 0 is left untouched.
    dup port-timeout dup 0 = not [
        millis + swap set-port-cutoff
    ] [
        2drop
    ] ifte ;
M: port set-timeout ( timeout port -- )
    #! Store the new timeout and immediately recompute the cutoff.
    tuck set-port-timeout touch-port ;
2005-04-14 19:37:13 -04:00
! Make a port for the descriptor on the stack, requesting a buffer size of 8192.
: buffered-port 8192 <port> ;
: >port< ( port -- handle buffer )
    #! Unpack a port into its file descriptor and its delegate.
    [ port-handle ] keep delegate ;
2005-06-18 21:15:07 -04:00
: pending-error ( port -- )
#! Take whatever is stored in the port's error slot, reset the
#! slot to f, and pass the old value to throw.
#! NOTE(review): this runs on every operation, also when the slot
#! holds f, so it presumably relies on throw doing nothing for f;
#! confirm against the errors vocabulary.
dup port-error f rot set-port-error throw ;
2005-04-14 19:37:13 -04:00
2005-06-18 21:15:07 -04:00
! errno values that defer-error treats as recoverable.
! NOTE(review): the numbers are hard-coded and errno values differ
! between operating systems; confirm them for each supported platform.
: EAGAIN 11 ;
: EINTR 4 ;
2005-06-18 21:15:07 -04:00
: defer-error ( port -- ? )
#! Return t if it is an unrecoverable error.
#! EAGAIN and EINTR count as recoverable: nothing is recorded
#! and f is returned. For any other err_no value a message of
#! the form "Error on fd <handle>: <strerror text>" is stored in
#! the port's error slot, to be thrown later by pending-error.
err_no dup EAGAIN = over EINTR = or [
2drop f
] [
[
"Error on fd " % over port-handle unparse %
": " % strerror %
] make-string swap set-port-error t
] ifte ;
2005-04-14 19:37:13 -04:00
! Associates a port with a list of continuations waiting on the
! port to finish I/O
2005-04-14 01:32:06 -04:00
! port      - the port the task performs I/O on
! callbacks - list of callbacks, popped one at a time by pop-callback
TUPLE: io-task port callbacks ;
2005-04-14 19:37:13 -04:00
! The constructor stores the port and leaves the new task on the stack
! (the stack effect comment below omits the output).
C: io-task ( port -- ) [ set-io-task-port ] keep ;
! Multiplexer
2005-04-12 18:31:50 -04:00
! Perform one step of the task's I/O. handle-fd pops and calls a
! callback only when this returns true.
GENERIC: do-io-task ( task -- ? )
2005-06-13 01:42:16 -04:00
! Output the table the task is registered in. The methods in this file
! return the read-tasks or write-tasks value, which is used with
! hash / set-hash / remove-hash, not as a vector.
GENERIC: task-container ( task -- vector )
2005-05-23 20:56:38 -04:00
2005-04-14 19:37:13 -04:00
! ( task -- fd ) File descriptor of the task's port.
: io-task-fd io-task-port port-handle ;
2005-04-08 23:50:36 -04:00
2005-04-14 19:37:13 -04:00
: add-io-task ( callback task -- )
#! Give the task a one-element callback list and register it in
#! its container under its file descriptor. Throws if that
#! container already has a task for the descriptor.
[ >r unit r> set-io-task-callbacks ] keep
2005-06-13 01:42:16 -04:00
dup io-task-fd over task-container 2dup hash [
2005-04-14 19:37:13 -04:00
"Cannot perform multiple I/O ops on the same port" throw
] when set-hash ;
2005-04-08 23:50:36 -04:00
2005-04-14 19:37:13 -04:00
: remove-io-task ( task -- )
    #! Unregister the task: delete its file descriptor key from
    #! the container the task belongs to.
    [ io-task-fd ] keep task-container remove-hash ;
2005-04-08 23:50:36 -04:00
2005-04-14 19:37:13 -04:00
: pop-callback ( task -- callback )
#! Remove and output the first callback of the task. If other
#! callbacks remain they are stored back in the task; if that
#! was the last one, the task is removed from its container.
dup io-task-callbacks uncons dup [
rot set-io-task-callbacks
] [
drop swap remove-io-task
] ifte ;
2005-04-03 18:28:55 -04:00
2005-06-13 01:42:16 -04:00
: handle-fd ( task -- )
    #! Run one I/O step for the task. When the step reports
    #! completion, refresh the port's timeout cutoff and resume
    #! the next waiting callback, if there is one.
    dup do-io-task not [
        drop
    ] [
        dup io-task-port touch-port
        pop-callback [ call ] when*
    ] ifte ;
2005-04-03 18:28:55 -04:00
2005-05-23 20:56:38 -04:00
: timeout? ( port -- ? )
    #! True when the port has a cutoff (non-zero) and that cutoff
    #! lies before the current millis value.
    port-cutoff dup 0 = [ drop f ] [ millis < ] ifte ;
2005-06-13 01:42:16 -04:00
: handle-fd? ( fdset task -- ? )
#! Decide whether the task should be run now: always when its
#! port has timed out, otherwise only when the bit for its file
#! descriptor is set in the fdset.
dup io-task-port timeout? [
2005-06-15 16:34:16 -04:00
2drop t
] [
2005-06-18 21:15:07 -04:00
io-task-fd swap bit-nth
2005-06-15 16:34:16 -04:00
] ifte ;
2005-05-23 20:56:38 -04:00
2005-06-13 01:42:16 -04:00
: handle-fdset ( fdset tasks -- )
#! For every entry of the task table, take the task (the cdr of
#! the entry) and run handle-fd on it if handle-fd? says so.
2005-04-14 19:37:13 -04:00
[
2005-06-13 01:42:16 -04:00
cdr tuck handle-fd? [ handle-fd ] [ drop ] ifte
] hash-each-with ;
2005-05-23 20:56:38 -04:00
2005-06-13 01:42:16 -04:00
: init-fdset ( fdset tasks -- )
#! Clear the fdset, then set the bit for every file descriptor
#! that is a key (the car of an entry) in the task table.
2005-06-18 21:15:07 -04:00
>r dup FD_SETSIZE clear-bits r>
2005-06-13 01:42:16 -04:00
[ car t swap rot set-bit-nth ] hash-each-with ;
2005-05-23 20:56:38 -04:00
2005-06-13 01:42:16 -04:00
: init-fdsets ( -- read write except )
#! Rebuild the read and write fdsets from the current task
#! tables and output them, followed by NULL for the exception
#! set.
read-fdset get [ read-tasks get init-fdset ] keep
write-fdset get [ write-tasks get init-fdset ] keep
NULL ;
2005-04-14 19:37:13 -04:00
2005-04-30 17:17:10 -04:00
: io-multiplex ( timeout -- )
#! Run one select call over all registered tasks, with the
#! timeout converted by make-timeval, then run the read tasks
#! followed by the write tasks against the resulting fdsets.
#! NOTE(review): the value returned by select is dropped, so a
#! failed select is not detected before the fdsets are examined.
2005-06-13 01:42:16 -04:00
>r FD_SETSIZE init-fdsets r> make-timeval select drop
read-fdset get read-tasks get handle-fdset
write-fdset get write-tasks get handle-fdset ;
2005-04-14 19:37:13 -04:00
! Readers
: open-read ( path -- fd )
#! Open the file with O_RDONLY and output the descriptor; a
#! negative result from open is thrown as an I/O error.
2005-04-17 18:34:09 -04:00
O_RDONLY file-mode open dup io-error ;
2005-04-08 23:50:36 -04:00
! The cr slot is set to true by read-line-loop if the last
! character read was \r.
! line - string buffer collecting the current request's result, or f
! cr   - true when the last character consumed by a line read was \r
TUPLE: reader line cr ;
! The constructor wraps the descriptor in a buffered port and makes
! that port the reader's delegate.
C: reader ( handle -- reader )
[ >r buffered-port r> set-delegate ] keep ;
: pop-line ( reader -- sbuf/f )
    #! Throw any error deferred on the reader, then output the
    #! contents of the line slot and reset the slot to f.
    dup pending-error
    dup reader-line >r
    f swap set-reader-line
    r> ;
: read-fin ( reader -- str )
    #! Finish a read request: take the collected line and convert
    #! it to a string. Outputs f if there is no line.
    pop-line [ >string ] [ f ] ifte* ;
: reader-cr> ( reader -- ? )
    #! Output the reader's cr flag and reset the flag to f.
    [ reader-cr f ] keep set-reader-cr ;
! Reading lines
: read-line-char ( reader ch -- )
#! Append an ordinary character to the reader's line and clear
#! the cr flag.
f pick set-reader-cr swap reader-line push ;
: read-line-loop ( reader -- ? )
#! Move characters from the buffer to the reader's line until a
#! line terminator is seen. Outputs t when a complete line has
#! been collected and f when the buffer ran out first.
2005-04-03 16:55:56 -04:00
dup buffer-length 0 = [
! Buffer exhausted before a terminator was found.
drop f
2005-04-03 16:55:56 -04:00
] [
dup buffer-pop
dup CHAR: \r = [
! \r ends the line; remember it so a directly following \n
! can be skipped.
drop t swap set-reader-cr t
2005-04-03 16:55:56 -04:00
] [
dup CHAR: \n = [
! A \n right after \r belongs to the previous line: skip it
! and keep reading. Otherwise it ends the current line.
drop dup reader-cr> [
read-line-loop
] [
drop t
] ifte
] [
! Ordinary character: append it and continue.
dupd read-line-char read-line-loop
] ifte
2005-04-03 16:55:56 -04:00
] ifte
] ifte ;
2005-04-03 18:28:55 -04:00
: init-reader ( count reader -- )
    #! Install a fresh string buffer, created by passing count to
    #! <sbuf>, as the reader's line.
    swap <sbuf> swap set-reader-line ;
2005-04-03 16:55:56 -04:00
: can-read-line? ( reader -- ? )
#! Throw any deferred error, start a new line (an <sbuf> made
#! with 80) and try to complete it from data already buffered.
#! Outputs t if a full line was available without doing I/O.
dup pending-error 80 over init-reader read-line-loop ;
2005-04-03 16:55:56 -04:00
: reader-eof ( reader -- )
    #! At end of file an empty line is replaced by f; a line that
    #! already holds characters is kept as it is.
    dup reader-line empty? not [
        drop
    ] [
        f swap set-reader-line
    ] ifte ;
2005-04-03 18:28:55 -04:00
: (refill) ( port -- n )
#! Call read on the port's descriptor, with buffer-end as the
#! destination and buffer-capacity as the size, and output the
#! value read returns.
2005-06-18 21:15:07 -04:00
>port< dup buffer-end swap buffer-capacity read ;
2005-06-18 21:15:07 -04:00
: refill ( port -- ? )
#! Return f if there is a recoverable error
#! (EAGAIN or EINTR, see defer-error). Returns t in every other
#! case: the buffer still had data, the read succeeded (its
#! result is passed to n>buffer), or the read failed with an
#! unrecoverable error that defer-error recorded on the port.
2005-04-27 01:40:09 -04:00
dup buffer-length 0 = [
2005-06-18 21:15:07 -04:00
dup (refill) dup 0 >= [
swap n>buffer t
] [
drop defer-error
] ifte
2005-04-27 01:40:09 -04:00
] [
2005-06-18 21:15:07 -04:00
drop t
2005-04-27 01:40:09 -04:00
] ifte ;
2005-04-03 18:28:55 -04:00
2005-04-12 18:31:50 -04:00
! I/O task that completes a line read; it has no slots of its own and
! delegates to an io-task for the port.
TUPLE: read-line-task ;
2005-04-14 19:37:13 -04:00
C: read-line-task ( port -- task )
2005-04-12 18:31:50 -04:00
[ >r <io-task> r> set-delegate ] keep ;
M: read-line-task do-io-task ( task -- ? )
#! Refill the buffer if needed, then either finish at end of
#! file (via reader-eof) or keep collecting the line with
#! read-line-loop. Outputs f, meaning "call again later", when
#! the refill hit a recoverable error or the line is incomplete.
2005-06-18 21:15:07 -04:00
io-task-port dup refill [
dup eof? [
reader-eof t
] [
read-line-loop
2005-06-18 21:15:07 -04:00
] ifte
2005-04-12 18:31:50 -04:00
] [
2005-06-18 21:15:07 -04:00
drop f
2005-04-12 18:31:50 -04:00
] ifte ;
2005-06-13 01:42:16 -04:00
! Line read tasks are registered in the read-tasks table.
M: read-line-task task-container drop read-tasks get ;
2005-04-14 01:32:06 -04:00
: wait-to-read-line ( port -- )
#! If a full line cannot be produced from buffered data,
#! capture the current continuation, register it as the
#! callback of a new read-line-task, and stop; the multiplexer
#! calls the continuation when the task completes.
dup can-read-line? [
[ swap <read-line-task> add-io-task stop ] callcc0
] unless drop ;
M: reader stream-readln ( stream -- line )
#! Wait until a line is available, then output it as a string;
#! read-fin outputs f when no line was collected.
dup wait-to-read-line read-fin ;
2005-04-27 01:40:09 -04:00
: trailing-cr ( reader -- )
    #! Handle a corner case. If the previous request was a line
    #! read and the line ended with \r\n, the reader stopped
    #! reading at \r and set the reader-cr flag to true. The \n
    #! that follows must be ignored.
    #!
    #! The flag is consumed with reader-cr> as soon as there is
    #! buffered data to inspect, so it can only ever swallow the
    #! character directly after the \r. Leaving it set would make
    #! every later request that begins with \n lose that
    #! character. While the buffer is empty the flag is left
    #! alone, so the check still happens once data arrives.
    dup buffer-length 1 >= [
        dup reader-cr> [
            dup buffer-peek CHAR: \n = [
                1 swap buffer-consume
            ] [
                drop
            ] ifte
        ] [
            drop
        ] ifte
    ] [
        drop
    ] ifte ;
! Reading character counts
: read-step ( count reader -- ? )
#! Move buffered characters to the reader's line, aiming for a
#! total of count. Outputs t once the line has reached count
#! characters. Outputs f when the buffer held fewer than the
#! remaining number; everything buffered is then appended and
#! more input is needed.
2005-04-27 01:40:09 -04:00
dup trailing-cr
dup reader-line -rot >r over length - ( remaining) r>
2005-04-27 01:40:09 -04:00
2dup buffer-length <= [
buffer> nappend t
2005-04-03 18:28:55 -04:00
] [
buffer>> nip nappend f
2005-04-03 18:28:55 -04:00
] ifte ;
: can-read-count? ( count reader -- ? )
#! Throw any deferred error, start a new line with count given
#! to <sbuf>, and try to satisfy the request from buffered data.
#! Outputs t if no I/O is needed.
dup pending-error 2dup init-reader read-step ;
2005-04-03 18:28:55 -04:00
2005-04-12 18:31:50 -04:00
! I/O task that completes a counted read.
! count - total number of characters requested
TUPLE: read-task count ;
! The constructor delegates to an io-task for the port and stores count.
C: read-task ( count port -- task )
[ >r <io-task> r> set-delegate ] keep
[ set-read-task-count ] keep ;
2005-04-12 18:31:50 -04:00
: >read-task< ( task -- count port )
    #! Unpack a read task into its requested count and its port.
    [ read-task-count ] keep io-task-port ;
2005-04-17 18:34:09 -04:00
M: read-task do-io-task ( task -- ? )
#! Refill the buffer if needed, then either finish at end of
#! file (via reader-eof) or move more characters with read-step.
#! Outputs f, meaning "call again later", when the refill hit a
#! recoverable error or the count has not been reached yet.
2005-06-18 21:15:07 -04:00
>read-task< dup refill [
dup eof? [
reader-eof drop t
2005-06-18 21:15:07 -04:00
] [
read-step
] ifte
2005-04-12 18:31:50 -04:00
] [
2005-06-18 21:15:07 -04:00
2drop f
2005-04-12 18:31:50 -04:00
] ifte ;
2005-06-13 01:42:16 -04:00
! Counted read tasks are registered in the read-tasks table.
M: read-task task-container drop read-tasks get ;
2005-04-14 01:32:06 -04:00
2005-04-14 19:37:13 -04:00
: wait-to-read ( count port -- )
#! If the request cannot be satisfied from buffered data,
#! capture the current continuation, register it as the
#! callback of a new read-task, and stop; the multiplexer calls
#! the continuation when the task completes.
2dup can-read-count? [
[ -rot <read-task> add-io-task stop ] callcc0
] unless 2drop ;
2005-04-14 19:37:13 -04:00
M: reader stream-read ( count stream -- string )
    #! Wait until the counted read has finished, then output what
    #! was collected as a string.
    tuck wait-to-read read-fin ;
2005-04-14 19:37:13 -04:00
M: reader stream-read1 ( stream -- char/f )
    #! Read a single character. Outputs f at end of file, where
    #! reader-eof leaves no line behind; taking first of f would
    #! fail instead.
    #! The line is fetched with pop-line, as read-fin does for
    #! stream-read and stream-readln, so that an I/O error deferred
    #! while waiting is thrown here and the line slot is reset.
    1 over wait-to-read pop-line dup [ first ] when ;
2005-04-14 19:37:13 -04:00
! Writers
: open-write ( path -- fd )
    #! Open the file for writing, creating it if necessary and
    #! truncating it, and output the descriptor. A negative
    #! result from open is thrown as an I/O error.
    O_WRONLY O_CREAT O_TRUNC bitor bitor
    file-mode open
    dup io-error ;
2005-04-08 23:50:36 -04:00
! Output stream on a file descriptor; it has no slots of its own and
! delegates to a buffered port.
TUPLE: writer ;
2005-04-08 23:50:36 -04:00
C: writer ( fd -- writer )
[ >r buffered-port r> set-delegate ] keep ;
2005-06-18 21:15:07 -04:00
: write-step ( port -- )
#! Call write once with the buffered data (buffer@ as the
#! source, buffer-length as the size). A non-negative result is
#! consumed from the buffer. On failure defer-error decides
#! whether to record an error on the port; its flag is ignored.
dup >port< dup buffer@ swap buffer-length write dup 0 >= [
swap buffer-consume
] [
2005-06-18 21:15:07 -04:00
drop defer-error drop
] ifte ;
: can-write? ( len writer -- ? )
#! If the buffer is empty and the string is too long,
#! extend the buffer.
#! NOTE(review): this word only answers whether len more
#! characters may be added without flushing first. It outputs t
#! when eof? is true for the writer (presumably an empty buffer;
#! confirm), or when buffer-fill plus len is within
#! buffer-capacity. Any extension of the buffer would have to
#! happen in >buffer; confirm against the buffer code.
dup pending-error
2005-05-04 01:14:45 -04:00
dup eof? [
2drop t
] [
[ buffer-fill + ] keep buffer-capacity <=
] ifte ;
2005-04-12 18:31:50 -04:00
! I/O task that drains a writer's buffer; it has no slots of its own
! and delegates to an io-task for the port.
TUPLE: write-task ;
2005-04-14 19:37:13 -04:00
C: write-task ( port -- task )
2005-04-12 18:31:50 -04:00
[ >r <io-task> r> set-delegate ] keep ;
M: write-task do-io-task
#! ( task -- ? ) The task is finished when the buffer has been
#! drained or an error is recorded on the port; the buffer is
#! then reset and t is output. Otherwise one write-step is done
#! and f is output so the task runs again.
2005-04-14 01:32:06 -04:00
io-task-port dup buffer-length 0 = over port-error or [
2005-04-12 18:31:50 -04:00
0 swap buffer-reset t
] [
2005-06-18 21:15:07 -04:00
write-step f
2005-04-12 18:31:50 -04:00
] ifte ;
2005-06-13 01:42:16 -04:00
! Write tasks are registered in the write-tasks table.
M: write-task task-container drop write-tasks get ;
2005-04-14 01:32:06 -04:00
: write-fin ( str writer -- )
#! Throw any deferred error, then hand the string to >buffer
#! for the writer.
2005-04-14 19:37:13 -04:00
dup pending-error >buffer ;
2005-04-12 18:31:50 -04:00
2005-04-14 19:37:13 -04:00
: add-write-io-task ( callback task -- )
#! Register a write task. Unlike add-io-task, several waiters
#! may share one descriptor: if a write-task is already
#! registered for it, the callback is consed onto the front of
#! that task's callback list and the new task is discarded.
2005-06-13 01:42:16 -04:00
dup io-task-fd write-tasks get hash [
2005-04-14 01:32:06 -04:00
dup write-task? [
2005-06-13 01:42:16 -04:00
[ nip io-task-callbacks cons ] keep
set-io-task-callbacks
2005-04-14 01:32:06 -04:00
] [
! The registered task is of another kind: fall through to
! add-io-task, which throws for an occupied descriptor.
2005-04-15 22:42:01 -04:00
drop add-io-task
2005-04-14 01:32:06 -04:00
] ifte
] [
add-io-task
] ifte* ;
2005-04-14 19:37:13 -04:00
M: writer stream-flush ( stream -- )
#! Capture the current continuation, register it as the
#! callback of a write task for this stream, and stop; the
#! multiplexer calls the continuation when the task completes.
[ swap <write-task> add-write-io-task stop ] callcc0 drop ;
2005-04-14 01:32:06 -04:00
2005-04-14 19:37:13 -04:00
! Automatic flushing is a no-op for writers.
M: writer stream-auto-flush ( stream -- ) drop ;
2005-04-14 01:32:06 -04:00
2005-04-14 19:37:13 -04:00
: wait-to-write ( len port -- )
    #! Flush the port first when len more characters cannot be
    #! accepted right now (as decided by can-write?).
    tuck can-write? not [ stream-flush ] [ drop ] ifte ;
2005-04-14 01:32:06 -04:00
2005-04-14 19:37:13 -04:00
: blocking-write ( str writer -- )
    #! Wait until the writer can accept the string's length, then
    #! place the string in the buffer.
    2dup >r length r> wait-to-write
    write-fin ;
2005-04-14 01:32:06 -04:00
2005-04-14 19:37:13 -04:00
M: writer stream-write-attr ( string style writer -- )
#! The style is ignored. Anything that is not a string is
#! converted with ch>string before being written.
nip >r dup string? [ ch>string ] unless r> blocking-write ;
2005-04-14 01:32:06 -04:00
2005-04-14 19:37:13 -04:00
M: writer stream-close ( stream -- )
    #! Flush the stream, then close the underlying port.
    [ stream-flush ] keep delegate stream-close ;
2005-04-17 18:34:09 -04:00
! Make a duplex stream for reading/writing a pair of fds
2005-04-22 02:24:38 -04:00
: <fd-stream> ( infd outfd flush? -- stream )
#! Wrap infd in a reader and outfd in a writer, and combine
#! them, together with the flush? flag, using <duplex-stream>.
2005-04-17 18:34:09 -04:00
>r >r <reader> r> <writer> r> <duplex-stream> ;
2005-04-14 01:32:06 -04:00
2005-05-03 20:09:04 -04:00
: idle-io-task ( -- )
#! Endless loop run in its own thread by init-io. Each round
#! hands the current continuation to schedule-thread, runs
#! io-multiplex with a timeout of 10, and stops; the loop
#! carries on when that continuation is resumed.
[ schedule-thread 10 io-multiplex stop ] callcc0
idle-io-task ;
2005-05-03 20:09:04 -04:00
2005-04-24 23:02:19 -04:00
USE: stdio
2005-04-17 18:34:09 -04:00
: init-io ( -- )
#! Should only be called on startup. Calling this at any
#! other time can have unintended consequences.
#! In the global namespace: creates empty read and write task
#! tables, allocates both fdsets with FD_SETSIZE bits, and sets
#! stdio to a duplex stream on descriptors 0 and 1 with the
#! flush flag true. Then starts the idle-io-task thread.
global [
2005-06-13 01:42:16 -04:00
<namespace> read-tasks set
FD_SETSIZE <bit-array> read-fdset set
<namespace> write-tasks set
FD_SETSIZE <bit-array> write-fdset set
2005-04-17 18:34:09 -04:00
0 1 t <fd-stream> stdio set
2005-05-03 20:09:04 -04:00
] bind
[ idle-io-task ] in-thread ;