i/o refactoring continues

cvs
Slava Pestov 2004-08-16 01:50:44 +00:00
parent 0c3fa9d74c
commit 171c890597
17 changed files with 319 additions and 247 deletions

View File

@ -1,6 +1,4 @@
export CC=gcc34 export CC=gcc
export CFLAGS="-lm -pedantic -Wall -Winline -O3 -march=pentium4 -fomit-frame-pointer" export CFLAGS="-lm -g -Wall -Wno-long-long -Wno-inline"
$CC $CFLAGS -o f native/*.c $CC $CFLAGS -o f native/*.c
strip f

View File

@ -67,9 +67,13 @@ DEFER: open-file
DEFER: server-socket DEFER: server-socket
DEFER: close-fd DEFER: close-fd
DEFER: accept-fd DEFER: accept-fd
DEFER: can-read-line?
DEFER: add-read-line-io-task
DEFER: read-line-fd-8 DEFER: read-line-fd-8
DEFER: can-write?
DEFER: add-write-io-task
DEFER: write-fd-8 DEFER: write-fd-8
DEFER: flush-fd DEFER: next-io-task
IN: parser IN: parser
DEFER: str>float DEFER: str>float
@ -211,9 +215,13 @@ IN: cross-compiler
server-socket server-socket
close-fd close-fd
accept-fd accept-fd
can-read-line?
add-read-line-io-task
read-line-fd-8 read-line-fd-8
can-write?
add-write-io-task
write-fd-8 write-fd-8
flush-fd next-io-task
room room
os-env os-env
millis millis

View File

@ -26,10 +26,33 @@
! ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ! ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
IN: io-internals IN: io-internals
USE: combinators
USE: continuations
USE: kernel USE: kernel
USE: namespaces USE: namespaces
USE: combinators USE: stack
USE: strings
: stdin 0 getenv ; : stdin 0 getenv ;
: stdout 1 getenv ; : stdout 1 getenv ;
: stderr 2 getenv ; : stderr 2 getenv ;
: flush-fd ( port -- )
[ swap add-write-io-task next-io-task drop ( call ) ] callcc0 ;
: wait-to-write ( len port -- )
tuck can-write? [ drop ] [ flush-fd ] ifte ;
: blocking-write ( str port -- )
over
dup string? [ str-length ] [ drop 1 ] ifte
over wait-to-write write-fd-8 ;
: fill-fd ( port -- )
[ swap add-read-line-io-task next-io-task drop ( call ) ] callcc0 ;
: wait-to-read-line ( port -- )
dup can-read-line? [ drop ] [ fill-fd ] ifte ;
: blocking-read-line ( port -- line )
dup wait-to-read-line read-line-fd-8 dup [ sbuf>str ] when ;

View File

@ -27,6 +27,7 @@
IN: streams IN: streams
USE: combinators USE: combinators
USE: continuations
USE: io-internals USE: io-internals
USE: errors USE: errors
USE: kernel USE: kernel
@ -44,39 +45,32 @@ USE: namespaces
"in" set "in" set
( str -- ) ( str -- )
[ "out" get write-fd-8 ] "fwrite" set [ "out" get blocking-write ] "fwrite" set
( -- str ) ( -- str )
[ [ "in" get dup [ blocking-read-line ] when ] "freadln" set
"in" get read-line-fd-8
] "freadln" set
( -- ) ( -- )
[ [ "out" get [ flush-fd ] when* ] "fflush" set
"out" get [ flush-fd ] when*
] "fflush" set
( -- ) ( -- )
[ [
"in" get [ close-fd ] when* "in" get [ close-fd ] when*
"out" get [ close-fd ] when* "out" get [ dup flush-fd close-fd ] when*
] "fclose" set ] "fclose" set
] extend ; ] extend ;
: <file-stream> ( path read? write? -- stream )
open-file dup <fd-stream> ;
: <filecr> ( path -- stream ) : <filecr> ( path -- stream )
t f <file-stream> ; t f open-file f <fd-stream> ;
: <filecw> ( path -- stream ) : <filecw> ( path -- stream )
f t <file-stream> ; f t open-file f swap <fd-stream> ;
: <filebr> ( path -- stream ) : <filebr> ( path -- stream )
t f <file-stream> ; <filecr> ;
: <filebw> ( path -- stream ) : <filebw> ( path -- stream )
f t <file-stream> ; <filecw> ;
: <server> ( port -- stream ) : <server> ( port -- stream )
#! Starts listening on localhost:port. Returns a stream that #! Starts listening on localhost:port. Returns a stream that

View File

@ -2,13 +2,13 @@
void fatal_error(char* msg, CELL tagged) void fatal_error(char* msg, CELL tagged)
{ {
printf("Fatal error: %s %ld\n",msg,tagged); fprintf(stderr,"Fatal error: %s %ld\n",msg,tagged);
exit(1); exit(1);
} }
void critical_error(char* msg, CELL tagged) void critical_error(char* msg, CELL tagged)
{ {
printf("Critical error: %s %ld\n",msg,tagged); fprintf(stderr,"Critical error: %s %ld\n",msg,tagged);
save_image("factor.crash.image"); save_image("factor.crash.image");
exit(1); exit(1);
} }
@ -59,13 +59,3 @@ void range_error(CELL tagged, CELL index, CELL max)
tag_cons(cons(tag_fixnum(max),F))))); tag_cons(cons(tag_fixnum(max),F)))));
general_error(ERROR_RANGE,tag_cons(c)); general_error(ERROR_RANGE,tag_cons(c));
} }
void io_error(const char* func)
{
STRING* function = from_c_string(func);
STRING* error = from_c_string(strerror(errno));
CONS* c = cons(tag_object(function),tag_cons(
cons(tag_object(error),F)));
general_error(ERROR_IO,tag_cons(c));
}

View File

@ -15,4 +15,3 @@ void throw_error(CELL object);
void general_error(CELL error, CELL tagged); void general_error(CELL error, CELL tagged);
void type_error(CELL type, CELL tagged); void type_error(CELL type, CELL tagged);
void range_error(CELL tagged, CELL index, CELL max); void range_error(CELL tagged, CELL index, CELL max);
void io_error(const char* func);

View File

@ -13,6 +13,7 @@ int main(int argc, char** argv)
init_arena(DEFAULT_ARENA); init_arena(DEFAULT_ARENA);
load_image(argv[1]); load_image(argv[1]);
init_stacks(); init_stacks();
init_iomux();
init_io(); init_io();
run(); run();

View File

@ -2,12 +2,20 @@
void init_io(void) void init_io(void)
{ {
env.user[STDIN_ENV] = port(0); env.user[STDIN_ENV] = tag_object(port(0));
set_nonblocking(0); env.user[STDOUT_ENV] = tag_object(port(1));
env.user[STDOUT_ENV] = port(1); env.user[STDERR_ENV] = tag_object(port(2));
set_nonblocking(1); }
env.user[STDERR_ENV] = port(2);
/* set_nonblocking(2); */ bool can_read_line(PORT* port)
{
return false;
}
void primitive_can_read_line(void)
{
PORT* port = untag_port(dpop());
dpush(tag_boolean(can_read_line(port)));
} }
/* Return true if something was read */ /* Return true if something was read */
@ -20,7 +28,7 @@ bool read_step(PORT* port)
if(amount == -1) if(amount == -1)
{ {
if(errno != EAGAIN) if(errno != EAGAIN)
io_error(__FUNCTION__); io_error(port,__FUNCTION__);
return false; return false;
} }
else else
@ -31,21 +39,12 @@ bool read_step(PORT* port)
} }
} }
READLINE_STAT read_line_step(PORT* port) bool read_line_step(PORT* port)
{ {
int i; int i;
char ch; char ch;
SBUF* line = port->line; SBUF* line = untag_sbuf(port->line);
if(port->buf_pos >= port->buf_fill)
{
if(!read_step(port))
return READLINE_WAIT;
if(port->buf_fill == 0)
return READLINE_EOF;
}
for(i = port->buf_pos; i < port->buf_fill; i++) for(i = port->buf_pos; i < port->buf_fill; i++)
{ {
@ -53,7 +52,7 @@ READLINE_STAT read_line_step(PORT* port)
if(ch == '\n') if(ch == '\n')
{ {
port->buf_pos = i + 1; port->buf_pos = i + 1;
return READLINE_EOL; return true;
} }
else else
set_sbuf_nth(line,line->top,ch); set_sbuf_nth(line,line->top,ch);
@ -63,43 +62,15 @@ READLINE_STAT read_line_step(PORT* port)
/* We've reached the end of the above loop, without seeing a newline /* We've reached the end of the above loop, without seeing a newline
or EOF, so read again */ or EOF, so read again */
return READLINE_AGAIN; return false;
} }
void primitive_read_line_fd_8(void) void primitive_read_line_fd_8(void)
{ {
PORT* port = untag_port(dpeek()); PORT* port = untag_port(dpeek());
SBUF* line; drepl(port->line);
READLINE_STAT state; port->line = F;
init_buffer(port,B_READ);
if(port->line == NULL)
port->line = sbuf(LINE_SIZE);
else
port->line->top = 0;
line = port->line;
add_io_task(IO_TASK_READ_LINE,port,F);
for(;;)
{
state = read_line_step(port);
if(state == READLINE_WAIT)
iomux();
else if(state == READLINE_EOF && line->top == 0)
{
/* didn't read anything before EOF */
drepl(F);
break;
}
else if(state == READLINE_EOL)
{
drepl(tag_object(sbuf_to_string(line)));
break;
}
}
remove_io_task(IO_TASK_READ_LINE,port);
} }
/* Return true if write was done */ /* Return true if write was done */
@ -113,7 +84,7 @@ bool write_step(PORT* port)
if(amount == -1) if(amount == -1)
{ {
if(errno != EAGAIN) if(errno != EAGAIN)
io_error(__FUNCTION__); io_error(port,__FUNCTION__);
return false; return false;
} }
else else
@ -123,39 +94,48 @@ bool write_step(PORT* port)
} }
} }
/* keep writing to the stream until everything is written */ bool can_write(PORT* port, FIXNUM len)
void flush_buffer(PORT* port)
{ {
IO_TASK* task; CELL buf_capacity;
if(port->buf_mode != B_WRITE || port->buf_fill == 0)
return;
task = add_io_task(IO_TASK_WRITE,port,F); switch(port->buf_mode)
for(;;)
{ {
if(port->buf_fill == port->buf_pos) case B_NONE:
break; return true;
case B_READ_LINE:
if(!write_step(port)) return false;
iomux(); case B_WRITE:
buf_capacity = port->buffer->capacity * CHARS;
/* Is the string longer than the buffer? */
if(port->buf_fill == 0 && len > buf_capacity)
{
/* Increase the buffer to fit the string */
port->buffer = allot_string(len / CHARS + 1);
return true;
}
else
return (port->buf_fill + len <= buf_capacity);
default:
critical_error("Bad buf_mode",port->buf_mode);
return false;
} }
}
remove_io_task(IO_TASK_WRITE,port); void primitive_can_write(void)
{
port->buf_pos = 0; PORT* port = untag_port(dpop());
port->buf_fill = 0; FIXNUM len = to_fixnum(dpop());
dpush(tag_boolean(can_write(port,len)));
} }
void write_fd_char_8(PORT* port, FIXNUM ch) void write_fd_char_8(PORT* port, FIXNUM ch)
{ {
char c = (char)ch; char c = (char)ch;
init_buffer(port,B_WRITE); if(!can_write(port,1))
io_error(port,__FUNCTION__);
/* Is the buffer full? */ init_buffer(port,B_WRITE);
if(port->buf_fill == port->buffer->capacity * CHARS)
flush_buffer(port);
bput((CELL)port->buffer + sizeof(STRING) + port->buf_fill,c); bput((CELL)port->buffer + sizeof(STRING) + port->buf_fill,c);
port->buf_fill++; port->buf_fill++;
@ -163,24 +143,15 @@ void write_fd_char_8(PORT* port, FIXNUM ch)
void write_fd_string_8(PORT* port, STRING* str) void write_fd_string_8(PORT* port, STRING* str)
{ {
char* c_str = to_c_string(str); char* c_str;
/* Note this ensures the buffer is large enough to fit the string */
if(!can_write(port,str->capacity))
io_error(port,__FUNCTION__);
init_buffer(port,B_WRITE); init_buffer(port,B_WRITE);
/* Is the string longer than the buffer? */ c_str = to_c_string(str);
if(str->capacity > port->buffer->capacity * CHARS)
{
flush_buffer(port);
/* Increase the buffer to fit the string */
port->buffer = allot_string(str->capacity / CHARS + 1);
}
/* Is there enough room in the buffer? If not, flush */
if(port->buf_fill + str->capacity
> port->buffer->capacity * CHARS)
{
flush_buffer(port);
}
/* Append string to buffer */ /* Append string to buffer */
memcpy((void*)((CELL)port->buffer + sizeof(STRING) memcpy((void*)((CELL)port->buffer + sizeof(STRING)
@ -211,21 +182,20 @@ void primitive_write_fd_8(void)
} }
} }
void primitive_flush_fd(void)
{
PORT* port = untag_port(dpop());
flush_buffer(port);
}
void primitive_close_fd(void) void primitive_close_fd(void)
{ {
/* This does not flush. */
PORT* port = untag_port(dpop()); PORT* port = untag_port(dpop());
flush_buffer(port);
close(port->fd); close(port->fd);
} }
void set_nonblocking(int fd) void io_error(PORT* port, const char* func)
{ {
if(fcntl(fd,F_SETFL,O_NONBLOCK,1) == -1) STRING* function = from_c_string(func);
io_error(__FUNCTION__); STRING* error = from_c_string(strerror(errno));
CONS* c = cons(tag_object(function),tag_cons(
cons(tag_object(error),F)));
general_error(ERROR_IO,tag_cons(c));
} }

View File

@ -1,24 +1,19 @@
#define LINE_SIZE 80 #define LINE_SIZE 80
#define BUF_SIZE (32 * 1024) #define BUF_SIZE (32 * 1024)
bool can_read_line(PORT* port);
void primitive_can_read_line(void);
bool read_step(PORT* port); bool read_step(PORT* port);
bool read_line_step(PORT* port);
/* read_line_step() return values */
typedef enum {
READLINE_WAIT, /* means we have to wait for more I/O */
READLINE_AGAIN,
READLINE_EOL,
READLINE_EOF
} READLINE_STAT;
READLINE_STAT read_line_step(PORT* port);
bool write_step(PORT* port); bool write_step(PORT* port);
void flush_buffer(PORT* port); void flush_buffer(PORT* port);
void init_io(void); void init_io(void);
void primitive_read_line_fd_8(void); void primitive_read_line_fd_8(void);
bool can_write(PORT* port, FIXNUM len);
void primitive_can_write(void);
void write_fd_char_8(PORT* port, FIXNUM ch); void write_fd_char_8(PORT* port, FIXNUM ch);
void write_fd_string_8(PORT* port, STRING* str); void write_fd_string_8(PORT* port, STRING* str);
void primitive_write_fd_8(void); void primitive_write_fd_8(void);
void primitive_flush_fd(void);
void primitive_close_fd(void); void primitive_close_fd(void);
void set_nonblocking(int fd); void io_error(PORT* port, const char* func);

View File

@ -9,17 +9,17 @@ void primitive_open_file(void)
int fd; int fd;
if(read && write) if(read && write)
mode = O_RDWR | O_CREAT | O_NONBLOCK; mode = O_RDWR | O_CREAT;
else if(read) else if(read)
mode = O_RDONLY | O_NONBLOCK; mode = O_RDONLY;
else if(write) else if(write)
mode = O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK; mode = O_WRONLY | O_CREAT | O_TRUNC;
else else
mode = O_NONBLOCK; mode = 0;
fd = open(path,mode); fd = open(path,mode);
if(fd < 0) if(fd < 0)
io_error(__FUNCTION__); io_error(NULL,__FUNCTION__);
dpush(port(fd)); dpush(tag_object(port(fd)));
} }

View File

@ -7,8 +7,8 @@ void init_io_tasks(fd_set* fdset, IO_TASK* io_tasks)
FD_ZERO(fdset); FD_ZERO(fdset);
for(i = 0; i < FD_SETSIZE; i++) for(i = 0; i < FD_SETSIZE; i++)
{ {
read_io_tasks[i].port = F; io_tasks[i].port = F;
read_io_tasks[i].callback = F; io_tasks[i].callback = F;
} }
} }
@ -21,7 +21,7 @@ void init_iomux(void)
init_io_tasks(&write_fd_set,write_io_tasks); init_io_tasks(&write_fd_set,write_io_tasks);
} }
IO_TASK* add_io_task_impl( IO_TASK* add_io_task(
IO_TASK_TYPE type, IO_TASK_TYPE type,
PORT* port, PORT* port,
CELL callback, CELL callback,
@ -30,6 +30,9 @@ IO_TASK* add_io_task_impl(
{ {
int fd = port->fd; int fd = port->fd;
/* if(io_tasks[fd].port != F)
critical_error("Adding I/O task twice",fd); */
io_tasks[fd].type = type; io_tasks[fd].type = type;
io_tasks[fd].port = tag_object(port); io_tasks[fd].port = tag_object(port);
io_tasks[fd].callback = callback; io_tasks[fd].callback = callback;
@ -40,24 +43,23 @@ IO_TASK* add_io_task_impl(
return &io_tasks[fd]; return &io_tasks[fd];
} }
IO_TASK* add_io_task(IO_TASK_TYPE type, PORT* port, CELL callback) void primitive_add_read_line_io_task(void)
{ {
switch(type) PORT* port = untag_port(dpop());
{ CELL callback = dpop();
case IO_TASK_READ_LINE: add_io_task(IO_TASK_READ_LINE,port,callback,
case IO_TASK_READ_COUNT: read_io_tasks,&read_fd_count);
return add_io_task_impl(type,port,callback,
read_io_tasks,&read_fd_count);
case IO_TASK_WRITE:
return add_io_task_impl(type,port,callback,
write_io_tasks,&write_fd_count);
default:
fatal_error("Invalid IO_TASK_TYPE",type);
return NULL;
}
} }
void remove_io_task_impl( void primitive_add_write_io_task(void)
{
PORT* port = untag_port(dpop());
CELL callback = dpop();
add_io_task(IO_TASK_WRITE,port,callback,
write_io_tasks,&write_fd_count);
}
void remove_io_task(
IO_TASK_TYPE type, IO_TASK_TYPE type,
PORT* port, PORT* port,
IO_TASK* io_tasks, IO_TASK* io_tasks,
@ -72,64 +74,139 @@ void remove_io_task_impl(
*fd_count = *fd_count - 1; *fd_count = *fd_count - 1;
} }
void remove_io_task(IO_TASK_TYPE type, PORT* port) void remove_io_tasks(PORT* port)
{ {
switch(type) remove_io_task(IO_TASK_READ_LINE,port,
{ read_io_tasks,&read_fd_count);
case IO_TASK_READ_LINE: remove_io_task(IO_TASK_WRITE,port,
case IO_TASK_READ_COUNT: write_io_tasks,&write_fd_count);
remove_io_task_impl(type,port,read_io_tasks,&read_fd_count);
break;
case IO_TASK_WRITE:
remove_io_task_impl(type,port,write_io_tasks,&write_fd_count);
break;
}
} }
void perform_io_task(IO_TASK* task) bool set_up_fd_set(fd_set* fdset, int fd_count, IO_TASK* io_tasks)
{
if(task->port == F)
return;
switch(task->type)
{
case IO_TASK_READ_LINE:
break;
case IO_TASK_WRITE:
write_step(untag_port(task->port));
break;
default:
critical_error("Bad I/O task",task->type);
break;
}
}
bool set_up_fd_set(fd_set* fdset, IO_TASK* io_tasks)
{ {
bool retval = false; bool retval = false;
int i; int i;
for(i = 0; i < read_fd_count; i++)
FD_ZERO(fdset);
for(i = 0; i < fd_count; i++)
{ {
if(read_io_tasks[i].port != F) if(typep(PORT_TYPE,io_tasks[i].port))
{ {
retval = true; retval = true;
FD_SET(i,&read_fd_set); FD_SET(i,fdset);
} }
} }
return retval; return retval;
} }
/* Wait for I/O and return a callback. */ bool perform_read_line_io_task(PORT* port)
CELL iomux(void)
{ {
bool reading = set_up_fd_set(&read_fd_set,read_io_tasks); init_buffer(port,B_READ_LINE);
bool writing = set_up_fd_set(&write_fd_set,write_io_tasks); if(port->buf_pos >= port->buf_fill)
{
if(!read_step(port))
return false;
}
if(port->buf_fill == 0)
{
/* EOF */
port->line = F;
return true;
}
else
return read_line_step(port);
}
bool perform_write_io_task(PORT* port)
{
init_buffer(port,B_WRITE);
if(write_step(port))
{
if(port->buf_pos == port->buf_fill)
{
/* All written */
port->buf_pos = 0;
port->buf_fill = 0;
return true;
}
}
return false;
}
CELL perform_io_task(IO_TASK* task)
{
PORT* port = untag_port(task->port);
CELL callback = task->callback;
switch(task->type)
{
case IO_TASK_READ_LINE:
remove_io_task(IO_TASK_READ_LINE,port,
read_io_tasks,&read_fd_count);
if(perform_read_line_io_task(port))
return callback;
else
{
add_io_task(IO_TASK_READ_LINE,port,
callback,read_io_tasks,
&read_fd_count);
return F;
}
case IO_TASK_WRITE:
remove_io_task(IO_TASK_WRITE,port,
write_io_tasks,&write_fd_count);
if(perform_write_io_task(port))
return callback;
else
{
add_io_task(IO_TASK_WRITE,port,
callback,write_io_tasks,
&write_fd_count);
return F;
}
default:
critical_error("Bad I/O task",task->type);
return F;
}
}
CELL perform_io_tasks(fd_set* fdset, int fd_count, IO_TASK* io_tasks)
{
int i;
CELL callback;
for(i = 0; i < fd_count; i++)
{
if(FD_ISSET(i,fdset))
{
if(io_tasks[i].port == F)
critical_error("select() returned fd for non-existent task",i);
else
{
callback = perform_io_task(&io_tasks[i]);
if(callback != F)
return callback;
}
}
}
return F;
}
/* Wait for I/O and return a callback. */
CELL next_io_task(void)
{
bool reading = set_up_fd_set(&read_fd_set,
read_fd_count,read_io_tasks);
bool writing = set_up_fd_set(&write_fd_set,
write_fd_count,write_io_tasks);
CELL callback;
if(!reading && !writing) if(!reading && !writing)
fatal_error("iomux() called with no IO tasks",0); critical_error("next_io_task() called with no IO tasks",0);
select(read_fd_count > write_fd_count select(read_fd_count > write_fd_count
? read_fd_count : write_fd_count, ? read_fd_count : write_fd_count,
@ -137,13 +214,16 @@ CELL iomux(void)
(writing ? &write_fd_set : NULL), (writing ? &write_fd_set : NULL),
NULL,NULL); NULL,NULL);
/* for(i = 0; i < read_fd_count; i++) callback = perform_io_tasks(&read_fd_set,read_fd_count,read_io_tasks);
perform_io_task(&read_io_tasks[i]); if(callback != F)
return callback;
for(i = 0; i < write_fd_count; i++) return perform_io_tasks(&write_fd_set,write_fd_count,write_io_tasks);
perform_io_task(&write_io_tasks[i]); */ }
return F; void primitive_next_io_task(void)
{
dpush(next_io_task());
} }
void collect_io_tasks(void) void collect_io_tasks(void)

View File

@ -20,20 +20,25 @@ int write_fd_count;
void init_io_tasks(fd_set* fd_set, IO_TASK* io_tasks); void init_io_tasks(fd_set* fd_set, IO_TASK* io_tasks);
void init_iomux(void); void init_iomux(void);
IO_TASK* add_io_task_impl( IO_TASK* add_io_task(
IO_TASK_TYPE type, IO_TASK_TYPE type,
PORT* port, PORT* port,
CELL callback, CELL callback,
IO_TASK* io_tasks, IO_TASK* io_tasks,
int* fd_count); int* fd_count);
IO_TASK* add_io_task(IO_TASK_TYPE type, PORT* port, CELL callback); void primitive_add_read_line_io_task(void);
void remove_io_task_impl( void primitive_add_write_io_task(void);
void remove_io_task(
IO_TASK_TYPE type, IO_TASK_TYPE type,
PORT* port, PORT* port,
IO_TASK* io_tasks, IO_TASK* io_tasks,
int* fd_count); int* fd_count);
void remove_io_task(IO_TASK_TYPE type, PORT* port); void remove_io_tasks(PORT* port);
void perform_io_task(IO_TASK* task); bool set_up_fd_set(fd_set* fdset, int fd_count, IO_TASK* io_tasks);
bool set_up_fd_set(fd_set* fdset, IO_TASK* io_tasks); bool perform_read_line_io_task(PORT* port);
CELL iomux(void); bool perform_write_io_task(PORT* port);
CELL perform_io_task(IO_TASK* task);
CELL perform_io_tasks(fd_set* fdset, int fd_count, IO_TASK* io_tasks);
CELL next_io_task(void);
void primitive_next_io_task(void);
void collect_io_tasks(void); void collect_io_tasks(void);

View File

@ -11,16 +11,20 @@ PORT* untag_port(CELL tagged)
return p; return p;
} }
CELL port(CELL fd) PORT* port(CELL fd)
{ {
PORT* port = allot_object(PORT_TYPE,sizeof(PORT)); PORT* port = allot_object(PORT_TYPE,sizeof(PORT));
port->fd = fd; port->fd = fd;
port->buffer = NULL; port->buffer = NULL;
port->line = NULL; port->line = F;
port->buf_mode = B_NONE; port->buf_mode = B_NONE;
port->buf_fill = 0; port->buf_fill = 0;
port->buf_pos = 0; port->buf_pos = 0;
return tag_object(port);
if(fcntl(port->fd,F_SETFL,O_NONBLOCK,1) == -1)
io_error(port,__FUNCTION__);
return port;
} }
void primitive_portp(void) void primitive_portp(void)
@ -37,6 +41,16 @@ void init_buffer(PORT* port, int mode)
{ {
port->buf_fill = port->buf_pos = 0; port->buf_fill = port->buf_pos = 0;
port->buf_mode = mode; port->buf_mode = mode;
if(mode == B_READ_LINE)
port->line = tag_object(sbuf(LINE_SIZE));
}
else if(port->buf_mode == B_READ_LINE)
{
if(port->line == F)
port->line = tag_object(sbuf(LINE_SIZE));
else
untag_sbuf(port->line)->top = 0;
} }
} }
@ -45,20 +59,12 @@ void fixup_port(PORT* port)
port->fd = -1; port->fd = -1;
if(port->buffer != 0) if(port->buffer != 0)
port->buffer = fixup_untagged_string(port->buffer); port->buffer = fixup_untagged_string(port->buffer);
if(port->line != 0) fixup(&port->line);
{
port->line = (SBUF*)((CELL)port->line
+ (active->base - relocation_base));
}
} }
void collect_port(PORT* port) void collect_port(PORT* port)
{ {
if(port->buffer != 0) if(port->buffer != 0)
port->buffer = copy_untagged_string(port->buffer); port->buffer = copy_untagged_string(port->buffer);
if(port->line != 0) copy_object(&port->line);
{
port->line = (SBUF*)copy_untagged_object(
port->line,sizeof(SBUF));
}
} }

View File

@ -1,12 +1,12 @@
/* Buffer mode */ /* Buffer mode */
typedef enum { B_READ, B_WRITE, B_NONE } B_MODE; typedef enum { B_READ_LINE, B_WRITE, B_NONE } B_MODE;
typedef struct { typedef struct {
CELL header; CELL header;
FIXNUM fd; FIXNUM fd;
STRING* buffer; STRING* buffer;
/* partial line used by read_line_fd */ /* tagged partial line used by read_line_fd */
SBUF* line; CELL line;
/* one of B_READ, B_WRITE or B_NONE */ /* one of B_READ, B_WRITE or B_NONE */
B_MODE buf_mode; B_MODE buf_mode;
/* top of buffer */ /* top of buffer */
@ -16,7 +16,7 @@ typedef struct {
} PORT; } PORT;
PORT* untag_port(CELL tagged); PORT* untag_port(CELL tagged);
CELL port(CELL fd); PORT* port(CELL fd);
void init_buffer(PORT* port, int mode); void init_buffer(PORT* port, int mode);
void primitive_portp(void); void primitive_portp(void);
void fixup_port(PORT* port); void fixup_port(PORT* port);

View File

@ -120,9 +120,13 @@ XT primitives[] = {
primitive_server_socket, primitive_server_socket,
primitive_close_fd, primitive_close_fd,
primitive_accept_fd, primitive_accept_fd,
primitive_can_read_line,
primitive_add_read_line_io_task,
primitive_read_line_fd_8, primitive_read_line_fd_8,
primitive_can_write,
primitive_add_write_io_task,
primitive_write_fd_8, primitive_write_fd_8,
primitive_flush_fd, primitive_next_io_task,
primitive_room, primitive_room,
primitive_os_env, primitive_os_env,
primitive_millis, primitive_millis,

View File

@ -1,4 +1,4 @@
extern XT primitives[]; extern XT primitives[];
#define PRIMITIVE_COUNT 129 #define PRIMITIVE_COUNT 133
CELL primitive_to_xt(CELL primitive); CELL primitive_to_xt(CELL primitive);

View File

@ -11,11 +11,11 @@ int make_server_socket(CHAR port)
sock = socket(PF_INET, SOCK_STREAM, 0); sock = socket(PF_INET, SOCK_STREAM, 0);
if(sock < 0) if(sock < 0)
io_error(__FUNCTION__); io_error(NULL,__FUNCTION__);
/* Reuse port number */ /* Reuse port number */
if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&reuseaddr,sizeof(int)) < 0) if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&reuseaddr,sizeof(int)) < 0)
io_error(__FUNCTION__); io_error(NULL,__FUNCTION__);
/* Give the socket a name */ /* Give the socket a name */
name.sin_family = AF_INET; name.sin_family = AF_INET;
@ -24,14 +24,14 @@ int make_server_socket(CHAR port)
if(bind(sock,(struct sockaddr *)&name, sizeof(name)) < 0) if(bind(sock,(struct sockaddr *)&name, sizeof(name)) < 0)
{ {
close(sock); close(sock);
io_error(__FUNCTION__); io_error(NULL,__FUNCTION__);
} }
/* Start listening for connections */ /* Start listening for connections */
if(listen(sock,1) < 0) if(listen(sock,1) < 0)
{ {
close(sock); close(sock);
io_error(__FUNCTION__); io_error(NULL,__FUNCTION__);
} }
return sock; return sock;
@ -40,7 +40,7 @@ int make_server_socket(CHAR port)
void primitive_server_socket(void) void primitive_server_socket(void)
{ {
CHAR p = (CHAR)to_fixnum(dpop()); CHAR p = (CHAR)to_fixnum(dpop());
dpush(port(make_server_socket(p))); dpush(tag_object(port(make_server_socket(p))));
} }
int accept_connection(int sock) int accept_connection(int sock)
@ -50,9 +50,7 @@ int accept_connection(int sock)
int new = accept(sock,(struct sockaddr *)&clientname,&size); int new = accept(sock,(struct sockaddr *)&clientname,&size);
if(new < 0) if(new < 0)
io_error(__FUNCTION__); io_error(NULL,__FUNCTION__);
set_nonblocking(new);
printf("Connection from host %s, port %hd.\n", printf("Connection from host %s, port %hd.\n",
inet_ntoa(clientname.sin_addr), inet_ntoa(clientname.sin_addr),
@ -64,5 +62,6 @@ int accept_connection(int sock)
void primitive_accept_fd(void) void primitive_accept_fd(void)
{ {
PORT* p = untag_port(dpop()); PORT* p = untag_port(dpop());
dpush(port(accept_connection(p->fd))); PORT* new = port(accept_connection(p->fd));
dpush(tag_object(new));
} }