diff --git a/native/fd.c b/native/fd.c index 45b7cbd5c6..3e2da54873 100644 --- a/native/fd.c +++ b/native/fd.c @@ -14,7 +14,7 @@ int read_step(PORT* port) { FIXNUM amount = -1; - add_read_io_task(port,F); + add_io_task(IO_TASK_READ_LINE,port,F); for(;;) { @@ -26,7 +26,7 @@ int read_step(PORT* port) { if(errno != EAGAIN) { - remove_read_io_task(port); + remove_io_task(IO_TASK_READ_LINE,port); return -1; } } @@ -36,7 +36,7 @@ int read_step(PORT* port) iomux(); } - remove_read_io_task(port); + remove_io_task(IO_TASK_READ_LINE,port); port->buf_fill = (amount < 0 ? 0 : amount); port->buf_pos = 0; @@ -111,39 +111,29 @@ void primitive_read_line_fd_8(void) } } -void write_step(PORT* port) +/* Return true if write was done */ +bool write_step(PORT* port) { char* chars = (char*)port->buffer + sizeof(STRING); - FIXNUM amount; + FIXNUM amount = write(port->fd,chars + port->buf_pos, + port->buf_fill - port->buf_pos); - add_write_io_task(port,F); - - for(;;) + if(amount == -1) { - amount = write(port->fd,chars + port->buf_pos, - port->buf_fill - port->buf_pos); - - if(amount == -1) - { - if(errno != EAGAIN) - { - remove_write_io_task(port); - return; - } - } + if(errno == EAGAIN) + return false; else - break; - - iomux(); + { + io_error(__FUNCTION__); + return false; + } + } + else + { + port->buf_pos += amount; + return true; } - - remove_write_io_task(port); - - if(amount < 0) - io_error(__FUNCTION__); - - port->buf_pos += amount; } /* keep writing to the stream until everything is written */ @@ -152,14 +142,19 @@ void flush_buffer(PORT* port) if(port->buf_mode != B_WRITE || port->buf_fill == 0) return; + add_io_task(IO_TASK_WRITE,port,F); + for(;;) { if(port->buf_fill == port->buf_pos) break; - write_step(port); + if(!write_step(port)) + iomux(); } + remove_io_task(IO_TASK_WRITE,port); + port->buf_pos = 0; port->buf_fill = 0; } diff --git a/native/fd.h b/native/fd.h index 71f91f3091..a396f0fa5f 100644 --- a/native/fd.h +++ b/native/fd.h @@ -7,6 +7,7 @@ int read_step(PORT* port); typedef enum { READLINE_AGAIN, READLINE_EOL, READLINE_EOF } READLINE_STAT; READLINE_STAT read_line_step(PORT* port); +bool write_step(PORT* port); void flush_buffer(PORT* port); void init_io(void); void primitive_read_line_fd_8(void); diff --git a/native/gc.c b/native/gc.c index ea652a4377..5c9a5b5a79 100644 --- a/native/gc.c +++ b/native/gc.c @@ -111,7 +111,7 @@ void collect_next(void) } } -void copy_roots(void) +void collect_roots(void) { int i; @@ -140,7 +140,8 @@ void primitive_gc(void) { flip_zones(); scan = active->here = active->base; - copy_roots(); + collect_roots(); + collect_io_tasks(); while(scan < active->here) { gc_debug("scan loop",scan); diff --git a/native/iomux.c b/native/iomux.c index 5d42a31ace..4d1a3634d8 100644 --- a/native/iomux.c +++ b/native/iomux.c @@ -7,7 +7,7 @@ void init_io_tasks(fd_set* fdset, IO_TASK* io_tasks) FD_ZERO(fdset); for(i = 0; i < FD_SETSIZE; i++) { - read_io_tasks[i].port = NULL; + read_io_tasks[i].port = F; read_io_tasks[i].callback = F; } } @@ -21,7 +21,8 @@ void init_iomux(void) init_io_tasks(&write_fd_set,write_io_tasks); } -void add_io_task( +void add_io_task_impl( + IO_TASK_TYPE type, PORT* port, CELL callback, fd_set* fdset, @@ -34,10 +35,11 @@ void add_io_task( int i; for(i = 0; i < fds; i++) { - if(io_tasks[i].port == NULL) + if(io_tasks[i].port == F) { FD_SET(port->fd,fdset); - io_tasks[i].port = port; + io_tasks[i].type = type; + io_tasks[i].port = tag_object(port); io_tasks[i].callback = callback; return; } @@ -48,26 +50,32 @@ void add_io_task( critical_error("Too many I/O tasks",*fd_count); FD_SET(port->fd,fdset); - io_tasks[fds].port = port; + io_tasks[fds].type = type; + io_tasks[fds].port = tag_object(port); io_tasks[fds].callback = callback; *fd_count = fds + 1; } -void add_read_io_task(PORT* port, CELL callback) +void add_io_task(IO_TASK_TYPE type, PORT* port, CELL callback) { - add_io_task(port,callback, - &read_fd_set,read_io_tasks, - &read_fd_count); + switch(type) + { + case IO_TASK_READ_LINE: + case IO_TASK_READ_COUNT: + add_io_task_impl(type,port,callback, + &read_fd_set,read_io_tasks, + &read_fd_count); + break; + case IO_TASK_WRITE: + add_io_task_impl(type,port,callback, + &write_fd_set,write_io_tasks, + &write_fd_count); + break; + } } -void add_write_io_task(PORT* port, CELL callback) -{ - add_io_task(port,callback, - &write_fd_set,write_io_tasks, - &write_fd_count); -} - -void remove_io_task( +void remove_io_task_impl( + IO_TASK_TYPE type, PORT* port, fd_set* fdset, IO_TASK* io_tasks, @@ -78,10 +86,10 @@ void remove_io_task( for(i = 0; i < fds; i++) { - if(io_tasks[i].port == port) + if(untag_port(io_tasks[i].port) == port) { FD_CLR(port->fd,fdset); - io_tasks[i].port = NULL; + io_tasks[i].port = F; io_tasks[i].callback = F; if(i == fds - 1) *fd_count = fds - 1; @@ -90,14 +98,40 @@ void remove_io_task( } } -void remove_read_io_task(PORT* port) +void remove_io_task(IO_TASK_TYPE type, PORT* port) { - remove_io_task(port,&read_fd_set,read_io_tasks,&read_fd_count); + switch(type) + { + case IO_TASK_READ_LINE: + case IO_TASK_READ_COUNT: + remove_io_task_impl(type,port, + &read_fd_set,read_io_tasks, + &read_fd_count); + break; + case IO_TASK_WRITE: + remove_io_task_impl(type,port, + &write_fd_set,write_io_tasks, + &write_fd_count); + } } -void remove_write_io_task(PORT* port) +void perform_iotask(IO_TASK* task) { - remove_io_task(port,&write_fd_set,write_io_tasks,&write_fd_count); + if(task->port == F) + return; + + switch(task->type) + { + case IO_TASK_READ_LINE: + + break; + case IO_TASK_WRITE: + write_step(untag_port(task->port)); + break; + default: + critical_error("Bad I/O task",task->type); + break; + } } /* Wait for I/O and return a callback. */ @@ -107,5 +141,30 @@ CELL iomux(void) ? read_fd_count : write_fd_count, &read_fd_set,&write_fd_set,NULL,NULL); + /* int i; + + for(i = 0; i < read_fd_count; i++) + perform_iotask(&read_io_tasks[i]); + + for(i = 0; i < write_fd_count; i++) + perform_iotask(&write_io_tasks[i]); */ + return F; } + +void collect_io_tasks(void) +{ + int i; + + for(i = 0; i < FD_SETSIZE; i++) + { + copy_object(&read_io_tasks[i].port); + copy_object(&read_io_tasks[i].callback); + } + + for(i = 0; i < FD_SETSIZE; i++) + { + copy_object(&write_io_tasks[i].port); + copy_object(&write_io_tasks[i].callback); + } +} diff --git a/native/iomux.h b/native/iomux.h index 2018dcda76..411eb2b6bf 100644 --- a/native/iomux.h +++ b/native/iomux.h @@ -1,5 +1,12 @@ +typedef enum { + IO_TASK_READ_LINE, + IO_TASK_READ_COUNT, + IO_TASK_WRITE +} IO_TASK_TYPE; + typedef struct { - PORT* port; + IO_TASK_TYPE type; + CELL port; CELL callback; } IO_TASK; @@ -13,19 +20,20 @@ int write_fd_count; void init_io_tasks(fd_set* fd_set, IO_TASK* io_tasks); void init_iomux(void); -void add_io_task( +void add_io_task_impl( + IO_TASK_TYPE type, PORT* port, CELL callback, - fd_set* fd_set, + fd_set* fdset, IO_TASK* io_tasks, int* fd_count); -void add_read_io_task(PORT* port, CELL callback); -void add_write_io_task(PORT* port, CELL callback); -void remove_io_task( +void add_io_task(IO_TASK_TYPE type, PORT* port, CELL callback); +void remove_io_task_impl( + IO_TASK_TYPE type, PORT* port, fd_set* fdset, IO_TASK* io_tasks, int* fd_count); -void remove_read_io_task(PORT* port); -void remove_write_io_task(PORT* port); +void remove_io_task(IO_TASK_TYPE type, PORT* port); CELL iomux(void); +void collect_io_tasks(void);