From b9ad4405b48c1b590095b0655433c52bac760e1a Mon Sep 17 00:00:00 2001 From: Slava Pestov Date: Fri, 13 Aug 2004 05:38:15 +0000 Subject: [PATCH] more I/O work --- native/factor.c | 1 + native/factor.h | 1 - native/fd.c | 63 +++++++++++++++++++++++++--- native/fd.h | 1 + native/file.c | 8 ++-- native/iomux.c | 108 +++++++++++++++++++++++++++++++++++++++++------- native/iomux.h | 34 ++++++++++++--- native/socket.c | 2 + 8 files changed, 189 insertions(+), 29 deletions(-) diff --git a/native/factor.c b/native/factor.c index fdb3e8225e..eaa9a03709 100644 --- a/native/factor.c +++ b/native/factor.c @@ -14,6 +14,7 @@ int main(int argc, char** argv) load_image(argv[1]); init_stacks(); init_io(); + run(); return 0; diff --git a/native/factor.h b/native/factor.h index c6dc715c0b..9b186f89aa 100644 --- a/native/factor.h +++ b/native/factor.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include diff --git a/native/fd.c b/native/fd.c index 6fea877824..45b7cbd5c6 100644 --- a/native/fd.c +++ b/native/fd.c @@ -3,15 +3,40 @@ void init_io(void) { env.user[STDIN_ENV] = port(0); + set_nonblocking(0); env.user[STDOUT_ENV] = port(1); + set_nonblocking(1); env.user[STDERR_ENV] = port(2); + set_nonblocking(2); } int read_step(PORT* port) { - int amount = read(port->fd, - port->buffer + 1, - port->buffer->capacity * 2); + FIXNUM amount = -1; + + add_read_io_task(port,F); + + for(;;) + { + amount = read(port->fd, + port->buffer + 1, + port->buffer->capacity * 2); + + if(amount == -1) + { + if(errno != EAGAIN) + { + remove_read_io_task(port); + return -1; + } + } + else + break; + + iomux(); + } + + remove_read_io_task(port); port->buf_fill = (amount < 0 ? 0 : amount); port->buf_pos = 0; @@ -90,8 +115,30 @@ void write_step(PORT* port) { char* chars = (char*)port->buffer + sizeof(STRING); - FIXNUM amount = write(port->fd,chars + port->buf_pos, - port->buf_fill - port->buf_pos); + FIXNUM amount; + + add_write_io_task(port,F); + + for(;;) + { + amount = write(port->fd,chars + port->buf_pos, + port->buf_fill - port->buf_pos); + + if(amount == -1) + { + if(errno != EAGAIN) + { + remove_write_io_task(port); + return; + } + } + else + break; + + iomux(); + } + + remove_write_io_task(port); if(amount < 0) io_error(__FUNCTION__); @@ -193,3 +240,9 @@ void primitive_close_fd(void) flush_buffer(port); close(port->fd); } + +void set_nonblocking(int fd) +{ + if(fcntl(fd,F_SETFL,O_NONBLOCK,1) == -1) + io_error(__FUNCTION__); +} diff --git a/native/fd.h b/native/fd.h index 0d05b5f6f5..71f91f3091 100644 --- a/native/fd.h +++ b/native/fd.h @@ -15,3 +15,4 @@ void write_fd_string_8(PORT* port, STRING* str); void primitive_write_fd_8(void); void primitive_flush_fd(void); void primitive_close_fd(void); +void set_nonblocking(int fd); diff --git a/native/file.c b/native/file.c index 4670ce5c6f..e48706efc1 100644 --- a/native/file.c +++ b/native/file.c @@ -9,13 +9,13 @@ void primitive_open_file(void) int fd; if(read && write) - mode = O_RDWR | O_CREAT; + mode = O_RDWR | O_CREAT | O_NONBLOCK; else if(read) - mode = O_RDONLY; + mode = O_RDONLY | O_NONBLOCK; else if(write) - mode = O_WRONLY | O_CREAT | O_TRUNC; + mode = O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK; else - mode = 0; + mode = O_NONBLOCK; fd = open(path,mode); if(fd < 0) diff --git a/native/iomux.c b/native/iomux.c index 1ce9060a9d..5d42a31ace 100644 --- a/native/iomux.c +++ b/native/iomux.c @@ -1,31 +1,111 @@ #include "factor.h" +void init_io_tasks(fd_set* fdset, IO_TASK* io_tasks) +{ + int i; + + FD_ZERO(fdset); + for(i = 0; i < FD_SETSIZE; i++) + { + read_io_tasks[i].port = NULL; + read_io_tasks[i].callback = F; + } +} + void init_iomux(void) { - io_task_count = 0; + read_fd_count = 0; + init_io_tasks(&read_fd_set,read_io_tasks); + + write_fd_count = 0; + init_io_tasks(&write_fd_set,write_io_tasks); } -void add_io_task(int fd, int events, CELL callback) +void add_io_task( + PORT* port, + CELL callback, + fd_set* fdset, + IO_TASK* io_tasks, + int* fd_count) { + int fds = *fd_count; + /* Look for an empty slot first */ int i; - for(i = 0; i < io_task_count; i++) + for(i = 0; i < fds; i++) { - if(io_tasks[i].fd == -1) + if(io_tasks[i].port == NULL) { - io_tasks[i].fd = fd; - io_tasks[i].events = events; - io_callbacks[i] = callback; + FD_SET(port->fd,fdset); + io_tasks[i].port = port; + io_tasks[i].callback = callback; return; } } - /* Add to the end */ - if(io_task_count == MAX_IO_TASKS) - critical_error("Too many I/O tasks",io_task_count); + /* add at end */ + if(fds == FD_SETSIZE) + critical_error("Too many I/O tasks",*fd_count); - io_tasks[io_task_count].fd = fd; - io_tasks[io_task_count].events = events; - io_callbacks[io_task_count] = callback; - io_task_count++; + FD_SET(port->fd,fdset); + io_tasks[fds].port = port; + io_tasks[fds].callback = callback; + *fd_count = fds + 1; +} + +void add_read_io_task(PORT* port, CELL callback) +{ + add_io_task(port,callback, + &read_fd_set,read_io_tasks, + &read_fd_count); +} + +void add_write_io_task(PORT* port, CELL callback) +{ + add_io_task(port,callback, + &write_fd_set,write_io_tasks, + &write_fd_count); +} + +void remove_io_task( + PORT* port, + fd_set* fdset, + IO_TASK* io_tasks, + int* fd_count) +{ + int i; + int fds = *fd_count; + + for(i = 0; i < fds; i++) + { + if(io_tasks[i].port == port) + { + FD_CLR(port->fd,fdset); + io_tasks[i].port = NULL; + io_tasks[i].callback = F; + if(i == fds - 1) + *fd_count = fds - 1; + return; + } + } +} + +void remove_read_io_task(PORT* port) +{ + remove_io_task(port,&read_fd_set,read_io_tasks,&read_fd_count); +} + +void remove_write_io_task(PORT* port) +{ + remove_io_task(port,&write_fd_set,write_io_tasks,&write_fd_count); +} + +/* Wait for I/O and return a callback. */ +CELL iomux(void) +{ + int nfds = select(read_fd_count > write_fd_count + ? read_fd_count : write_fd_count, + &read_fd_set,&write_fd_set,NULL,NULL); + + return F; } diff --git a/native/iomux.h b/native/iomux.h index b912eda693..2018dcda76 100644 --- a/native/iomux.h +++ b/native/iomux.h @@ -1,7 +1,31 @@ -#define MAX_IO_TASKS 256 -struct pollfd io_tasks[MAX_IO_TASKS]; -CELL io_callbacks[MAX_IO_TASKS]; -unsigned int io_task_count; +typedef struct { + PORT* port; + CELL callback; +} IO_TASK; +fd_set read_fd_set; +IO_TASK read_io_tasks[FD_SETSIZE]; +int read_fd_count; + +fd_set write_fd_set; +IO_TASK write_io_tasks[FD_SETSIZE]; +int write_fd_count; + +void init_io_tasks(fd_set* fd_set, IO_TASK* io_tasks); void init_iomux(void); -void add_io_task(int fd, int events, CELL callback); +void add_io_task( + PORT* port, + CELL callback, + fd_set* fd_set, + IO_TASK* io_tasks, + int* fd_count); +void add_read_io_task(PORT* port, CELL callback); +void add_write_io_task(PORT* port, CELL callback); +void remove_io_task( + PORT* port, + fd_set* fdset, + IO_TASK* io_tasks, + int* fd_count); +void remove_read_io_task(PORT* port); +void remove_write_io_task(PORT* port); +CELL iomux(void); diff --git a/native/socket.c b/native/socket.c index 8e2bc9334a..cdf630c8c1 100644 --- a/native/socket.c +++ b/native/socket.c @@ -52,6 +52,8 @@ int accept_connection(int sock) if(new < 0) io_error(__FUNCTION__); + set_nonblocking(new); + printf("Connection from host %s, port %hd.\n", inet_ntoa(clientname.sin_addr), ntohs(clientname.sin_port));