Redo timeouts

db4
Slava Pestov 2008-02-09 21:34:42 -06:00
parent 9d1977eeee
commit c8f042aef4
20 changed files with 129 additions and 98 deletions

View File

@ -4,7 +4,8 @@ math.parser math.private namespaces namespaces.private parser
sequences strings vectors words quotations effects tools.test
continuations generic.standard sorting assocs definitions
prettyprint io inspector tuples classes.union classes.predicate
debugger threads.private io.streams.string combinators.private ;
debugger threads.private io.streams.string io.timeouts
combinators.private ;
IN: temporary
{ 0 2 } [ 2 "Hello" ] must-infer-as

View File

@ -21,9 +21,7 @@ $nl
{ $subsection make-span-stream }
{ $subsection make-block-stream }
{ $subsection make-cell-stream }
{ $subsection stream-write-table }
"Optional word for network streams:"
{ $subsection set-timeout } ;
{ $subsection stream-write-table } ;
ARTICLE: "stdio" "The default stream"
"Various words take an implicit stream parameter from a variable to reduce stack shuffling."
@ -73,11 +71,6 @@ ARTICLE: "streams" "Streams"
ABOUT: "streams"
HELP: set-timeout
{ $values { "n" "an integer" } { "stream" "a stream" } }
{ $contract "Sets a timeout, in milliseconds, for input and output operations on the stream. If a read or a write is initiated and no activity is seen before the timeout expires, an error will be thrown to the caller of the operation being performed." }
{ $notes "Whether or not the stream is closed when the error is thrown is implementation-specific, and user code should take care to close the stream on all error conditions in any case." } ;
HELP: stream-readln
{ $values { "stream" "an input stream" } { "str" string } }
{ $contract "Reads a line of input from the stream. Outputs " { $link f } " on stream exhaustion." }

View File

@ -4,7 +4,6 @@ USING: hashtables generic kernel math namespaces sequences strings
continuations assocs io.styles sbufs ;
IN: io
GENERIC: set-timeout ( n stream -- )
GENERIC: stream-readln ( stream -- str )
GENERIC: stream-read1 ( stream -- ch/f )
GENERIC: stream-read ( n stream -- str/f )

View File

@ -74,8 +74,3 @@ M: duplex-stream dispose
[ dup duplex-stream-out dispose ]
[ dup duplex-stream-in dispose ] [ ] cleanup
] unless drop ;
M: duplex-stream set-timeout
2dup
duplex-stream-in set-timeout
duplex-stream-out set-timeout ;

View File

@ -18,7 +18,7 @@ PROTOCOL: stream-protocol
stream-read1 stream-read stream-read-until
stream-flush stream-write1 stream-write stream-format
stream-nl make-span-stream make-block-stream stream-readln
make-cell-stream stream-write-table set-timeout ;
make-cell-stream stream-write-table ;
PROTOCOL: definition-protocol
where set-where forget uses redefined*

View File

@ -1,8 +1,8 @@
! Copyright (C) 2005, 2007 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: assocs http kernel math math.parser namespaces sequences
io io.sockets io.streams.string io.files strings splitting
continuations assocs.lib ;
io io.sockets io.streams.string io.files io.timeouts strings
splitting continuations assocs.lib ;
IN: http.client
: parse-host ( url -- host port )

View File

@ -1,6 +1,6 @@
! Copyright (C) 2003, 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: assocs kernel namespaces io strings splitting
USING: assocs kernel namespaces io io.timeouts strings splitting
threads http http.server.responders sequences prettyprint
io.server logging ;

View File

@ -1,8 +1,8 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: io io.backend system kernel namespaces strings hashtables
sequences assocs combinators vocabs.loader init threads
continuations math ;
USING: io io.backend io.timeouts system kernel namespaces
strings hashtables sequences assocs combinators vocabs.loader
init threads continuations math ;
IN: io.launcher
! Non-blocking process exit notification facility
@ -10,14 +10,14 @@ SYMBOL: processes
[ H{ } clone processes set-global ] "io.launcher" add-init-hook
TUPLE: process handle status ;
TUPLE: process handle status killed? lapse ;
HOOK: register-process io-backend ( process -- )
M: object register-process drop ;
: <process> ( handle -- process )
f process construct-boa
f f <lapse> process construct-boa
V{ } clone over processes get set-at
dup register-process ;
@ -25,6 +25,8 @@ M: process equal? 2drop f ;
M: process hashcode* process-handle hashcode* ;
: process-running? ( process -- ? ) process-status not ;
SYMBOL: +command+
SYMBOL: +arguments+
SYMBOL: +detached+
@ -34,6 +36,7 @@ SYMBOL: +stdin+
SYMBOL: +stdout+
SYMBOL: +stderr+
SYMBOL: +closed+
SYMBOL: +timeout+
SYMBOL: +prepend-environment+
SYMBOL: +replace-environment+
@ -72,13 +75,17 @@ M: assoc >descriptor >hashtable ;
HOOK: run-process* io-backend ( desc -- handle )
: wait-for-process ( process -- status )
dup process-handle [
dup [ processes get at push stop ] curry callcc0
] when process-status ;
[
dup process-handle
[ dup [ processes get at push stop ] curry callcc0 ] when
dup process-killed?
[ "Process was killed" throw ] [ process-status ] if
] with-timeout ;
: run-process ( desc -- process )
>descriptor
dup run-process*
+timeout+ pick at [ over set-timeout ] when*
+detached+ rot at [ dup wait-for-process drop ] unless ;
: run-detached ( desc -- process )
@ -96,8 +103,11 @@ TUPLE: process-failed code ;
HOOK: kill-process* io-backend ( handle -- )
: kill-process ( process -- )
t over set-process-killed?
process-handle [ kill-process* ] when* ;
M: process timed-out kill-process ;
HOOK: process-stream* io-backend ( desc -- stream process )
TUPLE: process-stream process ;

View File

@ -38,8 +38,6 @@ $nl
{ $list
{ { $link port-handle } " - a native handle identifying the underlying native resource used by the port" }
{ { $link port-error } " - the most recent I/O error, if any. This error is thrown to the waiting thread when " { $link pending-error } " is called by stream operations" }
{ { $link port-timeout } " - a timeout, specifying the maximum length of time, in milliseconds, for which input operations can block before throwing an error. A value of 0 denotes no timeout is desired." }
{ { $link port-cutoff } " - the time when the current timeout expires; if no input data arrives before this time, an error is thrown" }
{ { $link port-type } " - a symbol identifying the port's intended purpose" }
{ { $link port-eof? } " - a flag indicating if the port has reached the end of file while reading" }
} } ;

View File

@ -1,10 +1,10 @@
! Copyright (C) 2005, 2008 Slava Pestov, Doug Coleman
! See http://factorcode.org/license.txt for BSD license.
IN: io.nonblocking
USING: math kernel io sequences io.buffers generic sbufs system
io.streams.lines io.streams.plain io.streams.duplex io.backend
continuations debugger classes byte-arrays namespaces splitting
dlists assocs ;
USING: math kernel io sequences io.buffers io.timeouts generic
sbufs system io.streams.lines io.streams.plain io.streams.duplex
io.backend continuations debugger classes byte-arrays namespaces
splitting dlists assocs ;
SYMBOL: default-buffer-size
64 1024 * default-buffer-size set-global
@ -13,9 +13,12 @@ SYMBOL: default-buffer-size
TUPLE: port
handle
error
timeout-entry timeout cutoff
lapse
type eof? ;
! Ports support the lapse protocol
M: port lapse port-lapse ;
SYMBOL: closed
PREDICATE: port input-port port-type input-port eq? ;
@ -26,12 +29,11 @@ GENERIC: close-handle ( handle -- )
: <port> ( handle buffer type -- port )
pick init-handle
0 0 {
<lapse> {
set-port-handle
set-delegate
set-port-type
set-port-timeout
set-port-cutoff
set-port-lapse
} port construct ;
: <buffered-port> ( handle type -- port )
@ -48,50 +50,14 @@ GENERIC: close-handle ( handle -- )
[ >r <reader> r> <duplex-stream> ] [ ] [ dispose ]
cleanup ;
: timeout? ( port -- ? )
port-cutoff dup zero? not swap millis < and ;
: pending-error ( port -- )
dup port-error f rot set-port-error [ throw ] when* ;
SYMBOL: timeout-queue
timeout-queue global [ [ <dlist> ] unless* ] change-at
: unqueue-timeout ( port -- )
port-timeout-entry [
timeout-queue get-global swap delete-node
] when* ;
: queue-timeout ( port -- )
dup timeout-queue get-global push-front*
swap set-port-timeout-entry ;
HOOK: cancel-io io-backend ( port -- )
M: object cancel-io drop ;
: expire-timeouts ( -- )
timeout-queue get-global dup dlist-empty? [ drop ] [
dup peek-back timeout?
[ pop-back cancel-io expire-timeouts ] [ drop ] if
] if ;
: begin-timeout ( port -- )
dup port-timeout dup zero? [
2drop
] [
millis + over set-port-cutoff
dup unqueue-timeout queue-timeout
] if ;
: end-timeout ( port -- )
unqueue-timeout ;
: with-port-timeout ( port quot -- )
over begin-timeout keep end-timeout ; inline
M: port set-timeout set-port-timeout ;
M: port timed-out cancel-io ;
GENERIC: (wait-to-read) ( port -- )

View File

@ -1,12 +1,12 @@
! Copyright (C) 2007 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
IN: io.streams.null
USING: kernel io continuations ;
USING: kernel io io.timeouts continuations ;
TUPLE: null-stream ;
M: null-stream dispose drop ;
M: null-stream set-timeout 2drop ;
M: null-stream set-timeout drop ;
M: null-stream stream-readln drop f ;
M: null-stream stream-read1 drop f ;
M: null-stream stream-read-until 2drop f f ;

View File

@ -0,0 +1,67 @@
! Copyright (C) 2008 Slava Pestov, Doug Coleman
! See http://factorcode.org/license.txt for BSD license.
USING: kernel math system dlists namespaces assocs init threads
io.streams.duplex ;
IN: io.timeouts
TUPLE: lapse entry timeout cutoff ;
: <lapse> f 0 0 \ lapse construct-boa ;
GENERIC: lapse ( obj -- lapse )
GENERIC: set-timeout ( ms obj -- )
M: object set-timeout lapse set-lapse-timeout ;
M: duplex-stream set-timeout
2dup
duplex-stream-in set-timeout
duplex-stream-out set-timeout ;
: timeout ( obj -- ms ) lapse lapse-timeout ;
: entry ( obj -- dlist-node ) lapse lapse-entry ;
: set-entry ( dlist-node -- obj ) lapse set-lapse-entry ;
: cutoff ( obj -- ms ) lapse lapse-cutoff ;
: set-cutoff ( ms obj -- ) lapse set-lapse-cutoff ;
SYMBOL: timeout-queue
: timeout? ( lapse -- ? )
cutoff dup zero? not swap millis < and ;
timeout-queue global [ [ <dlist> ] unless* ] change-at
: unqueue-timeout ( obj -- )
entry [
timeout-queue get-global swap delete-node
] when* ;
: queue-timeout ( obj -- )
dup timeout-queue get-global push-front*
swap set-entry ;
GENERIC: timed-out ( obj -- )
M: object timed-out drop ;
: expire-timeouts ( -- )
timeout-queue get-global dup dlist-empty? [ drop ] [
dup peek-back timeout?
[ pop-back timed-out expire-timeouts ] [ drop ] if
] if ;
: begin-timeout ( obj -- )
dup timeout dup zero? [
2drop
] [
millis + over set-cutoff
dup unqueue-timeout queue-timeout
] if ;
: with-timeout ( obj quot -- )
over begin-timeout keep unqueue-timeout ; inline
: expiry-thread ( -- )
expire-timeouts 5000 sleep expire-timeouts ;
[ expiry-thread ] "io.timeouts" add-init-hook

View File

@ -61,7 +61,7 @@ M: mx register-io-task ( task mx -- )
mx get-global register-io-task stop ;
: with-port-continuation ( port quot -- port )
[ callcc0 ] curry with-port-timeout ; inline
[ callcc0 ] curry with-timeout ; inline
M: mx unregister-io-task ( task mx -- )
fd/container delete-at drop ;
@ -178,7 +178,7 @@ M: port port-flush ( port -- )
dup buffer-empty? [ drop ] [ (wait-to-write) ] if ;
M: unix-io io-multiplex ( ms -- )
expire-timeouts mx get-global wait-for-events ;
mx get-global wait-for-events ;
M: unix-io init-stdio ( -- )
0 1 handle>duplex-stream io:stdio set-global

View File

@ -55,7 +55,7 @@ M: windows-ce-io accept ( server -- client )
] keep
] keep server-port-addr parse-sockaddr swap
<win32-socket> dup handle>duplex-stream <client-stream>
] with-port-timeout ;
] with-timeout ;
M: windows-ce-io <datagram> ( addrspec -- datagram )
[

View File

@ -91,7 +91,7 @@ M: windows-nt-io cancel-io
port-handle win32-file-handle CancelIo drop ;
M: windows-nt-io io-multiplex ( ms -- )
expire-timeouts drain-overlapped ;
drain-overlapped ;
M: windows-nt-io init-io ( -- )
<master-completion-port> master-completion-port set-global

View File

@ -1,8 +1,8 @@
USING: continuations destructors io.buffers io.files io.backend
io.nonblocking io.windows io.windows.nt.backend kernel libc math
threads windows windows.kernel32 alien.c-types alien.arrays
sequences combinators combinators.lib sequences.lib ascii
splitting alien strings ;
io.timeouts io.nonblocking io.windows io.windows.nt.backend
kernel libc math threads windows windows.kernel32 alien.c-types
alien.arrays sequences combinators combinators.lib sequences.lib
ascii splitting alien strings ;
IN: io.windows.nt.files
M: windows-nt-io cwd
@ -98,7 +98,7 @@ M: windows-nt-io FileArgs-overlapped ( port -- overlapped )
] if ;
: flush-output ( port -- )
[ [ (flush-output) ] with-port-timeout ] with-destructors ;
[ [ (flush-output) ] with-timeout ] with-destructors ;
M: port port-flush
dup buffer-empty? [ dup flush-output ] unless drop ;
@ -122,4 +122,4 @@ M: port port-flush
] [ 2drop ] if ;
M: input-port (wait-to-read) ( port -- )
[ [ ((wait-to-read)) ] with-port-timeout ] with-destructors ;
[ [ ((wait-to-read)) ] with-timeout ] with-destructors ;

View File

@ -3,8 +3,9 @@
USING: alien.c-types destructors io.windows
io.windows.nt.backend kernel math windows windows.kernel32
windows.types libc assocs alien namespaces continuations
io.monitors io.monitors.private io.nonblocking io.buffers io.files
io sequences hashtables sorting arrays combinators ;
io.monitors io.monitors.private io.nonblocking io.buffers
io.files io.timeouts io sequences hashtables sorting arrays
combinators ;
IN: io.windows.nt.monitors
: open-directory ( path -- handle )
@ -52,7 +53,7 @@ M: windows-nt-io <monitor> ( path recursive? -- monitor )
swap [ save-callback ] 2keep
dup check-monitor ! we may have closed it...
get-overlapped-result
] with-port-timeout
] with-timeout
] with-destructors ;
: parse-action ( action -- changed )

View File

@ -1,6 +1,6 @@
USING: alien alien.accessors alien.c-types byte-arrays
continuations destructors io.nonblocking io io.sockets
io.sockets.impl namespaces io.streams.duplex io.windows
continuations destructors io.nonblocking io.timeouts io.sockets
io.sockets.impl io namespaces io.streams.duplex io.windows
io.windows.nt.backend windows.winsock kernel libc math sequences
threads tuples.lib ;
IN: io.windows.nt.sockets
@ -139,7 +139,7 @@ M: windows-nt-io accept ( server -- client )
AcceptEx-args-port pending-error
dup duplex-stream-in pending-error
dup duplex-stream-out pending-error
] with-port-timeout
] with-timeout
] with-destructors ;
M: windows-nt-io <server> ( addrspec -- server )

4
extra/smtp/server/server.factor Normal file → Executable file
View File

@ -27,8 +27,8 @@
! bye
! Connection closed by foreign host.
USING: combinators kernel prettyprint io io.server sequences
namespaces io.sockets continuations ;
USING: combinators kernel prettyprint io io.timeouts io.server
sequences namespaces io.sockets continuations ;
SYMBOL: data-mode

View File

@ -1,8 +1,9 @@
! Copyright (C) 2007, 2008 Elie CHAFTARI, Dirk Vleugels, Slava Pestov.
! Copyright (C) 2007, 2008 Elie CHAFTARI, Dirk Vleugels,
! Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: namespaces io kernel logging io.sockets sequences
combinators sequences.lib splitting assocs strings math.parser
random system calendar ;
USING: namespaces io io.timeouts kernel logging io.sockets
sequences combinators sequences.lib splitting assocs strings
math.parser random system calendar ;
IN: smtp