yet more I/O work

cvs
Slava Pestov 2004-08-13 06:19:22 +00:00
parent b9ad4405b4
commit 11707cbb90
5 changed files with 127 additions and 63 deletions

View File

@ -14,7 +14,7 @@ int read_step(PORT* port)
{ {
FIXNUM amount = -1; FIXNUM amount = -1;
add_read_io_task(port,F); add_io_task(IO_TASK_READ_LINE,port,F);
for(;;) for(;;)
{ {
@ -26,7 +26,7 @@ int read_step(PORT* port)
{ {
if(errno != EAGAIN) if(errno != EAGAIN)
{ {
remove_read_io_task(port); remove_io_task(IO_TASK_READ_LINE,port);
return -1; return -1;
} }
} }
@ -36,7 +36,7 @@ int read_step(PORT* port)
iomux(); iomux();
} }
remove_read_io_task(port); remove_io_task(IO_TASK_READ_LINE,port);
port->buf_fill = (amount < 0 ? 0 : amount); port->buf_fill = (amount < 0 ? 0 : amount);
port->buf_pos = 0; port->buf_pos = 0;
@ -111,39 +111,29 @@ void primitive_read_line_fd_8(void)
} }
} }
void write_step(PORT* port) /* Return true if write was done */
bool write_step(PORT* port)
{ {
char* chars = (char*)port->buffer + sizeof(STRING); char* chars = (char*)port->buffer + sizeof(STRING);
FIXNUM amount; FIXNUM amount = write(port->fd,chars + port->buf_pos,
add_write_io_task(port,F);
for(;;)
{
amount = write(port->fd,chars + port->buf_pos,
port->buf_fill - port->buf_pos); port->buf_fill - port->buf_pos);
if(amount == -1) if(amount == -1)
{ {
if(errno != EAGAIN) if(errno == EAGAIN)
return false;
else
{ {
remove_write_io_task(port); io_error(__FUNCTION__);
return; return false;
} }
} }
else else
break; {
iomux();
}
remove_write_io_task(port);
if(amount < 0)
io_error(__FUNCTION__);
port->buf_pos += amount; port->buf_pos += amount;
return true;
}
} }
/* keep writing to the stream until everything is written */ /* keep writing to the stream until everything is written */
@ -152,14 +142,19 @@ void flush_buffer(PORT* port)
if(port->buf_mode != B_WRITE || port->buf_fill == 0) if(port->buf_mode != B_WRITE || port->buf_fill == 0)
return; return;
add_io_task(IO_TASK_WRITE,port,F);
for(;;) for(;;)
{ {
if(port->buf_fill == port->buf_pos) if(port->buf_fill == port->buf_pos)
break; break;
write_step(port); if(!write_step(port))
iomux();
} }
remove_io_task(IO_TASK_WRITE,port);
port->buf_pos = 0; port->buf_pos = 0;
port->buf_fill = 0; port->buf_fill = 0;
} }

View File

@ -7,6 +7,7 @@ int read_step(PORT* port);
typedef enum { READLINE_AGAIN, READLINE_EOL, READLINE_EOF } READLINE_STAT; typedef enum { READLINE_AGAIN, READLINE_EOL, READLINE_EOF } READLINE_STAT;
READLINE_STAT read_line_step(PORT* port); READLINE_STAT read_line_step(PORT* port);
bool write_step(PORT* port);
void flush_buffer(PORT* port); void flush_buffer(PORT* port);
void init_io(void); void init_io(void);
void primitive_read_line_fd_8(void); void primitive_read_line_fd_8(void);

View File

@ -111,7 +111,7 @@ void collect_next(void)
} }
} }
void copy_roots(void) void collect_roots(void)
{ {
int i; int i;
@ -140,7 +140,8 @@ void primitive_gc(void)
{ {
flip_zones(); flip_zones();
scan = active->here = active->base; scan = active->here = active->base;
copy_roots(); collect_roots();
collect_io_tasks();
while(scan < active->here) while(scan < active->here)
{ {
gc_debug("scan loop",scan); gc_debug("scan loop",scan);

View File

@ -7,7 +7,7 @@ void init_io_tasks(fd_set* fdset, IO_TASK* io_tasks)
FD_ZERO(fdset); FD_ZERO(fdset);
for(i = 0; i < FD_SETSIZE; i++) for(i = 0; i < FD_SETSIZE; i++)
{ {
read_io_tasks[i].port = NULL; read_io_tasks[i].port = F;
read_io_tasks[i].callback = F; read_io_tasks[i].callback = F;
} }
} }
@ -21,7 +21,8 @@ void init_iomux(void)
init_io_tasks(&write_fd_set,write_io_tasks); init_io_tasks(&write_fd_set,write_io_tasks);
} }
void add_io_task( void add_io_task_impl(
IO_TASK_TYPE type,
PORT* port, PORT* port,
CELL callback, CELL callback,
fd_set* fdset, fd_set* fdset,
@ -34,10 +35,11 @@ void add_io_task(
int i; int i;
for(i = 0; i < fds; i++) for(i = 0; i < fds; i++)
{ {
if(io_tasks[i].port == NULL) if(io_tasks[i].port == F)
{ {
FD_SET(port->fd,fdset); FD_SET(port->fd,fdset);
io_tasks[i].port = port; io_tasks[i].type = type;
io_tasks[i].port = tag_object(port);
io_tasks[i].callback = callback; io_tasks[i].callback = callback;
return; return;
} }
@ -48,26 +50,32 @@ void add_io_task(
critical_error("Too many I/O tasks",*fd_count); critical_error("Too many I/O tasks",*fd_count);
FD_SET(port->fd,fdset); FD_SET(port->fd,fdset);
io_tasks[fds].port = port; io_tasks[fds].type = type;
io_tasks[fds].port = tag_object(port);
io_tasks[fds].callback = callback; io_tasks[fds].callback = callback;
*fd_count = fds + 1; *fd_count = fds + 1;
} }
void add_read_io_task(PORT* port, CELL callback) void add_io_task(IO_TASK_TYPE type, PORT* port, CELL callback)
{ {
add_io_task(port,callback, switch(type)
{
case IO_TASK_READ_LINE:
case IO_TASK_READ_COUNT:
add_io_task_impl(type,port,callback,
&read_fd_set,read_io_tasks, &read_fd_set,read_io_tasks,
&read_fd_count); &read_fd_count);
} break;
case IO_TASK_WRITE:
void add_write_io_task(PORT* port, CELL callback) add_io_task_impl(type,port,callback,
{
add_io_task(port,callback,
&write_fd_set,write_io_tasks, &write_fd_set,write_io_tasks,
&write_fd_count); &write_fd_count);
break;
}
} }
void remove_io_task( void remove_io_task_impl(
IO_TASK_TYPE type,
PORT* port, PORT* port,
fd_set* fdset, fd_set* fdset,
IO_TASK* io_tasks, IO_TASK* io_tasks,
@ -78,10 +86,10 @@ void remove_io_task(
for(i = 0; i < fds; i++) for(i = 0; i < fds; i++)
{ {
if(io_tasks[i].port == port) if(untag_port(io_tasks[i].port) == port)
{ {
FD_CLR(port->fd,fdset); FD_CLR(port->fd,fdset);
io_tasks[i].port = NULL; io_tasks[i].port = F;
io_tasks[i].callback = F; io_tasks[i].callback = F;
if(i == fds - 1) if(i == fds - 1)
*fd_count = fds - 1; *fd_count = fds - 1;
@ -90,14 +98,40 @@ void remove_io_task(
} }
} }
void remove_read_io_task(PORT* port) void remove_io_task(IO_TASK_TYPE type, PORT* port)
{ {
remove_io_task(port,&read_fd_set,read_io_tasks,&read_fd_count); switch(type)
{
case IO_TASK_READ_LINE:
case IO_TASK_READ_COUNT:
remove_io_task_impl(type,port,
&read_fd_set,read_io_tasks,
&read_fd_count);
break;
case IO_TASK_WRITE:
remove_io_task_impl(type,port,
&write_fd_set,write_io_tasks,
&write_fd_count);
}
} }
void remove_write_io_task(PORT* port) void perform_iotask(IO_TASK* task)
{ {
remove_io_task(port,&write_fd_set,write_io_tasks,&write_fd_count); if(task->port == F)
return;
switch(task->type)
{
case IO_TASK_READ_LINE:
break;
case IO_TASK_WRITE:
write_step(untag_port(task->port));
break;
default:
critical_error("Bad I/O task",task->type);
break;
}
} }
/* Wait for I/O and return a callback. */ /* Wait for I/O and return a callback. */
@ -107,5 +141,30 @@ CELL iomux(void)
? read_fd_count : write_fd_count, ? read_fd_count : write_fd_count,
&read_fd_set,&write_fd_set,NULL,NULL); &read_fd_set,&write_fd_set,NULL,NULL);
/* int i;
for(i = 0; i < read_fd_count; i++)
perform_iotask(&read_io_tasks[i]);
for(i = 0; i < write_fd_count; i++)
perform_iotask(&write_io_tasks[i]); */
return F; return F;
} }
void collect_io_tasks(void)
{
int i;
for(i = 0; i < FD_SETSIZE; i++)
{
copy_object(&read_io_tasks[i].port);
copy_object(&read_io_tasks[i].callback);
}
for(i = 0; i < FD_SETSIZE; i++)
{
copy_object(&write_io_tasks[i].port);
copy_object(&write_io_tasks[i].callback);
}
}

View File

@ -1,5 +1,12 @@
typedef enum {
IO_TASK_READ_LINE,
IO_TASK_READ_COUNT,
IO_TASK_WRITE
} IO_TASK_TYPE;
typedef struct { typedef struct {
PORT* port; IO_TASK_TYPE type;
CELL port;
CELL callback; CELL callback;
} IO_TASK; } IO_TASK;
@ -13,19 +20,20 @@ int write_fd_count;
void init_io_tasks(fd_set* fd_set, IO_TASK* io_tasks); void init_io_tasks(fd_set* fd_set, IO_TASK* io_tasks);
void init_iomux(void); void init_iomux(void);
void add_io_task( void add_io_task_impl(
IO_TASK_TYPE type,
PORT* port, PORT* port,
CELL callback, CELL callback,
fd_set* fd_set, fd_set* fdset,
IO_TASK* io_tasks, IO_TASK* io_tasks,
int* fd_count); int* fd_count);
void add_read_io_task(PORT* port, CELL callback); void add_io_task(IO_TASK_TYPE type, PORT* port, CELL callback);
void add_write_io_task(PORT* port, CELL callback); void remove_io_task_impl(
void remove_io_task( IO_TASK_TYPE type,
PORT* port, PORT* port,
fd_set* fdset, fd_set* fdset,
IO_TASK* io_tasks, IO_TASK* io_tasks,
int* fd_count); int* fd_count);
void remove_read_io_task(PORT* port); void remove_io_task(IO_TASK_TYPE type, PORT* port);
void remove_write_io_task(PORT* port);
CELL iomux(void); CELL iomux(void);
void collect_io_tasks(void);