more I/O work

cvs
Slava Pestov 2004-08-13 05:38:15 +00:00
parent 8321eadefe
commit b9ad4405b4
8 changed files with 189 additions and 29 deletions

View File

@ -14,6 +14,7 @@ int main(int argc, char** argv)
load_image(argv[1]);
init_stacks();
init_io();
run();
return 0;

View File

@ -5,7 +5,6 @@
#include <fcntl.h>
#include <limits.h>
#include <math.h>
#include <poll.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stdio.h>

View File

@ -3,15 +3,40 @@
void init_io(void)
{
env.user[STDIN_ENV] = port(0);
set_nonblocking(0);
env.user[STDOUT_ENV] = port(1);
set_nonblocking(1);
env.user[STDERR_ENV] = port(2);
set_nonblocking(2);
}
int read_step(PORT* port)
{
int amount = read(port->fd,
port->buffer + 1,
port->buffer->capacity * 2);
FIXNUM amount = -1;
add_read_io_task(port,F);
for(;;)
{
amount = read(port->fd,
port->buffer + 1,
port->buffer->capacity * 2);
if(amount == -1)
{
if(errno != EAGAIN)
{
remove_read_io_task(port);
return -1;
}
}
else
break;
iomux();
}
remove_read_io_task(port);
port->buf_fill = (amount < 0 ? 0 : amount);
port->buf_pos = 0;
@ -90,8 +115,30 @@ void write_step(PORT* port)
{
char* chars = (char*)port->buffer + sizeof(STRING);
FIXNUM amount = write(port->fd,chars + port->buf_pos,
port->buf_fill - port->buf_pos);
FIXNUM amount;
add_write_io_task(port,F);
for(;;)
{
amount = write(port->fd,chars + port->buf_pos,
port->buf_fill - port->buf_pos);
if(amount == -1)
{
if(errno != EAGAIN)
{
remove_write_io_task(port);
return;
}
}
else
break;
iomux();
}
remove_write_io_task(port);
if(amount < 0)
io_error(__FUNCTION__);
@ -193,3 +240,9 @@ void primitive_close_fd(void)
flush_buffer(port);
close(port->fd);
}
void set_nonblocking(int fd)
{
if(fcntl(fd,F_SETFL,O_NONBLOCK,1) == -1)
io_error(__FUNCTION__);
}

View File

@ -15,3 +15,4 @@ void write_fd_string_8(PORT* port, STRING* str);
void primitive_write_fd_8(void);
void primitive_flush_fd(void);
void primitive_close_fd(void);
void set_nonblocking(int fd);

View File

@ -9,13 +9,13 @@ void primitive_open_file(void)
int fd;
if(read && write)
mode = O_RDWR | O_CREAT;
mode = O_RDWR | O_CREAT | O_NONBLOCK;
else if(read)
mode = O_RDONLY;
mode = O_RDONLY | O_NONBLOCK;
else if(write)
mode = O_WRONLY | O_CREAT | O_TRUNC;
mode = O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK;
else
mode = 0;
mode = O_NONBLOCK;
fd = open(path,mode);
if(fd < 0)

View File

@ -1,31 +1,111 @@
#include "factor.h"
void init_io_tasks(fd_set* fdset, IO_TASK* io_tasks)
{
int i;
FD_ZERO(fdset);
for(i = 0; i < FD_SETSIZE; i++)
{
read_io_tasks[i].port = NULL;
read_io_tasks[i].callback = F;
}
}
void init_iomux(void)
{
io_task_count = 0;
read_fd_count = 0;
init_io_tasks(&read_fd_set,read_io_tasks);
write_fd_count = 0;
init_io_tasks(&write_fd_set,write_io_tasks);
}
void add_io_task(int fd, int events, CELL callback)
void add_io_task(
PORT* port,
CELL callback,
fd_set* fdset,
IO_TASK* io_tasks,
int* fd_count)
{
int fds = *fd_count;
/* Look for an empty slot first */
int i;
for(i = 0; i < io_task_count; i++)
for(i = 0; i < fds; i++)
{
if(io_tasks[i].fd == -1)
if(io_tasks[i].port == NULL)
{
io_tasks[i].fd = fd;
io_tasks[i].events = events;
io_callbacks[i] = callback;
FD_SET(port->fd,fdset);
io_tasks[i].port = port;
io_tasks[i].callback = callback;
return;
}
}
/* Add to the end */
if(io_task_count == MAX_IO_TASKS)
critical_error("Too many I/O tasks",io_task_count);
/* add at end */
if(fds == FD_SETSIZE)
critical_error("Too many I/O tasks",*fd_count);
io_tasks[io_task_count].fd = fd;
io_tasks[io_task_count].events = events;
io_callbacks[io_task_count] = callback;
io_task_count++;
FD_SET(port->fd,fdset);
io_tasks[fds].port = port;
io_tasks[fds].callback = callback;
*fd_count = fds + 1;
}
void add_read_io_task(PORT* port, CELL callback)
{
add_io_task(port,callback,
&read_fd_set,read_io_tasks,
&read_fd_count);
}
void add_write_io_task(PORT* port, CELL callback)
{
add_io_task(port,callback,
&write_fd_set,write_io_tasks,
&write_fd_count);
}
void remove_io_task(
PORT* port,
fd_set* fdset,
IO_TASK* io_tasks,
int* fd_count)
{
int i;
int fds = *fd_count;
for(i = 0; i < fds; i++)
{
if(io_tasks[i].port == port)
{
FD_CLR(port->fd,fdset);
io_tasks[i].port = NULL;
io_tasks[i].callback = F;
if(i == fds - 1)
*fd_count = fds - 1;
return;
}
}
}
void remove_read_io_task(PORT* port)
{
remove_io_task(port,&read_fd_set,read_io_tasks,&read_fd_count);
}
void remove_write_io_task(PORT* port)
{
remove_io_task(port,&write_fd_set,write_io_tasks,&write_fd_count);
}
/* Wait for I/O and return a callback. */
CELL iomux(void)
{
int nfds = select(read_fd_count > write_fd_count
? read_fd_count : write_fd_count,
&read_fd_set,&write_fd_set,NULL,NULL);
return F;
}

View File

@ -1,7 +1,31 @@
#define MAX_IO_TASKS 256
struct pollfd io_tasks[MAX_IO_TASKS];
CELL io_callbacks[MAX_IO_TASKS];
unsigned int io_task_count;
typedef struct {
PORT* port;
CELL callback;
} IO_TASK;
fd_set read_fd_set;
IO_TASK read_io_tasks[FD_SETSIZE];
int read_fd_count;
fd_set write_fd_set;
IO_TASK write_io_tasks[FD_SETSIZE];
int write_fd_count;
void init_io_tasks(fd_set* fd_set, IO_TASK* io_tasks);
void init_iomux(void);
void add_io_task(int fd, int events, CELL callback);
void add_io_task(
PORT* port,
CELL callback,
fd_set* fd_set,
IO_TASK* io_tasks,
int* fd_count);
void add_read_io_task(PORT* port, CELL callback);
void add_write_io_task(PORT* port, CELL callback);
void remove_io_task(
PORT* port,
fd_set* fdset,
IO_TASK* io_tasks,
int* fd_count);
void remove_read_io_task(PORT* port);
void remove_write_io_task(PORT* port);
CELL iomux(void);

View File

@ -52,6 +52,8 @@ int accept_connection(int sock)
if(new < 0)
io_error(__FUNCTION__);
set_nonblocking(new);
printf("Connection from host %s, port %hd.\n",
inet_ntoa(clientname.sin_addr),
ntohs(clientname.sin_port));