threads: simplify 'suspend' combinator

release
Slava Pestov 2010-03-27 12:03:06 -04:00
parent 031ea6c39c
commit f1e19aabdb
12 changed files with 64 additions and 68 deletions

View File

@ -11,7 +11,6 @@ IN: alarms.tests
] unit-test
[ ] [
[
[ resume ] curry instant later drop
] "test" suspend drop
self [ resume ] curry instant later drop
"test" suspend drop
] unit-test

View File

@ -17,7 +17,7 @@ GENERIC: from ( channel -- value )
<PRIVATE
: wait ( channel -- )
[ senders>> push ] curry
[ self ] dip senders>> push
"channel send" suspend drop ;
: (to) ( value receivers -- )
@ -36,7 +36,7 @@ M: channel to ( value channel -- )
[ dup wait to ] [ nip (to) ] if-empty ;
M: channel from ( channel -- value )
[
[ self ] dip
notify senders>>
[ (from) ] unless-empty
] curry "channel receive" suspend ;
"channel receive" suspend ;

View File

@ -1,4 +1,4 @@
! Copyright (C) 2008 Slava Pestov.
! Copyright (C) 2008, 2010 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: deques threads kernel arrays sequences alarms fry ;
IN: concurrency.conditions
@ -22,10 +22,13 @@ IN: concurrency.conditions
ERROR: wait-timeout ;
: queue ( queue -- )
[ self ] dip push-front ;
: wait ( queue timeout status -- )
over [
[ queue-timeout [ drop ] ] dip suspend
[ queue-timeout ] dip suspend
[ wait-timeout ] [ cancel-alarm ] if
] [
[ drop '[ _ push-front ] ] dip suspend drop
[ drop queue ] dip suspend drop
] if ;

View File

@ -1,4 +1,4 @@
! Copyright (C) 2008 Slava Pestov.
! Copyright (C) 2008, 2010 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: kernel threads boxes accessors fry ;
IN: concurrency.exchangers
@ -17,5 +17,6 @@ TUPLE: exchanger thread object ;
[ thread>> box> resume-with ] dip
] [
[ object>> >box ] keep
'[ _ thread>> >box ] "exchange" suspend
[ self ] dip thread>> >box
"exchange" suspend
] if ;

View File

@ -1,4 +1,4 @@
! Copyright (C) 2008, 2009 Slava Pestov.
! Copyright (C) 2008, 2010 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: kernel sequences math fry ;
IN: deques
@ -16,22 +16,22 @@ GENERIC: node-value ( node -- value )
GENERIC: deque-empty? ( deque -- ? )
: push-front ( obj deque -- )
push-front* drop ;
push-front* drop ; inline
: push-all-front ( seq deque -- )
[ push-front ] curry each ;
: push-back ( obj deque -- )
push-back* drop ;
push-back* drop ; inline
: push-all-back ( seq deque -- )
[ push-back ] curry each ;
: pop-front ( deque -- obj )
[ peek-front ] [ pop-front* ] bi ;
[ peek-front ] [ pop-front* ] bi ; inline
: pop-back ( deque -- obj )
[ peek-back ] [ pop-back* ] bi ;
[ peek-back ] [ pop-back* ] bi ; inline
: slurp-deque ( deque quot -- )
[ drop '[ _ deque-empty? not ] ]

View File

@ -67,12 +67,11 @@ M: io-timeout summary drop "I/O operation timed out" ;
: wait-for-fd ( handle event -- )
dup +retry+ eq? [ 2drop ] [
'[
swap handle-fd mx get-global _ {
[ [ self ] dip handle-fd mx get-global ] dip {
{ +input+ [ add-input-callback ] }
{ +output+ [ add-output-callback ] }
} case
] "I/O" suspend nip [ io-timeout ] when
"I/O" suspend [ io-timeout ] when
] if ;
: wait-for-port ( port event -- )

View File

@ -40,8 +40,8 @@ M: winnt add-completion ( win32-handle -- )
: twiddle-thumbs ( overlapped port -- bytes-transferred )
[
drop
[ >c-ptr pending-overlapped get-global set-at ] curry "I/O" suspend
{
[ self ] dip >c-ptr pending-overlapped get-global set-at
"I/O" suspend {
{ [ dup integer? ] [ ] }
{ [ dup array? ] [
first dup eof?

View File

@ -129,12 +129,8 @@ M: process-was-killed error.
: (wait-for-process) ( process -- status )
dup handle>>
[
dup [ processes get at push ] curry
"process" suspend drop
] when
dup killed>>
[ process-was-killed ] [ status>> ] if ;
[ self over processes get at push "process" suspend drop ] when
dup killed>> [ process-was-killed ] [ status>> ] if ;
: wait-for-process ( process -- status )
[ (wait-for-process) ] with-timeout ;

View File

@ -142,10 +142,8 @@ HELP: interrupt
{ $description "Interrupts a sleeping thread." } ;
HELP: suspend
{ $values { "quot" { $quotation "( thread -- )" } } { "state" string } { "obj" object } }
{ $description "Suspends the current thread and passes it to the quotation."
$nl
"After the quotation returns, control yields to the next runnable thread and the current thread does not execute again until it is resumed, and so the quotation must arrange for another thread to later resume the suspended thread with a call to " { $link resume } " or " { $link resume-with } "."
{ $values { "state" string } { "obj" object } }
{ $description "Suspends the current thread. Control yields to the next runnable thread and the current thread does not execute again until it is resumed, and so the caller of this word must arrange for another thread to later resume the suspended thread with a call to " { $link resume } " or " { $link resume-with } "."
$nl
"The status string is for debugging purposes; see " { $link "tools.threads" } "." } ;

View File

@ -13,9 +13,7 @@ yield
[ ] [ 0.3 sleep ] unit-test
[ "hey" sleep ] must-fail
[ 3 ] [
[ 3 swap resume-with ] "Test suspend" suspend
] unit-test
[ 3 ] [ 3 self resume-with "Test suspend" suspend ] unit-test
[ f ] [ f get-global ] unit-test
@ -29,8 +27,6 @@ yield
] parallel-map
] unit-test
[ [ 3 throw ] "A" suspend ] [ 3 = ] must-fail-with
:: spawn-namespace-test ( -- ? )
<promise> :> p gensym :> g
[

View File

@ -1,4 +1,4 @@
! Copyright (C) 2004, 2009 Slava Pestov.
! Copyright (C) 2004, 2010 Slava Pestov.
! Copyright (C) 2005 Mackenzie Straight.
! See http://factorcode.org/license.txt for BSD license.
USING: arrays hashtables heaps kernel kernel.private math
@ -12,8 +12,8 @@ IN: threads
! (set-context) and (start-context) are sub-primitives, but
! we don't want them inlined into callers since their behavior
! depends on what frames are on the callstack
: start-context ( obj quot: ( obj -- * ) -- ) (start-context) ;
: set-context ( context -- ) (set-context) ;
: set-context ( obj context -- obj' ) (set-context) ;
: start-context ( obj quot: ( obj -- * ) -- obj' ) (start-context) ;
PRIVATE>
@ -24,14 +24,15 @@ TUPLE: thread
{ quot callable initial: [ ] }
{ exit-handler callable initial: [ ] }
{ id integer }
continuation
{ continuation box }
state
runnable
mailbox
variables
{ variables hashtable }
sleep-entry ;
: self ( -- thread ) 63 special-object ; inline
: self ( -- thread )
63 special-object { thread } declare ; inline
! Thread-local storage
: tnamespace ( -- assoc )
@ -46,9 +47,11 @@ sleep-entry ;
: tchange ( key quot -- )
tnamespace swap change-at ; inline
: threads ( -- assoc ) 64 special-object ;
: threads ( -- assoc )
64 special-object { hashtable } declare ; inline
: thread ( id -- thread ) threads at ;
: thread ( id -- thread )
threads at ;
: thread-registered? ( thread -- ? )
id>> threads key? ;
@ -85,9 +88,11 @@ PRIVATE>
: <thread> ( quot name -- thread )
\ thread new-thread ;
: run-queue ( -- dlist ) 65 special-object ;
: run-queue ( -- dlist )
65 special-object { dlist } declare ; inline
: sleep-queue ( -- heap ) 66 special-object ;
: sleep-queue ( -- heap )
66 special-object { dlist } declare ; inline
: resume ( thread -- )
f >>state
@ -175,25 +180,22 @@ DEFER: next
PRIVATE>
: stop ( -- )
: stop ( -- * )
self [ exit-handler>> call( -- ) ] [ unregister-thread ] bi next ;
: suspend ( quot state -- obj )
[
[ [ self swap call ] dip self (>>state) ] dip
self continuation>> >box
next
] callcc1 2nip ; inline
: suspend ( state -- obj )
self (>>state)
[ self continuation>> >box next ] callcc1 ; inline
: yield ( -- ) [ resume ] f suspend drop ;
: yield ( -- ) self resume f suspend drop ;
GENERIC: sleep-until ( n/f -- )
M: integer sleep-until
'[ _ schedule-sleep ] "sleep" suspend drop ;
[ self ] dip schedule-sleep "sleep" suspend drop ;
M: f sleep-until
drop [ drop ] "interrupt" suspend drop ;
drop "interrupt" suspend drop ;
GENERIC: sleep ( dt -- )
@ -218,7 +220,7 @@ M: real sleep
: in-thread ( quot -- )
[ datastack ] dip
'[ _ set-datastack _ call ]
'[ _ set-datastack @ ]
"Thread" spawn drop ;
GENERIC: error-in-thread ( error thread -- )

View File

@ -1,10 +1,11 @@
! Copyright (C) 2009 Slava Pestov.
! Copyright (C) 2009, 2010 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: threads kernel namespaces continuations combinators
sequences math namespaces.private continuations.private
concurrency.messaging quotations kernel.private words
sequences.private assocs models models.arrow arrays accessors
generic generic.single definitions make sbufs tools.crossref fry ;
USING: threads threads.private kernel namespaces continuations
combinators sequences math namespaces.private
continuations.private concurrency.messaging quotations
kernel.private words sequences.private assocs models
models.arrow arrays accessors generic generic.single definitions
make sbufs tools.crossref fry ;
IN: tools.continuations
<PRIVATE
@ -126,6 +127,7 @@ PRIVATE>
>n ndrop >c c>
continue continue-with
stop suspend (spawn)
set-context start-context
} [ don't-step-into ] each
\ break [ break ] "step-into" set-word-prop