write buffering in native factor

cvs
Slava Pestov 2004-08-11 20:56:48 +00:00
parent 6f2ea59ac9
commit 1e8a33ae99
12 changed files with 101 additions and 66 deletions

View File

@ -48,7 +48,6 @@
- broken pipe errors with httpd and telnetd in cfactor - broken pipe errors with httpd and telnetd in cfactor
- read# - read#
- write buffer
- to_fixnum and friends: error on float - to_fixnum and friends: error on float
ERROR: I/O error: [ "primitive_read_line_fd_8" "Resource temporarily unavailable" ] ERROR: I/O error: [ "primitive_read_line_fd_8" "Resource temporarily unavailable" ]
- errors: don't show .factor-rc - errors: don't show .factor-rc

View File

@ -211,7 +211,6 @@ IN: cross-compiler
read-line-fd-8 read-line-fd-8
write-fd-8 write-fd-8
flush-fd flush-fd
shutdown-fd
room room
os-env os-env
millis millis

View File

@ -81,7 +81,7 @@ USE: url-encoding
read [ parse-request ] when* read [ parse-request ] when*
] with-stream ] with-stream
] [ ] [
default-error-handler drop [ default-error-handler drop ] when*
] catch ; ] catch ;
: quit-flag ( -- ? ) : quit-flag ( -- ? )

View File

@ -89,20 +89,9 @@ USE: namespaces
[ "socket" get close-fd ] "fclose" set [ "socket" get close-fd ] "fclose" set
] extend ; ] extend ;
: <socket-stream> ( fd -- stream )
#! A slight variation on <fd-stream> that calls shutdown(2)
#! when closed.
dup <fd-stream> [
( -- )
[
"in" get [ dup shutdown-fd close-fd ] when*
( out == in )
] "fclose" set
] extend ;
: accept ( server -- client ) : accept ( server -- client )
#! Accept a connection from a server socket. #! Accept a connection from a server socket.
[ "socket" get ] bind accept-fd <socket-stream> ; "socket" swap get* accept-fd dup <fd-stream> ;
: init-stdio ( -- ) : init-stdio ( -- )
stdin stdout <fd-stream> <stdio-stream> "stdio" set ; stdin stdout <fd-stream> <stdio-stream> "stdio" set ;

View File

@ -78,7 +78,11 @@ USE: strings
( -- ) ( -- )
[ ] "fclose" set [ ] "fclose" set
( string -- ) ( string -- )
[ namespace fwrite "\n" namespace fwrite ] "fprint" set [
namespace fwrite
"\n" namespace fwrite
namespace fflush
] "fprint" set
] extend ; ] extend ;
: <extend-stream> ( stream -- stream ) : <extend-stream> ( stream -- stream )

View File

@ -17,4 +17,4 @@ USE: test
[ 5/3 ] [ 10 20 3 h ] unit-test [ 5/3 ] [ 10 20 3 h ] unit-test
[ 1000/3 ] [ 0 10 3 [ sq ] simpson-try ] unit-test [ 1000/3 ] [ 0 10 3 [ sq ] simpson ] unit-test

View File

@ -28,10 +28,6 @@ typedef unsigned long int CELL;
typedef unsigned short CHAR; typedef unsigned short CHAR;
#define CHARS sizeof(CHAR) #define CHARS sizeof(CHAR)
/* must always be 8 bits */
typedef unsigned char BYTE;
#define BYTES 1
/* Memory heap size */ /* Memory heap size */
#define DEFAULT_ARENA (32 * 1024 * 1024) #define DEFAULT_ARENA (32 * 1024 * 1024)
#define STACK_SIZE 1024 #define STACK_SIZE 1024

View File

@ -7,11 +7,15 @@ void init_io(void)
env.user[STDERR_ENV] = handle(HANDLE_FD,2); env.user[STDERR_ENV] = handle(HANDLE_FD,2);
} }
void primitive_close_fd(void) void init_buffer(HANDLE* h, int mode)
{ {
HANDLE* h = untag_handle(HANDLE_FD,env.dt); if(h->buf_mode == B_NONE)
close(h->object); h->buffer = tag_object(string(BUF_SIZE,'\0'));
env.dt = dpop(); if(h->buf_mode != mode)
{
h->buf_fill = h->buf_pos = 0;
h->buf_mode = mode;
}
} }
int fill_buffer(HANDLE* h, int fd, STRING* buf) int fill_buffer(HANDLE* h, int fd, STRING* buf)
@ -39,11 +43,9 @@ void primitive_read_line_fd_8(void)
/* read ascii from fd */ /* read ascii from fd */
STRING* buf; STRING* buf;
if(h->buf_mode != B_READ)
{ init_buffer(h,B_READ);
h->buf_mode = B_READ;
h->buffer = tag_object(string(BUF_SIZE,'\0'));
}
buf = untag_string(h->buffer); buf = untag_string(h->buffer);
for(;;) for(;;)
@ -86,24 +88,82 @@ void primitive_read_line_fd_8(void)
} }
} }
/* keep writing to the stream until everything is written */
void write_fully(HANDLE* h, char* str, CELL len)
{
FIXNUM amount, written = 0, remains;
for(;;)
{
remains = len - written;
if(remains == 0)
break;
amount = write(h->object,str + written,remains);
if(amount < 0)
io_error(__FUNCTION__);
written += amount;
}
}
void flush_buffer(HANDLE* h)
{
STRING* buf;
if(h->buf_mode != B_WRITE || h->buf_fill == 0)
return;
buf = untag_string(h->buffer);
write_fully(h,(char*)buf + sizeof(STRING),h->buf_fill);
h->buf_fill = 0;
}
void write_fd_char_8(HANDLE* h, FIXNUM ch) void write_fd_char_8(HANDLE* h, FIXNUM ch)
{ {
BYTE c = (BYTE)ch; char c = (char)ch;
STRING* buf;
int amount = write(h->object,&c,1); init_buffer(h,B_WRITE);
buf = untag_string(h->buffer);
if(amount < 0) /* Is the buffer full? */
io_error(__FUNCTION__); if(h->buf_fill == buf->capacity * CHARS)
flush_buffer(h);
bput((CELL)buf + sizeof(STRING) + h->buf_fill,c);
h->buf_fill++;
} }
void write_fd_string_8(HANDLE* h, STRING* str) void write_fd_string_8(HANDLE* h, STRING* str)
{ {
char* c_str = to_c_string(str); char* c_str = to_c_string(str);
STRING* buf;
int amount = write(h->object,c_str,str->capacity); init_buffer(h,B_WRITE);
buf = untag_string(h->buffer);
if(amount < 0) /* Is the string longer than the buffer? */
io_error(__FUNCTION__); if(str->capacity > buf->capacity * CHARS)
{
/* Just write it immediately */
flush_buffer(h);
write_fully(h,c_str,str->capacity);
}
else
{
/* Is there enough room in the buffer? If not, flush */
if(h->buf_fill + str->capacity > buf->capacity * CHARS)
flush_buffer(h);
/* Append string to buffer */
memcpy((void*)((CELL)buf + sizeof(STRING) + h->buf_fill),
c_str,str->capacity);
h->buf_fill += str->capacity;
}
} }
void primitive_write_fd_8(void) void primitive_write_fd_8(void)
@ -133,27 +193,15 @@ void primitive_write_fd_8(void)
void primitive_flush_fd(void) void primitive_flush_fd(void)
{ {
HANDLE* h = untag_handle(HANDLE_FD,env.dt); HANDLE* h = untag_handle(HANDLE_FD,env.dt);
flush_buffer(h);
if(h->buf_mode == B_WRITE)
{
}
/* int fd = h->object;
if(fsync(fd) < 0)
io_error(__FUNCTION__); */
env.dt = dpop(); env.dt = dpop();
} }
void primitive_shutdown_fd(void) void primitive_close_fd(void)
{ {
/* HANDLE* h = untag_handle(HANDLE_FD,env.dt); HANDLE* h = untag_handle(HANDLE_FD,env.dt);
int fd = h->object; flush_buffer(h);
close(h->object);
if(shutdown(fd,SHUT_RDWR) < 0)
io_error(__FUNCTION__); */
env.dt = dpop(); env.dt = dpop();
} }

View File

@ -1,12 +1,13 @@
#define LINE_SIZE 80 #define LINE_SIZE 80
#define BUF_SIZE 1024 #define BUF_SIZE (32 * 1024)
void init_io(void);
void primitive_close_fd(void);
int fill_buffer(HANDLE* h, int fd, STRING* buf); int fill_buffer(HANDLE* h, int fd, STRING* buf);
void flush_buffer(HANDLE* h);
void write_fully(HANDLE* h, char* str, CELL len);
void init_io(void);
void primitive_read_line_fd_8(void); void primitive_read_line_fd_8(void);
void write_fd_char_8(HANDLE* h, FIXNUM ch); void write_fd_char_8(HANDLE* h, FIXNUM ch);
void write_fd_string_8(HANDLE* h, STRING* str); void write_fd_string_8(HANDLE* h, STRING* str);
void primitive_write_fd_8(void); void primitive_write_fd_8(void);
void primitive_flush_fd(void); void primitive_flush_fd(void);
void primitive_shutdown_fd(void); void primitive_close_fd(void);

View File

@ -40,14 +40,14 @@ INLINE void cput(CELL where, CHAR what)
*((CHAR*)where) = what; *((CHAR*)where) = what;
} }
INLINE BYTE bget(CELL where) INLINE char bget(CELL where)
{ {
return *((BYTE*)where); return *((char*)where);
} }
INLINE void bput(CELL where, BYTE what) INLINE void bput(CELL where, char what)
{ {
*((BYTE*)where) = what; *((char*)where) = what;
} }
bool in_zone(ZONE* z, CELL pointer); bool in_zone(ZONE* z, CELL pointer);

View File

@ -121,7 +121,6 @@ XT primitives[] = {
primitive_read_line_fd_8, primitive_read_line_fd_8,
primitive_write_fd_8, primitive_write_fd_8,
primitive_flush_fd, primitive_flush_fd,
primitive_shutdown_fd,
primitive_room, primitive_room,
primitive_os_env, primitive_os_env,
primitive_millis, primitive_millis,

View File

@ -1,4 +1,4 @@
extern XT primitives[]; extern XT primitives[];
#define PRIMITIVE_COUNT 128 #define PRIMITIVE_COUNT 127
CELL primitive_to_xt(CELL primitive); CELL primitive_to_xt(CELL primitive);