diff --git a/library/win32/win32-io-internals.factor b/library/win32/win32-io-internals.factor index e5938df814..4cb286b0ed 100644 --- a/library/win32/win32-io-internals.factor +++ b/library/win32/win32-io-internals.factor @@ -25,12 +25,15 @@ IN: win32-io-internals USING: alien errors kernel kernel-internals lists math namespaces threads - vectors win32-api io generic io-internals sequences ; + vectors win32-api io generic io-internals sequences prettyprint ; SYMBOL: completion-port SYMBOL: io-queue -SYMBOL: free-list -SYMBOL: callbacks + +TUPLE: io-queue free-list callbacks ; +TUPLE: io-callback overlapped quotation stream ; + +GENERIC: expire : expected-error? ( -- bool ) [ @@ -73,41 +76,31 @@ BEGIN-STRUCT: indirect-pointer FIELD: int value END-STRUCT -: num-callbacks ( -- len ) - #! Returns the length of the callback vector. - io-queue get [ callbacks get length ] bind ; +: ( -- overlapped ) + "overlapped-ext" c-size malloc ; -: set-callback-quot ( quot index -- ) - io-queue get [ - dup >r callbacks get nth car swap cons - r> callbacks get set-nth - ] bind ; +C: io-queue ( -- queue ) + 0 over set-io-queue-callbacks ; -: new-overlapped ( -- index ) - #! Allocates and returns a new entry for the io queue. - #! The new index in the callback vector is returned. - io-queue get [ - "overlapped-ext" c-type [ "width" get ] bind malloc - dup num-callbacks swap - set-overlapped-ext-user-data - unit num-callbacks dup >r callbacks get set-nth r> - ] bind ; +C: io-callback ( -- callback ) + io-queue get io-queue-callbacks [ push ] 2keep + length 1 - [ set-overlapped-ext-user-data ] keep + swap [ set-io-callback-overlapped ] keep ; -: alloc-io-task ( quot -- overlapped ) - io-queue get [ - free-list get [ - uncons free-list set - ] [ new-overlapped ] ifte* - [ set-callback-quot ] keep - callbacks get nth car - ] bind ; +: alloc-io-callback ( quot stream -- overlapped ) + io-queue get io-queue-free-list [ + uncons io-queue get [ set-io-queue-free-list ] keep + io-queue-callbacks nth + ] [ ] ifte* + [ set-io-callback-stream ] keep + [ set-io-callback-quotation ] keep + io-callback-overlapped ; : get-io-callback ( index -- callback ) - #! Returns and frees the io queue entry at index. - io-queue get [ - dup free-list [ cons ] change - callbacks get nth cdr - ] bind ; + dup io-queue get io-queue-callbacks nth swap + io-queue get [ io-queue-free-list cons ] keep set-io-queue-free-list + [ f swap set-io-callback-stream ] keep + io-callback-quotation ; : (wait-for-io) ( timeout -- error overlapped len ) >r completion-port get @@ -123,6 +116,10 @@ END-STRUCT overlapped-ext-user-data get-io-callback ] ifte ; +: cancel-timedout ( -- ) + io-queue get + io-queue-callbacks [ io-callback-stream [ expire ] when* ] each ; + : wait-for-io ( timeout -- callback len ) (wait-for-io) overlapped>callback swap indirect-pointer-value rot [ queue-error ] unless ; @@ -131,7 +128,7 @@ END-STRUCT INFINITE wait-for-io swap call ; : win32-io-thread ( -- ) - 10 wait-for-io swap [ + cancel-timedout 10 wait-for-io swap [ [ schedule-thread call ] callcc0 2drop ] [ drop yield @@ -141,11 +138,6 @@ END-STRUCT : win32-init-stdio ( -- ) INVALID_HANDLE_VALUE NULL NULL 1 CreateIoCompletionPort completion-port set - - [ - 32 callbacks set - f free-list set - ] extend io-queue set - + io-queue set [ win32-io-thread ] in-thread ; diff --git a/library/win32/win32-io.factor b/library/win32/win32-io.factor index ecebf9d054..cc049a87bd 100644 --- a/library/win32/win32-io.factor +++ b/library/win32/win32-io.factor @@ -118,4 +118,7 @@ END-STRUCT : CloseHandle ( handle -- ? ) "bool" "kernel32" "CloseHandle" [ "void*" ] alien-invoke ; - + +: CancelIo ( handle -- ) + "bool" "kernel32" "CancelIo" [ "void*" ] alien-invoke drop ; + diff --git a/library/win32/win32-server.factor b/library/win32/win32-server.factor index 83fb9ca671..11ea78993a 100644 --- a/library/win32/win32-server.factor +++ b/library/win32/win32-server.factor @@ -83,21 +83,31 @@ C: win32-server ( port -- server ) maybe-init-winsock new-socket swap over bind-socket dup listen-socket dup add-completion socket set + dup stream set ] extend over set-win32-server-this ; M: win32-server stream-close ( server -- ) win32-server-this [ socket get CloseHandle drop ] bind ; +M: win32-server set-timeout ( timeout server -- ) + win32-server-this [ timeout set ] bind ; + +M: win32-server expire ( -- ) + win32-server-this [ + timeout get [ millis cutoff get > [ socket get CancelIo ] when ] when + ] bind ; + IN: io : accept ( server -- client ) win32-server-this [ - new-socket 64 + update-timeout new-socket 64 [ - alloc-io-task init-overlapped >r >r >r socket get r> r> + stream get alloc-io-callback init-overlapped + >r >r >r socket get r> r> buffer-ptr 0 32 32 NULL r> AcceptEx [ handle-socket-error ] unless stop ] callcc1 pending-error drop - swap dup add-completion dupd - swap buffer-free + swap dup add-completion + dupd swap buffer-free ] bind ; diff --git a/library/win32/win32-stream.factor b/library/win32/win32-stream.factor index cb5a47546e..80def2d88f 100644 --- a/library/win32/win32-stream.factor +++ b/library/win32/win32-stream.factor @@ -37,6 +37,9 @@ SYMBOL: in-buffer SYMBOL: out-buffer SYMBOL: fileptr SYMBOL: file-size +SYMBOL: stream +SYMBOL: timeout +SYMBOL: cutoff : pending-error ( len/status -- len/status ) dup [ win32-throw-error ] unless ; @@ -51,9 +54,12 @@ SYMBOL: file-size : update-file-pointer ( whence -- ) file-size get [ fileptr [ + ] change ] [ drop ] ifte ; +: update-timeout ( -- ) + timeout get [ millis + cutoff set ] when* ; + : flush-output ( -- ) - [ - alloc-io-task init-overlapped >r + update-timeout [ + stream get alloc-io-callback init-overlapped >r handle get out-buffer get [ buffer@ ] keep buffer-length NULL r> WriteFile [ handle-io-error ] unless stop ] callcc1 pending-error @@ -79,8 +85,8 @@ M: string do-write ( str -- ) ] ifte ; : fill-input ( -- ) - [ - alloc-io-task init-overlapped >r + update-timeout [ + stream get alloc-io-callback init-overlapped >r handle get in-buffer get [ buffer@ ] keep buffer-capacity file-size get [ fileptr get - min ] when* NULL r> @@ -113,34 +119,21 @@ M: string do-write ( str -- ) : peek-input ( -- str ) 1 in-buffer get buffer-first-n ; -: do-read-line ( sbuf -- str ) - 1 consume-input dup length 0 = [ drop >string-or-f ] [ - dup "\r" = [ - peek-input "\n" = [ 1 consume-input drop ] when - drop >string - ] [ - dup "\n" = [ - peek-input "\r" = [ 1 consume-input drop ] when - drop >string - ] [ - dupd nappend do-read-line - ] ifte - ] ifte - ] ifte ; - -M: win32-stream stream-write-attr ( str style stream -- ) +M: win32-stream stream-format ( str style stream -- ) win32-stream-this nip [ do-write ] bind ; -M: win32-stream stream-readln ( stream -- str ) - win32-stream-this [ 80 do-read-line ] bind ; - M: win32-stream stream-read ( count stream -- str ) win32-stream-this [ dup swap do-read-count ] bind ; +M: win32-stream stream-read1 ( stream -- str ) + win32-stream-this [ + 1 consume-input dup length 0 = [ drop f ] when first + ] bind ; + M: win32-stream stream-flush ( stream -- ) win32-stream-this [ maybe-flush-output ] bind ; -M: win32-stream stream-auto-flush ( stream -- ) +M: win32-stream stream-finish ( stream -- ) drop ; M: win32-stream stream-close ( stream -- ) @@ -154,6 +147,14 @@ M: win32-stream stream-close ( stream -- ) M: win32-stream win32-stream-handle ( stream -- handle ) win32-stream-this [ handle get ] bind ; +M: win32-stream set-timeout ( timeout stream -- ) + win32-stream-this [ timeout set ] bind ; + +M: win32-stream expire ( stream -- ) + win32-stream-this [ + timeout get [ millis cutoff get > [ handle get CancelIo ] when ] when + ] bind ; + C: win32-stream ( handle -- stream ) swap [ dup NULL GetFileSize dup -1 = not [ @@ -163,10 +164,11 @@ C: win32-stream ( handle -- stream ) 4096 in-buffer set 4096 out-buffer set 0 fileptr set + dup stream set ] extend over set-win32-stream-this ; : ( path -- stream ) - t f win32-open-file ; + t f win32-open-file ; : ( path -- stream ) f t win32-open-file ;