single-tasking callback i/o works

cvs
Slava Pestov 2004-08-17 00:42:30 +00:00
parent 6165c935d3
commit 70ea45ab84
9 changed files with 80 additions and 38 deletions

View File

@ -1,5 +1,5 @@
CC = gcc34 CC = gcc34
CFLAGS = -Os -march=pentium4 -Wall -Wno-long-long CFLAGS = -Os -march=pentium4 -Wall -Wno-long-long -fomit-frame-pointer
LIBS = -lm LIBS = -lm
STRIP = strip STRIP = strip

View File

@ -1,8 +1,8 @@
0.62: 0.62:
- read-line: handle \r\n - gc call in the middle of some ops might affect callstack
- ignore errors from flush - sending ^C on socket
- can-read-line? - postpone errors until actual read/write op
- vector-each/map examples - vector-each/map examples
- unparse examples - unparse examples
- finish second practical - finish second practical
@ -83,7 +83,7 @@
+ httpd: + httpd:
- httpd: don't flush so much - if user clicks stop in browser, doesn't stop sending?
- log with date - log with date
- log user agent - log user agent
- add a socket timeout - add a socket timeout

View File

@ -38,7 +38,9 @@ USE: strings
: stderr 2 getenv ; : stderr 2 getenv ;
: flush-fd ( port -- ) : flush-fd ( port -- )
[ swap add-write-io-task next-io-task drop ( call ) ] callcc0 ; [
swap add-write-io-task next-io-task call
] callcc0 drop ;
: wait-to-write ( len port -- ) : wait-to-write ( len port -- )
tuck can-write? [ drop ] [ flush-fd ] ifte ; tuck can-write? [ drop ] [ flush-fd ] ifte ;
@ -49,7 +51,9 @@ USE: strings
over wait-to-write write-fd-8 ; over wait-to-write write-fd-8 ;
: fill-fd ( port -- ) : fill-fd ( port -- )
[ swap add-read-line-io-task next-io-task drop ( call ) ] callcc0 ; [
swap add-read-line-io-task next-io-task call
] callcc0 drop ;
: wait-to-read-line ( port -- ) : wait-to-read-line ( port -- )
dup can-read-line? [ drop ] [ fill-fd ] ifte ; dup can-read-line? [ drop ] [ fill-fd ] ifte ;
@ -58,8 +62,9 @@ USE: strings
dup wait-to-read-line read-line-fd-8 dup [ sbuf>str ] when ; dup wait-to-read-line read-line-fd-8 dup [ sbuf>str ] when ;
: wait-to-accept ( socket -- ) : wait-to-accept ( socket -- )
[ swap add-accept-io-task next-io-task drop ( call ) ] callcc0 ; [
swap add-accept-io-task next-io-task call
] callcc0 drop ;
: blocking-accept ( socket -- host port in out ) : blocking-accept ( socket -- host port in out )
dup wait-to-accept accept-fd ; dup wait-to-accept accept-fd ;

View File

@ -8,7 +8,13 @@ void init_io(void)
bool can_read_line(PORT* port) bool can_read_line(PORT* port)
{ {
return false; if(port->line_ready)
return true;
else
{
read_line_step(port);
return port->line_ready;
}
} }
void primitive_can_read_line(void) void primitive_can_read_line(void)
@ -43,32 +49,61 @@ bool read_line_step(PORT* port)
int i; int i;
char ch; char ch;
SBUF* line = untag_sbuf(port->line); SBUF* line;
if(port->line == F)
{
line = sbuf(LINE_SIZE);
port->line = tag_object(line);
}
else
{
line = untag_sbuf(port->line);
line->top = 0;
}
for(i = port->buf_pos; i < port->buf_fill; i++) for(i = port->buf_pos; i < port->buf_fill; i++)
{ {
ch = bget((CELL)port->buffer + sizeof(STRING) + i); ch = bget((CELL)port->buffer + sizeof(STRING) + i);
if(ch == '\r')
{
if(i != port->buf_fill - 1)
{
ch = bget((CELL)port->buffer
+ sizeof(STRING) + i + 1);
if(ch == '\n')
i++;
}
}
if(ch == '\n') if(ch == '\n')
{ {
port->buf_pos = i + 1; port->buf_pos = i + 1;
port->line_ready = true;
return true; return true;
} }
else else
set_sbuf_nth(line,line->top,ch); set_sbuf_nth(line,line->top,ch);
} }
port->buf_pos = port->buf_fill;
/* We've reached the end of the above loop, without seeing a newline /* We've reached the end of the above loop, without seeing a newline
or EOF, so read again */ or EOF, so read again */
port->line_ready = false;
return false; return false;
} }
void primitive_read_line_fd_8(void) void primitive_read_line_fd_8(void)
{ {
PORT* port = untag_port(dpeek()); PORT* port = untag_port(dpeek());
drepl(port->line); if(port->line_ready)
port->line = F; {
drepl(port->line);
port->line = F;
port->line_ready = false;
}
else
io_error(port,__FUNCTION__);
} }

View File

@ -111,11 +111,6 @@ bool set_up_fd_set(fd_set* fdset, int fd_count, IO_TASK* io_tasks)
bool perform_read_line_io_task(PORT* port) bool perform_read_line_io_task(PORT* port)
{ {
if(port->line == F)
port->line = tag_object(sbuf(LINE_SIZE));
else
untag_sbuf(port->line)->top = 0;
if(port->buf_pos >= port->buf_fill) if(port->buf_pos >= port->buf_fill)
{ {
if(!read_step(port)) if(!read_step(port))
@ -126,6 +121,7 @@ bool perform_read_line_io_task(PORT* port)
{ {
/* EOF */ /* EOF */
port->line = F; port->line = F;
port->line_ready = true;
return true; return true;
} }
else else

View File

@ -24,7 +24,7 @@ ZONE* zalloc(CELL size)
ZONE* z = (ZONE*)malloc(sizeof(ZONE)); ZONE* z = (ZONE*)malloc(sizeof(ZONE));
if(z == 0) if(z == 0)
fatal_error("Cannot allocate zone header",size); fatal_error("Cannot allocate zone header",size);
z->base = z->here = (CELL)malloc(size); z->base = z->here = align8((CELL)malloc(size));
if(z->base == 0) if(z->base == 0)
fatal_error("Cannot allocate zone",size); fatal_error("Cannot allocate zone",size);
z->limit = z->base + size; z->limit = z->base + size;
@ -40,28 +40,23 @@ void init_arena(CELL size)
active = z1; active = z1;
} }
void* allot(CELL a) void check_memory(void)
{ {
CELL h = active->here; if(active->here > active->alarm)
active->here = align8(active->here + a); {
if(active->here > active->limit)
{
printf("Out of memory\n");
printf("active->base = %ld\n",active->base);
printf("active->here = %ld\n",active->here);
printf("active->limit = %ld\n",active->limit);
exit(1);
}
if(active->here > active->limit)
{
printf("Out of memory\n");
printf("active->base = %ld\n",active->base);
printf("active->here = %ld\n",active->here);
printf("active->limit = %ld\n",active->limit);
printf("request = %ld\n",a);
exit(1);
}
else if(active->here > active->alarm)
{
/* Execute the 'garbage-collection' word */ /* Execute the 'garbage-collection' word */
cpush(env.cf); cpush(env.cf);
env.cf = env.user[GC_ENV]; env.cf = env.user[GC_ENV];
} }
return (void*)h;
} }
void flip_zones() void flip_zones()

View File

@ -15,13 +15,21 @@ ZONE* zalloc(CELL size);
void init_arena(CELL size); void init_arena(CELL size);
void flip_zones(); void flip_zones();
void* allot(CELL a); void check_memory(void);
INLINE CELL align8(CELL a) INLINE CELL align8(CELL a)
{ {
return ((a & 7) == 0) ? a : ((a + 8) & ~7); return ((a & 7) == 0) ? a : ((a + 8) & ~7);
} }
INLINE void* allot(CELL a)
{
CELL h = active->here;
active->here += align8(a);
check_memory();
return (void*)h;
}
INLINE CELL get(CELL where) INLINE CELL get(CELL where)
{ {
return *((CELL*)where); return *((CELL*)where);

View File

@ -22,6 +22,7 @@ PORT* port(PORT_MODE type, CELL fd)
port->client_port = F; port->client_port = F;
port->client_socket = F; port->client_socket = F;
port->line = F; port->line = F;
port->line_ready = false;
port->buf_fill = 0; port->buf_fill = 0;
port->buf_pos = 0; port->buf_pos = 0;

View File

@ -8,6 +8,8 @@ typedef struct {
STRING* buffer; STRING* buffer;
/* tagged partial line used by read_line_fd */ /* tagged partial line used by read_line_fd */
CELL line; CELL line;
/* is it ready to be returned? */
bool line_ready;
/* tagged client info used by accept_fd */ /* tagged client info used by accept_fd */
CELL client_host; CELL client_host;
CELL client_port; CELL client_port;