Tested and documented all new concurrency features
parent
5f23beffe0
commit
d657821f3e
|
@ -6,6 +6,7 @@ IN: threads
|
|||
ARTICLE: "threads-start/stop" "Starting and stopping threads"
|
||||
"Spawning new threads:"
|
||||
{ $subsection spawn }
|
||||
{ $subsection spawn-server }
|
||||
"Creating and spawning a thread can be factored out into two separate steps:"
|
||||
{ $subsection <thread> }
|
||||
{ $subsection (spawn) }
|
||||
|
@ -42,7 +43,9 @@ ARTICLE: "thread-impl" "Thread implementation"
|
|||
{ $subsection sleep-queue } ;
|
||||
|
||||
ARTICLE: "threads" "Lightweight co-operative threads"
|
||||
"Factor supports lightweight co-operative threads implemented on top of continuations. A thread will yield while waiting for I/O operations to complete, or when a yield has been explicitly requested."
|
||||
"Factor supports lightweight co-operative threads implemented on top of continuations. A thread will yield while waiting for input/output operations to complete, or when a yield has been explicitly requested."
|
||||
$nl
|
||||
"Factor threads are very lightweight. Each thread can take as little as 900 bytes of memory. This library has been tested running hundreds of thousands of simple threads."
|
||||
$nl
|
||||
"Words for working with threads are in the " { $vocab-link "threads" } " vocabulary."
|
||||
{ $subsection "threads-start/stop" }
|
||||
|
@ -112,11 +115,19 @@ HELP: spawn
|
|||
{ $values { "quot" quotation } { "name" string } }
|
||||
{ $description "Spawns a new thread. The thread begins executing the given quotation; the name is for debugging purposes. The new thread begins running immediately and the current thread is added to the end of the run queue."
|
||||
$nl
|
||||
"The new thread begins with an empty data stack, an empty catch stack and a name stack containing the global namespace only. This means that the only way to pass data to the new thread is to explicitly construct a quotation containing the data, for example using " { $link curry } " or " { $link compose } "." }
|
||||
"The new thread begins with an empty data stack, an empty catch stack, and a name stack containing the global namespace only. This means that the only way to pass data to the new thread is to explicitly construct a quotation containing the data, for example using " { $link curry } " or " { $link compose } "." }
|
||||
{ $examples
|
||||
{ $code "1 2 [ + . ] 2curry \"Addition thread\" spawn" }
|
||||
} ;
|
||||
|
||||
HELP: spawn-server
|
||||
{ $values { "quot" "a quotation with stack effect " { $snippet "( -- ? )" } } { "name" string } }
|
||||
{ $description "Convenience wrapper around " { $link spawn } " which repeatedly calls the quotation in a new thread until it outputs " { $link f } "." }
|
||||
{ $examples
|
||||
"A thread that runs forever:"
|
||||
{ $code "[ do-foo-bar t ] \"Foo bar server\" spawn-server" }
|
||||
} ;
|
||||
|
||||
HELP: init-threads
|
||||
{ $description "Called during startup to initialize the threading system. This word should never be called directly." } ;
|
||||
|
||||
|
|
|
@ -17,19 +17,24 @@ mailbox variables ;
|
|||
: self ( -- thread ) 40 getenv ; inline
|
||||
|
||||
! Thread-local storage
|
||||
: tnamespace ( -- assoc ) self thread-variables ;
|
||||
: tnamespace ( -- assoc )
|
||||
self dup thread-variables
|
||||
[ ] [ H{ } clone dup rot set-thread-variables ] ?if ;
|
||||
|
||||
: tget ( key -- value ) tnamespace at ;
|
||||
: tget ( key -- value )
|
||||
self thread-variables at ;
|
||||
|
||||
: tset ( value key -- ) tnamespace set-at ;
|
||||
: tset ( value key -- )
|
||||
tnamespace set-at ;
|
||||
|
||||
: tchange ( key quot -- ) tnamespace change-at ; inline
|
||||
: tchange ( key quot -- )
|
||||
tnamespace change-at ; inline
|
||||
|
||||
SYMBOL: threads
|
||||
: threads 41 getenv ;
|
||||
|
||||
threads global [ H{ } assoc-like ] change-at
|
||||
|
||||
: thread ( id -- thread ) threads get-global at ;
|
||||
: thread ( id -- thread ) threads at ;
|
||||
|
||||
<PRIVATE
|
||||
|
||||
|
@ -44,46 +49,46 @@ threads global [ H{ } assoc-like ] change-at
|
|||
: register-thread ( thread -- )
|
||||
check-unregistered
|
||||
t over set-thread-registered?
|
||||
dup thread-id threads get-global set-at ;
|
||||
dup thread-id threads set-at ;
|
||||
|
||||
: unregister-thread ( thread -- )
|
||||
check-registered
|
||||
f over set-thread-registered?
|
||||
thread-id threads get-global delete-at ;
|
||||
thread-id threads delete-at ;
|
||||
|
||||
: set-self ( thread -- ) 40 setenv ; inline
|
||||
|
||||
PRIVATE>
|
||||
|
||||
: <thread> ( quot name error-handler -- thread )
|
||||
\ thread counter H{ } clone {
|
||||
\ thread counter {
|
||||
set-thread-quot
|
||||
set-thread-name
|
||||
set-thread-error-handler
|
||||
set-thread-id
|
||||
set-thread-variables
|
||||
} \ thread construct ;
|
||||
|
||||
SYMBOL: run-queue
|
||||
SYMBOL: sleep-queue
|
||||
: run-queue 42 getenv ;
|
||||
|
||||
: sleep-queue 43 getenv ;
|
||||
|
||||
: resume ( thread -- )
|
||||
check-registered run-queue get-global push-front ;
|
||||
check-registered run-queue push-front ;
|
||||
|
||||
: resume-with ( obj thread -- )
|
||||
check-registered 2array run-queue get-global push-front ;
|
||||
check-registered 2array run-queue push-front ;
|
||||
|
||||
<PRIVATE
|
||||
|
||||
: schedule-sleep ( thread ms -- )
|
||||
>r check-registered r> sleep-queue get-global heap-push ;
|
||||
>r check-registered r> sleep-queue heap-push ;
|
||||
|
||||
: wake-up? ( heap -- ? )
|
||||
dup heap-empty?
|
||||
[ drop f ] [ heap-peek nip millis <= ] if ;
|
||||
|
||||
: wake-up ( -- )
|
||||
sleep-queue get-global
|
||||
sleep-queue
|
||||
[ dup wake-up? ] [ dup heap-pop drop resume ] [ ] while
|
||||
drop ;
|
||||
|
||||
|
@ -92,7 +97,7 @@ SYMBOL: sleep-queue
|
|||
continue
|
||||
] [
|
||||
wake-up
|
||||
run-queue get-global pop-back
|
||||
run-queue pop-back
|
||||
dup array? [ first2 ] [ f swap ] if dup set-self
|
||||
dup thread-continuation
|
||||
f rot set-thread-continuation
|
||||
|
@ -103,9 +108,9 @@ PRIVATE>
|
|||
|
||||
: sleep-time ( -- ms )
|
||||
{
|
||||
{ [ run-queue get-global dlist-empty? not ] [ 0 ] }
|
||||
{ [ sleep-queue get-global heap-empty? ] [ f ] }
|
||||
{ [ t ] [ sleep-queue get-global heap-peek nip millis [-] ] }
|
||||
{ [ run-queue dlist-empty? not ] [ 0 ] }
|
||||
{ [ sleep-queue heap-empty? ] [ f ] }
|
||||
{ [ t ] [ sleep-queue heap-peek nip millis [-] ] }
|
||||
} cond ;
|
||||
|
||||
: stop ( -- )
|
||||
|
@ -160,9 +165,9 @@ PRIVATE>
|
|||
<PRIVATE
|
||||
|
||||
: init-threads ( -- )
|
||||
<dlist> run-queue set-global
|
||||
<min-heap> sleep-queue set-global
|
||||
H{ } clone threads set-global
|
||||
H{ } clone 41 setenv
|
||||
<dlist> 42 setenv
|
||||
<min-heap> 43 setenv
|
||||
initial-thread global
|
||||
[ drop f "Initial" [ die ] <thread> ] cache
|
||||
f over set-thread-continuation
|
||||
|
|
|
@ -17,7 +17,7 @@ IN: bunny.model
|
|||
} cond (parse-model)
|
||||
] when* ;
|
||||
|
||||
: parse-model ( stream -- vs is )
|
||||
: parse-model ( -- vs is )
|
||||
100000 <vector> 100000 <vector> (parse-model) ;
|
||||
|
||||
: n ( vs triple -- n )
|
||||
|
|
|
@ -28,7 +28,7 @@ IN: channels.examples
|
|||
[let | p [ c from ]
|
||||
newc [ <channel> ] |
|
||||
p prime to
|
||||
[ newc p c filter ] spawn drop
|
||||
[ newc p c filter ] "Filter" spawn drop
|
||||
prime newc (sieve)
|
||||
] ;
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ SYMBOL: no-channel
|
|||
receive [
|
||||
{
|
||||
{ { to ?id ?value }
|
||||
[ ?value ?id get-channel [ to f ] [ no-channel ] if* ] }
|
||||
[ ?value ?id get-channel dup [ to f ] [ 2drop no-channel ] if ] }
|
||||
{ { from ?id }
|
||||
[ ?id get-channel [ from ] [ no-channel ] if* ] }
|
||||
} match-cond
|
||||
|
@ -43,7 +43,7 @@ PRIVATE>
|
|||
: start-channel-node ( -- )
|
||||
"remote-channels" get-process [
|
||||
"remote-channels"
|
||||
[ channel-process ] "Remote channels" spawn-server
|
||||
[ channel-process t ] "Remote channels" spawn-server
|
||||
register-process
|
||||
] unless ;
|
||||
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
USING: help.markup help.syntax sequences ;
|
||||
IN: concurrency.combinators
|
||||
|
||||
HELP: parallel-map
|
||||
{ $values { "seq" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- newelt )" } } { "newseq" sequence } }
|
||||
{ $description "Spawns a new thread for applying " { $snippet "quot" } " to every element of " { $snippet "seq" } ", collecting the results at the end." }
|
||||
{ $errors "Throws an error if one of the iterations throws an error." } ;
|
||||
|
||||
HELP: parallel-each
|
||||
{ $values { "seq" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- )" } } }
|
||||
{ $description "Spawns a new thread for applying " { $snippet "quot" } " to every element of " { $snippet "seq" } ", blocking until all quotations complete." }
|
||||
{ $errors "Throws an error if one of the iterations throws an error." } ;
|
||||
|
||||
HELP: parallel-subset
|
||||
{ $values { "seq" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- ? )" } } { "newseq" sequence } }
|
||||
{ $description "Spawns a new thread for applying " { $snippet "quot" } " to every element of " { $snippet "seq" } ", collecting the elements for which the quotation yielded a true value." }
|
||||
{ $errors "Throws an error if one of the iterations throws an error." } ;
|
||||
|
||||
ARTICLE: "concurrency.combinators" "Concurrent combinators"
|
||||
"The " { $vocab-link "concurrency.combinators" } " vocabulary provides concurrent variants of " { $link each } ", " { $link map } " and " { $link subset } ":"
|
||||
{ $subsection parallel-each }
|
||||
{ $subsection parallel-map }
|
||||
{ $subsection parallel-subset } ;
|
||||
|
||||
ABOUT: "concurrency.combinators"
|
|
@ -0,0 +1,24 @@
|
|||
IN: temporary
|
||||
USING: concurrency.combinators tools.test random kernel math
|
||||
concurrency.messaging threads sequences ;
|
||||
|
||||
[ [ drop ] parallel-each ] must-infer
|
||||
[ [ ] parallel-map ] must-infer
|
||||
[ [ ] parallel-subset ] must-infer
|
||||
|
||||
[ { 1 4 9 } ] [ { 1 2 3 } [ sq ] parallel-map ] unit-test
|
||||
|
||||
[ { 1 4 9 } ] [ { 1 2 3 } [ 1000 random sleep sq ] parallel-map ] unit-test
|
||||
|
||||
[ { 1 2 3 } [ dup 2 mod 0 = [ "Even" throw ] when ] parallel-map ]
|
||||
[ linked-error "Even" = ] must-fail-with
|
||||
|
||||
[ V{ 0 3 6 9 } ]
|
||||
[ 10 [ 3 mod zero? ] parallel-subset ] unit-test
|
||||
|
||||
[ 10 ]
|
||||
[
|
||||
V{ } clone
|
||||
10 over [ push ] curry parallel-each
|
||||
length
|
||||
] unit-test
|
|
@ -9,5 +9,9 @@ IN: concurrency.combinators
|
|||
inline
|
||||
|
||||
: parallel-each ( seq quot -- )
|
||||
"Parallel each" pick length <count-down>
|
||||
[ [ spawn-stage ] 2curry each ] keep await ; inline
|
||||
over length <count-down>
|
||||
[ [ >r curry r> spawn-stage ] 2curry each ] keep await ;
|
||||
inline
|
||||
|
||||
: parallel-subset ( seq quot -- newseq )
|
||||
over >r pusher >r each r> r> like ; inline
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Parallel sequence operations
|
|
@ -1,13 +1,13 @@
|
|||
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
|
||||
! Copyright (C) 2008 Slava Pestov.
|
||||
! See http://factorcode.org/license.txt for BSD license.
|
||||
USING: dlists threads kernel arrays sequences ;
|
||||
IN: concurrency.conditions
|
||||
|
||||
: notify-1 ( dlist -- )
|
||||
dup dlist-empty? [ pop-back resume ] [ drop ] if ;
|
||||
dup dlist-empty? [ drop ] [ pop-back second resume ] if ;
|
||||
|
||||
: notify-all ( dlist -- )
|
||||
[ second resume ] dlist-slurp yield ;
|
||||
|
||||
: wait ( queue timeout -- queue timeout )
|
||||
: wait ( queue timeout -- )
|
||||
[ 2array swap push-front ] suspend 3drop ; inline
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Low-level wait/notify support
|
|
@ -0,0 +1,24 @@
|
|||
USING: help.markup help.syntax sequences ;
|
||||
IN: concurrency.count-downs
|
||||
|
||||
HELP: <count-down>
|
||||
{ $values { "n" "a non-negative integer" } { "count-down" count-down } }
|
||||
{ $description "Creates a new count-down latch." }
|
||||
{ $errors "Throws an error if the count is lower than zero." } ;
|
||||
|
||||
HELP: count-down
|
||||
{ $values { "count-down" count-down } }
|
||||
{ $description "Decrements a count-down latch. If it reaches zero, all threads blocking on " { $link await } " are notified." }
|
||||
{ $errors "Throws an error if an attempt is made to decrement the count lower than zero." } ;
|
||||
|
||||
HELP: await
|
||||
{ $values { "count-down" count-down } }
|
||||
{ $description "Waits until the count-down value reaches zero." } ;
|
||||
|
||||
ARTICLE: "concurrency.count-downs" "Count-down latches"
|
||||
"The " { $vocab-link "concurrency.count-downs" } " vocabulary implements the " { $emphasis "count-down latch" } " data type, whichis a wrapper for a non-negative integer value which tends towards zero. A thread can either decrement the value, or wait for it to become zero."
|
||||
{ $subsection <count-down> }
|
||||
{ $subsection count-down }
|
||||
{ $subsection await } ;
|
||||
|
||||
ABOUT: "concurrency.count-downs"
|
|
@ -0,0 +1,16 @@
|
|||
USING: concurrency.count-downs threads kernel tools.test ;
|
||||
IN: temporary`
|
||||
|
||||
[ ] [ 0 <count-down> await ] unit-test
|
||||
|
||||
[ 1 <count-down> dup count-down count-down ] must-fail
|
||||
|
||||
[ ] [
|
||||
1 <count-down>
|
||||
3 <count-down>
|
||||
2dup [ await count-down ] 2curry "Master" spawn drop
|
||||
dup [ count-down ] curry "Slave" spawn drop
|
||||
dup [ count-down ] curry "Slave" spawn drop
|
||||
dup [ count-down ] curry "Slave" spawn drop
|
||||
drop await
|
||||
] unit-test
|
|
@ -8,25 +8,32 @@ IN: concurrency.count-downs
|
|||
|
||||
TUPLE: count-down n promise ;
|
||||
|
||||
: count-down-check ( count-down -- )
|
||||
dup count-down-n zero? [
|
||||
t swap count-down-promise fulfill
|
||||
] [ drop ] if ;
|
||||
|
||||
: <count-down> ( n -- count-down )
|
||||
<dlist> count-down construct-boa ;
|
||||
dup 0 < [ "Invalid count for count down" throw ] when
|
||||
<promise> \ count-down construct-boa
|
||||
dup count-down-check ;
|
||||
|
||||
: count-down ( count-down -- )
|
||||
dup count-down-n dup zero? [
|
||||
"Count down already done" throw
|
||||
] [
|
||||
1- dup pick set-count-down-n
|
||||
zero? [
|
||||
t swap count-down-promise fulfill
|
||||
] [ drop ] if
|
||||
1- over set-count-down-n
|
||||
count-down-check
|
||||
] if ;
|
||||
|
||||
: await-timeout ( count-down timeout -- )
|
||||
>r count-down-promise r> ?promise-timeout drop ;
|
||||
|
||||
: spawn-stage ( quot name count-down -- )
|
||||
count-down-promise
|
||||
promise-mailbox spawn-linked-to drop ;
|
||||
|
||||
: await ( count-down -- )
|
||||
f await-timeout ;
|
||||
|
||||
: spawn-stage ( quot count-down -- )
|
||||
[ [ count-down ] curry compose ] keep
|
||||
"Count down stage"
|
||||
swap count-down-promise
|
||||
promise-mailbox spawn-linked-to drop ;
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Count-down latches
|
|
@ -1,7 +1,20 @@
|
|||
USING: help.markup help.syntax concurrency.messaging ;
|
||||
USING: help.markup help.syntax concurrency.messaging threads ;
|
||||
IN: concurrency.distributed
|
||||
|
||||
HELP: local-node
|
||||
{ $values { "addrspec" "an address specifier" }
|
||||
}
|
||||
{ $description "Return the node the current thread is running on." } ;
|
||||
|
||||
HELP: start-node
|
||||
{ $values { "port" "a port number between 0 and 65535" } }
|
||||
{ $description "Starts a node server for receiving messages from remote Factor instances." } ;
|
||||
|
||||
ARTICLE: "concurrency.distributed" "Distributed message passing"
|
||||
"The " { $vocab-link "concurrency.distributed" } " implements transparent distributed message passing."
|
||||
{ $subsection start-node }
|
||||
"Instances of " { $link thread } " can be sent to remote processes, at which point they are converted to objects holding the thread ID and the current node's host name:"
|
||||
{ $subsection remote-process }
|
||||
"The " { $vocab-link "serialize" } " vocabulary is used to convert Factor objects to byte arrays for transfer over a socket." ;
|
||||
|
||||
ABOUT: "concurrency.distributed"
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
USING: help.markup help.syntax sequences kernel ;
|
||||
IN: concurrency.exchangers
|
||||
|
||||
HELP: exchanger
|
||||
{ $class-description "The class of object exchange points." } ;
|
||||
|
||||
HELP: <exchanger>
|
||||
{ $values { "exchanger" exchanger } }
|
||||
{ $description "Creates a new object exchange point." } ;
|
||||
|
||||
HELP: exchange
|
||||
{ $values { "obj" object } { "exchanger" exchanger } { "newobj" object } }
|
||||
{ $description "Waits for another thread to call " { $link exchange } " on the same exchanger. The thread's call to " { $link exchange } " returns with " { $snippet "obj" } " on the stack, and the object passed to " { $link exchange } " by the other thread is left on the current's thread stack as " { $snippet "newobj" } "." } ;
|
||||
|
||||
ARTICLE: "concurrency.exchangers" "Object exchange points"
|
||||
"The " { $vocab-link "concurrency.exchangers" } " vocabulary implements " { $emphasis "object exchange points" } ", which are rendezvous points where two threads can exchange objects."
|
||||
{ $subsection exchanger }
|
||||
{ $subsection <exchanger> }
|
||||
{ $subsection exchange }
|
||||
"One use-case is two threads, where one thread reads data into a buffer and another thread processes the data. The reader thread can begin by reading the data, then passing the buffer through an exchanger, then recursing. The processing thread can begin by creating an empty buffer, and exchanging it through the exchanger. It then processes the result and recurses." ;
|
||||
|
||||
ABOUT: "concurrency.exchangers"
|
|
@ -0,0 +1,30 @@
|
|||
IN: temporary
|
||||
USING: sequences tools.test concurrency.exchangers
|
||||
concurrency.count-downs concurrency.promises locals kernel
|
||||
threads ;
|
||||
|
||||
:: exchanger-test | |
|
||||
[let |
|
||||
ex [ <exchanger> ]
|
||||
c [ 2 <count-down> ]
|
||||
v1! [ f ]
|
||||
v2! [ f ]
|
||||
pr [ <promise> ] |
|
||||
|
||||
[
|
||||
c await
|
||||
v1 ", " v2 3append pr fulfill
|
||||
] "Awaiter" spawn drop
|
||||
|
||||
[
|
||||
"Goodbye world" ex exchange v1! c count-down
|
||||
] "Exchanger 1" spawn drop
|
||||
|
||||
[
|
||||
"Hello world" ex exchange v2! c count-down
|
||||
] "Exchanger 2" spawn drop
|
||||
|
||||
pr ?promise
|
||||
] ;
|
||||
|
||||
[ "Hello world, Goodbye world" ] [ exchanger-test ] unit-test
|
|
@ -6,16 +6,21 @@ IN: concurrency.exchangers
|
|||
! Motivated by
|
||||
! http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Exchanger.html
|
||||
|
||||
TUPLE: exchanger thread ;
|
||||
TUPLE: exchanger thread object ;
|
||||
|
||||
: <exchanger> ( -- exchanger )
|
||||
f exchanger construct-boa ;
|
||||
f f exchanger construct-boa ;
|
||||
|
||||
: pop-object ( exchanger -- obj )
|
||||
dup exchanger-object f rot set-exchanger-object ;
|
||||
|
||||
: pop-thread ( exchanger -- thread )
|
||||
dup exchanger-thread f rot set-exchanger-thread ;
|
||||
|
||||
: exchange ( obj exchanger -- newobj )
|
||||
dup exchanger-thread [
|
||||
dup exchanger-thread
|
||||
f rot set-exchanger-thread
|
||||
resume-with
|
||||
dup pop-object >r pop-thread resume-with r>
|
||||
] [
|
||||
[ set-exchanger-object ] keep
|
||||
[ set-exchanger-thread ] curry suspend
|
||||
] if ;
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
Thread rendezvous points
|
|
@ -0,0 +1 @@
|
|||
Object exchange points
|
|
@ -0,0 +1,29 @@
|
|||
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
|
||||
! See http://factorcode.org/license.txt for BSD license.
|
||||
USING: concurrency.promises concurrency.messaging kernel arrays
|
||||
continuations help.markup help.syntax quotations ;
|
||||
IN: concurrency.futures
|
||||
|
||||
HELP: future
|
||||
{ $values { "quot" "a quotation with stack effect " { $snippet "( -- value )" } } { "future" future } }
|
||||
{ $description "Creates a deferred computation."
|
||||
$nl
|
||||
"The quotation begins with an empty data stack, an empty catch stack, and a name stack containing the global namespace only. This means that the only way to pass data to the quotation is to partially apply the data, for example using " { $link curry } " or " { $link compose } "." } ;
|
||||
|
||||
HELP: ?future-timeout
|
||||
{ $values { "future" future } { "timeout" "a timeout in milliseconds or " { $link f } } { "value" object } }
|
||||
{ $description "Waits for a deferred computation to complete, blocking indefinitely if " { $snippet "timeout" } " is " { $link f } ", otherwise waiting up to " { $snippet "timeout" } " milliseconds." }
|
||||
{ $errors "Throws an error if the timeout expires before the computation completes. Also throws an error if the future quotation threw an error." } ;
|
||||
|
||||
HELP: ?future
|
||||
{ $values { "future" future } { "value" object } }
|
||||
{ $description "Waits for a deferred computation to complete, blocking indefinitely." }
|
||||
{ $errors "Throws an error if future quotation threw an error." } ;
|
||||
|
||||
ARTICLE: "concurrency.futures" "Futures"
|
||||
"The " { $vocab-link "concurrency.futures" } " vocabulary implements " { $emphasis "futures" } ", which are deferred computations performed in a background thread. A thread may create a future, then proceed to perform other tasks, then later wait for the future to complete."
|
||||
{ $subsection future }
|
||||
{ $subsection ?future }
|
||||
{ $subsection ?future-timeout } ;
|
||||
|
||||
ABOUT: "concurrency.futures"
|
|
@ -0,0 +1,25 @@
|
|||
IN: temporary
|
||||
USING: concurrency.futures kernel tools.test threads ;
|
||||
|
||||
[ 50 ] [
|
||||
[ 50 ] future ?future
|
||||
] unit-test
|
||||
|
||||
[
|
||||
[ "this should propogate" throw ] future ?future
|
||||
] must-fail
|
||||
|
||||
[ ] [
|
||||
[ "this should not propogate" throw ] future drop
|
||||
] unit-test
|
||||
|
||||
! Race condition with futures
|
||||
[ 3 3 ] [
|
||||
[ 3 ] future
|
||||
dup ?future swap ?future
|
||||
] unit-test
|
||||
|
||||
! Another race
|
||||
[ 3 ] [
|
||||
[ 3 yield ] future ?future
|
||||
] unit-test
|
|
@ -0,0 +1,60 @@
|
|||
USING: help.markup help.syntax sequences kernel quotations ;
|
||||
IN: concurrency.locks
|
||||
|
||||
HELP: lock
|
||||
{ $class-description "The class of mutual exclusion locks." } ;
|
||||
|
||||
HELP: <lock>
|
||||
{ $values { "lock" lock } }
|
||||
{ $description "Creates a non-reentrant lock." } ;
|
||||
|
||||
HELP: <reentrant-lock>
|
||||
{ $values { "lock" lock } }
|
||||
{ $description "Creates a reentrant lock." } ;
|
||||
|
||||
HELP: with-lock
|
||||
{ $values { "lock" lock } { "timeout" "a timeout in milliseconds or " { $link f } } { "quot" quotation } }
|
||||
{ $description "Calls the quotation, ensuring that only one thread executes with the lock held at a time. If another thread is holding the lock, blocks until the thread releases the lock." }
|
||||
{ $errors "Throws an error if the lock could not be acquired before the timeout expires. A timeout value of " { $link f } " means the thread is willing to wait indefinitely." } ;
|
||||
|
||||
ARTICLE: "concurrency.locks.mutex" "Mutual-exclusion locks"
|
||||
"A mutual-exclusion lock ensures that only one thread executes with the lock held at a time. They are used to protect critical sections so that certain operations appear to be atomic to other threads."
|
||||
$nl
|
||||
"There are two varieties of locks: non-reentrant and reentrant. The latter may be acquired recursively by the same thread. Attempting to do so with the former will deadlock."
|
||||
{ $subsection lock }
|
||||
{ $subsection <lock> }
|
||||
{ $subsection <reentrant-lock> }
|
||||
{ $subsection with-lock } ;
|
||||
|
||||
HELP: rw-lock
|
||||
{ $class-description "The class of reader/writer locks." } ;
|
||||
|
||||
HELP: with-read-lock
|
||||
{ $values { "lock" lock } { "timeout" "a timeout in milliseconds or " { $link f } } { "quot" quotation } }
|
||||
{ $description "Calls the quotation, ensuring that no other thread is holding a write lock at the same time. If another thread is holding a write lock, blocks until the thread releases the lock." }
|
||||
{ $errors "Throws an error if the lock could not be acquired before the timeout expires. A timeout value of " { $link f } " means the thread is willing to wait indefinitely." } ;
|
||||
|
||||
HELP: with-write-lock
|
||||
{ $values { "lock" lock } { "timeout" "a timeout in milliseconds or " { $link f } } { "quot" quotation } }
|
||||
{ $description "Calls the quotation, ensuring that no other thread is holding a read or write lock at the same time. If another thread is holding a read or write lock, blocks until the thread releases the lock." }
|
||||
{ $errors "Throws an error if the lock could not be acquired before the timeout expires. A timeout value of " { $link f } " means the thread is willing to wait indefinitely." } ;
|
||||
|
||||
ARTICLE: "concurrency.locks.rw" "Read-write locks"
|
||||
"A read-write lock encapsulates a common pattern in the implementation of concurrent data structures, where one wishes to ensure that a thread is able to see a consistent view of the structure for a period of time, during which no other thread modifies the structure."
|
||||
$nl
|
||||
"While this can be achieved with a simple " { $link "concurrency.locks.mutex" } ", performance will suffer, since in fact multiple threads can view the structure at the same time; serialization must only be enforced for writes."
|
||||
$nl
|
||||
"Read/write locks allow any number of threads to hold the read lock simulateneously, however attempting to acquire a write lock blocks until all other threads release read locks and write locks."
|
||||
$nl
|
||||
"Read/write locks are reentrant. A thread holding a read lock may acquire a write lock recursively, and a thread holding a write lock may acquire a write lock or a read lock recursively, however a thread holding a read lock may not acquire a write lock recursively since that could break invariants assumed by the code executing with the read lock held."
|
||||
{ $subsection rw-lock }
|
||||
{ $subsection <rw-lock> }
|
||||
{ $subsection with-read-lock }
|
||||
{ $subsection with-write-lock } ;
|
||||
|
||||
ARTICLE: "concurrency.locks" "Locks"
|
||||
"A " { $emphasis "lock" } " is an object protecting a critical region of code, enforcing a particular mutual-exclusion policy. The " { $vocab-link "concurrency.locks" } " vocabulary implements two types of locks:"
|
||||
{ $subsection "concurrency.locks.mutex" }
|
||||
{ $subsection "concurrency.locks.rw" } ;
|
||||
|
||||
ABOUT: "concurrency.locks"
|
|
@ -0,0 +1,159 @@
|
|||
IN: temporary
|
||||
USING: tools.test concurrency.locks concurrency.count-downs
|
||||
locals kernel threads sequences ;
|
||||
|
||||
:: lock-test-0 | |
|
||||
[let | v [ V{ } clone ]
|
||||
c [ 2 <count-down> ] |
|
||||
|
||||
[
|
||||
yield
|
||||
1 v push
|
||||
yield
|
||||
2 v push
|
||||
c count-down
|
||||
] "Lock test 1" spawn drop
|
||||
|
||||
[
|
||||
yield
|
||||
3 v push
|
||||
yield
|
||||
4 v push
|
||||
c count-down
|
||||
] "Lock test 2" spawn drop
|
||||
|
||||
c await
|
||||
v
|
||||
] ;
|
||||
|
||||
:: lock-test-1 | |
|
||||
[let | v [ V{ } clone ]
|
||||
l [ <lock> ]
|
||||
c [ 2 <count-down> ] |
|
||||
|
||||
[
|
||||
l f [
|
||||
yield
|
||||
1 v push
|
||||
yield
|
||||
2 v push
|
||||
] with-lock
|
||||
c count-down
|
||||
] "Lock test 1" spawn drop
|
||||
|
||||
[
|
||||
l f [
|
||||
yield
|
||||
3 v push
|
||||
yield
|
||||
4 v push
|
||||
] with-lock
|
||||
c count-down
|
||||
] "Lock test 2" spawn drop
|
||||
|
||||
c await
|
||||
v
|
||||
] ;
|
||||
|
||||
[ V{ 1 3 2 4 } ] [ lock-test-0 ] unit-test
|
||||
[ V{ 1 2 3 4 } ] [ lock-test-1 ] unit-test
|
||||
|
||||
[ 3 ] [
|
||||
<reentrant-lock> dup f [
|
||||
f [
|
||||
3
|
||||
] with-lock
|
||||
] with-lock
|
||||
] unit-test
|
||||
|
||||
[ ] [ <rw-lock> drop ] unit-test
|
||||
|
||||
[ ] [ <rw-lock> f [ ] with-read-lock ] unit-test
|
||||
|
||||
[ ] [ <rw-lock> dup f [ f [ ] with-read-lock ] with-read-lock ] unit-test
|
||||
|
||||
[ ] [ <rw-lock> f [ ] with-write-lock ] unit-test
|
||||
|
||||
[ ] [ <rw-lock> dup f [ f [ ] with-write-lock ] with-write-lock ] unit-test
|
||||
|
||||
[ ] [ <rw-lock> dup f [ f [ ] with-read-lock ] with-write-lock ] unit-test
|
||||
|
||||
:: rw-lock-test-1 | |
|
||||
[let | l [ <rw-lock> ]
|
||||
c [ 1 <count-down> ]
|
||||
c' [ 1 <count-down> ]
|
||||
c'' [ 4 <count-down> ]
|
||||
v [ V{ } clone ] |
|
||||
|
||||
[
|
||||
l f [
|
||||
1 v push
|
||||
c count-down
|
||||
yield
|
||||
3 v push
|
||||
] with-read-lock
|
||||
c'' count-down
|
||||
] "R/W lock test 1" spawn drop
|
||||
|
||||
[
|
||||
c await
|
||||
l f [
|
||||
4 v push
|
||||
1000 sleep
|
||||
5 v push
|
||||
] with-write-lock
|
||||
c'' count-down
|
||||
] "R/W lock test 2" spawn drop
|
||||
|
||||
[
|
||||
c await
|
||||
l f [
|
||||
2 v push
|
||||
c' count-down
|
||||
] with-read-lock
|
||||
c'' count-down
|
||||
] "R/W lock test 4" spawn drop
|
||||
|
||||
[
|
||||
c' await
|
||||
l f [
|
||||
6 v push
|
||||
] with-write-lock
|
||||
c'' count-down
|
||||
] "R/W lock test 5" spawn drop
|
||||
|
||||
c'' await
|
||||
v
|
||||
] ;
|
||||
|
||||
[ V{ 1 2 3 4 5 6 } ] [ rw-lock-test-1 ] unit-test
|
||||
|
||||
:: rw-lock-test-2 | |
|
||||
[let | l [ <rw-lock> ]
|
||||
c [ 1 <count-down> ]
|
||||
c' [ 2 <count-down> ]
|
||||
v [ V{ } clone ] |
|
||||
|
||||
[
|
||||
l f [
|
||||
1 v push
|
||||
c count-down
|
||||
1000 sleep
|
||||
2 v push
|
||||
] with-write-lock
|
||||
c' count-down
|
||||
] "R/W lock test 1" spawn drop
|
||||
|
||||
[
|
||||
c await
|
||||
l f [
|
||||
3 v push
|
||||
] with-read-lock
|
||||
c' count-down
|
||||
] "R/W lock test 2" spawn drop
|
||||
|
||||
c' await
|
||||
v
|
||||
] ;
|
||||
|
||||
[ V{ 1 2 3 } ] [ rw-lock-test-2 ] unit-test
|
|
@ -5,9 +5,13 @@ concurrency.conditions ;
|
|||
IN: concurrency.locks
|
||||
|
||||
! Simple critical sections
|
||||
TUPLE: lock threads owner ;
|
||||
TUPLE: lock threads owner reentrant? ;
|
||||
|
||||
: <lock> <dlist> lock construct-boa ;
|
||||
: <lock> ( -- lock )
|
||||
<dlist> f f lock construct-boa ;
|
||||
|
||||
: <reentrant-lock> ( -- lock )
|
||||
<dlist> f t lock construct-boa ;
|
||||
|
||||
<PRIVATE
|
||||
|
||||
|
@ -21,16 +25,24 @@ TUPLE: lock threads owner ;
|
|||
lock-threads notify-1 ;
|
||||
|
||||
: do-lock ( lock timeout quot acquire release -- )
|
||||
>r >r pick r> call over r> curry [ ] cleanup ; inline
|
||||
>r swap compose pick >r 2curry r> r> curry [ ] cleanup ;
|
||||
inline
|
||||
|
||||
: (with-lock) ( lock timeout quot -- )
|
||||
[ acquire-lock ] [ release-lock ] do-lock ; inline
|
||||
|
||||
PRIVATE>
|
||||
|
||||
: with-lock ( lock timeout quot -- )
|
||||
[ acquire-lock ] [ release-lock ] do-lock ; inline
|
||||
|
||||
: with-reentrant-lock ( lock timeout quot -- )
|
||||
over lock-owner self eq?
|
||||
[ nip call ] [ with-lock ] if ; inline
|
||||
pick lock-reentrant? [
|
||||
pick lock-owner self eq? [
|
||||
2nip call
|
||||
] [
|
||||
(with-lock)
|
||||
] if
|
||||
] [
|
||||
(with-lock)
|
||||
] if ; inline
|
||||
|
||||
! Many-reader/single-writer locks
|
||||
TUPLE: rw-lock readers writers reader# writer ;
|
||||
|
@ -40,8 +52,8 @@ TUPLE: rw-lock readers writers reader# writer ;
|
|||
|
||||
<PRIVATE
|
||||
|
||||
: acquire-read-lock ( timeout lock -- )
|
||||
dup rw-lock-writer
|
||||
: acquire-read-lock ( lock timeout -- )
|
||||
over rw-lock-writer
|
||||
[ 2dup >r rw-lock-readers r> wait ] when drop
|
||||
dup rw-lock-reader# 1+ swap set-rw-lock-reader# ;
|
||||
|
||||
|
@ -52,8 +64,8 @@ TUPLE: rw-lock readers writers reader# writer ;
|
|||
dup rw-lock-reader# 1- dup pick set-rw-lock-reader#
|
||||
zero? [ notify-writer ] [ drop ] if ;
|
||||
|
||||
: acquire-write-lock ( lock -- )
|
||||
dup rw-lock-writer over rw-lock-reader# 0 > or
|
||||
: acquire-write-lock ( lock timeout -- )
|
||||
over rw-lock-writer pick rw-lock-reader# 0 > or
|
||||
[ 2dup >r rw-lock-writers r> wait ] when drop
|
||||
self swap set-rw-lock-writer ;
|
||||
|
||||
|
@ -62,7 +74,7 @@ TUPLE: rw-lock readers writers reader# writer ;
|
|||
dup rw-lock-readers dlist-empty?
|
||||
[ notify-writer ] [ rw-lock-readers notify-all ] if ;
|
||||
|
||||
: do-recursive-rw-lock ( lock timeout quot quot' -- )
|
||||
: do-reentrant-rw-lock ( lock timeout quot quot' -- )
|
||||
>r pick rw-lock-writer self eq? [ 2nip call ] r> if ; inline
|
||||
|
||||
PRIVATE>
|
||||
|
@ -70,9 +82,9 @@ PRIVATE>
|
|||
: with-read-lock ( lock timeout quot -- )
|
||||
[
|
||||
[ acquire-read-lock ] [ release-read-lock ] do-lock
|
||||
] do-recursive-rw-lock ; inline
|
||||
] do-reentrant-rw-lock ; inline
|
||||
|
||||
: with-write-lock ( lock timeout quot -- )
|
||||
[
|
||||
[ acquire-write-lock ] [ release-write-lock ] do-lock
|
||||
] do-recursive-rw-lock ; inline
|
||||
] do-reentrant-rw-lock ; inline
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
! Copyright (C) 2006 Chris Double.
|
||||
! See http://factorcode.org/license.txt for BSD license.
|
||||
USING: help.syntax help.markup concurrency.messaging.private
|
||||
threads ;
|
||||
threads kernel arrays quotations ;
|
||||
IN: concurrency.messaging
|
||||
|
||||
HELP: <mailbox>
|
||||
{ $values { "mailbox" mailbox }
|
||||
}
|
||||
{ $description "A mailbox is an object that can be used for safe thread communication. Items can be put in the mailbox and retrieved in a FIFO order. If the mailbox is empty when a get operation is performed then the thread will block until another thread places something in the mailbox. If multiple threads are waiting on the same mailbox, only one of the waiting threads will be unblocked to process the get operation." }
|
||||
{ $description "A mailbox is an object that can be used for safe thread communication. Items can be put in the mailbox and retrieved in a FIFO order. If the mailbox is empty when a get operation is performed then the thread will block until another thread places something in the mailbox. If multiple threads are waiting on the same mailbox, only one of the waiting threads will be unblocked to thread the get operation." }
|
||||
{ $see-also mailbox-empty? mailbox-put mailbox-get mailbox-get-all while-mailbox-empty mailbox-get? } ;
|
||||
|
||||
HELP: mailbox-empty?
|
||||
|
@ -18,7 +18,7 @@ HELP: mailbox-empty?
|
|||
{ $see-also <mailbox> mailbox-put mailbox-get mailbox-get-all while-mailbox-empty mailbox-get? } ;
|
||||
|
||||
HELP: mailbox-put
|
||||
{ $values { "obj" "an object" }
|
||||
{ $values { "obj" object }
|
||||
{ "mailbox" mailbox }
|
||||
}
|
||||
{ $description "Put the object into the mailbox. Any threads that have a blocking get on the mailbox are resumed. Only one of those threads will successfully get the object, the rest will immediately block waiting for the next item in the mailbox." }
|
||||
|
@ -27,29 +27,28 @@ HELP: mailbox-put
|
|||
HELP: block-unless-pred
|
||||
{ $values { "pred" "a quotation with stack effect " { $snippet "( X -- bool )" } }
|
||||
{ "mailbox" mailbox }
|
||||
{ "timeout" "a timeout in milliseconds" }
|
||||
{ "timeout" "a timeout in milliseconds, or " { $link f } }
|
||||
}
|
||||
{ $description "Block the thread if there are no items in the mailbox that return true when the predicate is called with the item on the stack." }
|
||||
{ $see-also <mailbox> mailbox-empty? mailbox-put mailbox-get mailbox-get-all while-mailbox-empty mailbox-get? } ;
|
||||
|
||||
HELP: block-if-empty
|
||||
{ $values { "mailbox" mailbox }
|
||||
{ "mailbox2" "same object as 'mailbox'" }
|
||||
{ "timeout" "a timeout in milliseconds" }
|
||||
{ "timeout" "a timeout in milliseconds, or " { $link f } }
|
||||
}
|
||||
{ $description "Block the thread if the mailbox is empty." }
|
||||
{ $see-also <mailbox> mailbox-empty? mailbox-put mailbox-get mailbox-get-all while-mailbox-empty mailbox-get? } ;
|
||||
|
||||
HELP: mailbox-get
|
||||
{ $values { "mailbox" mailbox }
|
||||
{ "obj" "an object" }
|
||||
{ "obj" object }
|
||||
}
|
||||
{ $description "Get the first item put into the mailbox. If it is empty the thread blocks until an item is put into it. The thread then resumes, leaving the item on the stack." }
|
||||
{ $see-also <mailbox> mailbox-empty? mailbox-put while-mailbox-empty mailbox-get-all mailbox-get? } ;
|
||||
|
||||
HELP: mailbox-get-all
|
||||
{ $values { "mailbox" mailbox }
|
||||
{ "array" "an array" }
|
||||
{ "array" array }
|
||||
}
|
||||
{ $description "Blocks the thread if the mailbox is empty, otherwise removes all objects in the mailbox and returns an array containing the objects." }
|
||||
{ $see-also <mailbox> mailbox-empty? mailbox-put while-mailbox-empty mailbox-get-all mailbox-get? } ;
|
||||
|
@ -64,67 +63,93 @@ HELP: while-mailbox-empty
|
|||
HELP: mailbox-get?
|
||||
{ $values { "pred" "a quotation with stack effect " { $snippet "( X -- bool )" } }
|
||||
{ "mailbox" mailbox }
|
||||
{ "obj" "an object" }
|
||||
{ "obj" object }
|
||||
}
|
||||
{ $description "Get the first item in the mailbox which satisfies the predicate. 'pred' will be called repeatedly for each item in the mailbox. When 'pred' returns true that item will be returned. If nothing in the mailbox satisfies the predicate then the thread will block until something does." }
|
||||
{ $see-also <mailbox> mailbox-empty? mailbox-put mailbox-get mailbox-get-all while-mailbox-empty } ;
|
||||
|
||||
HELP: send
|
||||
{ $values { "message" "an object" }
|
||||
{ "process" "a process object" }
|
||||
{ $values { "message" object }
|
||||
{ "thread" "a thread object" }
|
||||
}
|
||||
{ $description "Send the message to the process by placing it in the processes mailbox. This is an asynchronous operation and will return immediately. The receving process will act on the message the next time it retrieves that item from its mailbox (usually using the " { $link receive } " word. The message can be any Factor object. For destinations that are instances of remote-process the message must be a serializable Factor type." }
|
||||
{ $description "Send the message to the thread by placing it in the threades mailbox. This is an asynchronous operation and will return immediately. The receving thread will act on the message the next time it retrieves that item from its mailbox (usually using the " { $link receive } " word. The message can be any Factor object. For destinations that are instances of remote-thread the message must be a serializable Factor type." }
|
||||
{ $see-also receive receive-if } ;
|
||||
|
||||
HELP: receive
|
||||
{ $values { "message" "an object" }
|
||||
{ $values { "message" object }
|
||||
}
|
||||
{ $description "Return a message from the current processes mailbox. If the box is empty, suspend the process until another process places an item in the mailbox (usually via the " { $link send } " word." }
|
||||
{ $description "Return a message from the current threades mailbox. If the box is empty, suspend the thread until another thread places an item in the mailbox (usually via the " { $link send } " word." }
|
||||
{ $see-also send receive-if } ;
|
||||
|
||||
HELP: receive-if
|
||||
{ $values { "pred" "a predicate with stack effect " { $snippet "( X -- bool )" } }
|
||||
{ "message" "an object" }
|
||||
{ $values { "pred" "a predicate with stack effect " { $snippet "( obj -- ? )" } }
|
||||
{ "message" object }
|
||||
}
|
||||
{ $description "Return the first message from the current processes mailbox that satisfies the predicate. To satisfy the predicate, 'pred' is called with the item on the stack and the predicate should leave a boolean indicating whether it was satisfied or not. If nothing in the mailbox satisfies the predicate then the process will block until something does." }
|
||||
{ $description "Return the first message from the current threades mailbox that satisfies the predicate. To satisfy the predicate, " { $snippet "pred" } " is called with the item on the stack and the predicate should leave a boolean indicating whether it was satisfied or not. If nothing in the mailbox satisfies the predicate then the thread will block until something does." }
|
||||
{ $see-also send receive } ;
|
||||
|
||||
HELP: spawn-linked
|
||||
{ $values { "quot" "a predicate with stack effect " { $snippet "( -- )" } }
|
||||
{ "process" "a process object" }
|
||||
{ $values { "quot" quotation }
|
||||
{ "thread" "a thread object" }
|
||||
}
|
||||
{ $description "Start a process which runs the given quotation. If that quotation throws an error which is not caught then the error will get propagated to the process that spawned it. This can be used to set up 'supervisor' processes that restart child processes that crash due to uncaught errors.\n" }
|
||||
{ $description "Start a thread which runs the given quotation. If that quotation throws an error which is not caught then the error will get propagated to the thread that spawned it. This can be used to set up 'supervisor' threades that restart child threades that crash due to uncaught errors.\n" }
|
||||
{ $see-also spawn } ;
|
||||
|
||||
ARTICLE: { "concurrency" "processes" } "Processes"
|
||||
"A process is basically a thread with a message queue. Other processes can place items on this queue by sending the process a message. A process can check its queue for messages, blocking if none are pending, and process them as they are queued.\n\nFactor processes are very lightweight. Each process can take as little as 900 bytes of memory. This library has been tested running hundreds of thousands of simple processes.\n\nThe messages that are sent from process to process are any Factor value. Factor tuples are ideal for this sort of thing as you can send a tuple to a process and the predicate dispatch mechanism can be used to perform actions depending on what the type of the tuple is.\n\nProcesses are usually created using " { $link spawn } ". This word takes a quotation on the stack and starts a process that will execute that quotation asynchronously. When the quotation completes the process will die. 'spawn' leaves on the stack the process object that was started. This object can be used to send messages to the process using " { $link send } ".\n\n'send' will return immediately after placing the message in the target processes message queue.\n\nA process can get a message from its queue using " { $link receive } ". This will get the most recent message and leave it on the stack. If there are no messages in the queue the process will 'block' until a message is available. When a process is blocked it takes no CPU time at all."
|
||||
{ $code "[ receive print ] spawn\n\"Hello Process!\" swap send" }
|
||||
"This example spawns a process that first blocks, waiting to receive a message. When a message is received, the 'receive' call returns leaving it on the stack. It then prints the message and exits. 'spawn' left the process on the stack so it's available to send the 'Hello Process!' message to it. Immediately after the 'send' you should see 'Hello Process!' printed on the console.\n\nIt is also possible to selectively retrieve messages from the message queue. " { $link receive-if } " takes a predicate quotation on the stack and returns the first message in the queue that satisfies the predicate. If no items satisfy the predicate then the process is blocked until a message is received that does."
|
||||
{ $code ": odd? ( n -- ? ) 2 mod 1 = ;\n1 self send 2 self send 3 self send\n\nreceive .\n => 1\n\n[ odd? ] receive-if .\n => 3\n\nreceive .\n => 2" } ;
|
||||
ARTICLE: { "concurrency" "mailboxes" } "Mailboxes"
|
||||
"Each thread has an associated message queue. Other threads can place items on this queue by sending the thread a message. A thread can check its queue for messages, blocking if none are pending, and thread them as they are queued."
|
||||
$nl
|
||||
"The messages that are sent from thread to thread are any Factor value. Factor tuples are ideal for this sort of thing as you can send a tuple to a thread and the generic word dispatch mechanism can be used to perform actions depending on what the type of the tuple is."
|
||||
$nl
|
||||
"The " { $link spawn } " word pushes the newly-created thread on the calling thread's stack; this thread object can then be sent messages:"
|
||||
{ $subsection send }
|
||||
"A thread can get a message from its queue:"
|
||||
{ $subsection receive }
|
||||
{ $subsection receive }
|
||||
{ $subsection receive-if }
|
||||
"Mailboxes can be created and used directly:"
|
||||
{ $subsection mailbox }
|
||||
{ $subsection <mailbox> }
|
||||
{ $subsection mailbox-get }
|
||||
{ $subsection mailbox-put }
|
||||
{ $subsection mailbox-empty? } ;
|
||||
|
||||
ARTICLE: { "concurrency" "self" } "Self"
|
||||
"A process can get access to its own process object using " { $link self } " so it can pass it to other processes. This allows the other processes to send messages back. A simple example of using this gets the current process' 'self' and spawns a process which sends a message to it. We then receive the message from the original process:"
|
||||
{ $code "self [ \"Hello!\" swap send ] spawn 2drop receive .\n => \"Hello!\"" } ;
|
||||
ARTICLE: { "concurrency" "synchronous-sends" } "Synchronous sends"
|
||||
"The " { $link send } " word sends a message asynchronously, and the sending thread continues immediately. It is also possible to send a message to a thread and block until a response is received:"
|
||||
{ $subsection send-synchronous }
|
||||
"To reply to a synchronous message:"
|
||||
{ $subsection reply-synchronous }
|
||||
"An example:"
|
||||
{ $example
|
||||
"USING: concurrency.messaging kernel threads ;"
|
||||
": pong-server ( -- )"
|
||||
" receive >r \"pong\" r> reply-synchronous ;"
|
||||
"[ pong-server t ] spawn-server"
|
||||
"\"ping\" swap send-synchronous ."
|
||||
"\"pong\""
|
||||
} ;
|
||||
|
||||
ARTICLE: { "concurrency" "synchronous-sends" } "Synchronous Sends"
|
||||
{ $link send } " sends a message asynchronously, and the sending process continues immediately. The 'pong server' example shown previously all sent messages to the server and waited for a reply back from the server. This pattern of synchronous sending is made easier with " { $link send-synchronous } ".\n\nThis word will send a message to the given process and immediately block until a reply is received for this particular message send. It leaves the reply on the stack. Note that it doesn't wait for just any reply, it waits for a reply specifically to this send.\n\nTo do this it wraps the requested message inside a tagged message format using " { $link <synchronous> } ":"
|
||||
{ $code "\"My Message\" <synchronous> .\n => T{ synchronous f \"My Message\" ...from... ...tag... }" }
|
||||
"The message is wrapped in array where the first item is the sending process object, the second is a unique tag, and the third is the original message. Server processes can use the 'from' to reply to the process that originally sent the message. The tag is used in the receiving server to include the value in the reply. After the send-synchronous call the current process will block waiting for a reply that has the exact same tag. In this way you can be sure that the reply you got was for the specific message sent. Here is the pong-server recoded to use 'send-synchronous':"
|
||||
{ $code ": pong-server ( -- )\n receive {\n { { ?from ?tag \"ping\" } [ ?tag \"pong\" 2array ?from send pong-server ] }\n { { ?from _ } [ ?tag \"server shutdown\" 2array ?from send ] }\n } match-cond ;\n\n[ pong-server ] spawn \"ping\" swap send-synchronous .\n => \"pong\"" }
|
||||
"Notice that the code to send the reply back to the original caller wraps the reply in an array where the first item is the tag originally sent. 'send-synchronous' only returns if it receives a reply containing that specific tag." ;
|
||||
|
||||
ARTICLE: { "concurrency" "exceptions" } "Exceptions"
|
||||
"A process can handle exceptions using the standard Factor exception handling mechanism. If an exception is uncaught the process will terminate. For example:"
|
||||
ARTICLE: { "concurrency" "exceptions" } "Linked exceptions"
|
||||
"A thread can handle exceptions using the standard Factor exception handling mechanism. If an exception is uncaught the thread will terminate. For example:"
|
||||
{ $code "[ 1 0 / \"This will not print\" print ] spawn" }
|
||||
"Processes can be linked so that a parent process can receive the exception that caused the child process to terminate. In this way 'supervisor' processes can be created that are notified when child processes terminate and possibly restart them.\n\nThe easiest way to form this link is using " { $link spawn-linked } ". This will create a unidirectional link, such that if an uncaught exception causes the child to terminate, the parent process can catch it:"
|
||||
{ $code "[\n [ 1 0 / \"This will not print\" print ] spawn-link drop\n receive\n] [ \"Exception caught.\" print ] recover" }
|
||||
"Processes can be linked so that a parent thread can receive the exception that caused the child thread to terminate. In this way 'supervisor' threades can be created that are notified when child threades terminate and possibly restart them."
|
||||
{ $subsection spawn-linked }
|
||||
"A more flexible version of the above deposits the error in an arbitary mailbox:"
|
||||
{ $subsection spawn-linked-to }
|
||||
"This will create a unidirectional link, such that if an uncaught exception causes the child to terminate, the parent thread can catch it:"
|
||||
{ $code "["
|
||||
" [ 1 0 / \"This will not print\" print ] spawn-linked drop"
|
||||
" receive"
|
||||
"] [ \"Exception caught.\" print ] recover" }
|
||||
"Exceptions are only raised in the parent when the parent does a " { $link receive } " or " { $link receive-if } ". This is because the exception is sent from the child to the parent as a message." ;
|
||||
|
||||
ARTICLE: { "concurrency" "concurrency" } "Concurrency"
|
||||
"The concurrency library is based upon the style of concurrency used in systems like Erlang and Termite. It is built on top of the standard Factor lightweight thread system.\nA concurrency oriented program is one in which multiple processes run simultaneously in a single Factor image or across multiple running Factor instances. The processes can communicate with each other by asynchronous message sends. Although processes can share data via Factor's mutable data structures it is not recommended as the use of shared state concurrency is often a cause of problems."
|
||||
{ $subsection { "concurrency" "processes" } }
|
||||
{ $subsection { "concurrency" "self" } }
|
||||
ARTICLE: "concurrency.messaging" "Message-passing concurrency"
|
||||
"The " { $vocab-link "concurrency.messaging" } " vocabulary is based upon the style of concurrency used in systems like Erlang and Termite. It is built on top of the standard Factor lightweight thread system."
|
||||
$nl
|
||||
"A concurrency oriented program is one in which multiple threades run simultaneously in a single Factor image or across multiple running Factor instances. The threades can communicate with each other by asynchronous message sends."
|
||||
$nl
|
||||
"Although threades can share data via Factor's mutable data structures it is not recommended to mix shared state with message passing as it can lead to confusing code."
|
||||
{ $subsection { "concurrency" "mailboxes" } }
|
||||
{ $subsection { "concurrency" "synchronous-sends" } }
|
||||
{ $subsection { "concurrency" "exceptions" } } ;
|
||||
|
||||
ABOUT: { "concurrency" "concurrency" }
|
||||
ABOUT: "concurrency.messaging"
|
||||
|
|
|
@ -3,17 +3,17 @@
|
|||
!
|
||||
USING: kernel threads vectors arrays sequences
|
||||
namespaces tools.test continuations dlists strings math words
|
||||
match quotations concurrency.private ;
|
||||
match quotations concurrency.messaging ;
|
||||
IN: temporary
|
||||
|
||||
[ ] [ self process-mailbox mailbox-data dlist-delete-all ] unit-test
|
||||
[ ] [ mailbox mailbox-data dlist-delete-all ] unit-test
|
||||
|
||||
[ V{ 1 2 3 } ] [
|
||||
0 <vector>
|
||||
make-mailbox
|
||||
2dup [ mailbox-get swap push ] 2curry in-thread
|
||||
2dup [ mailbox-get swap push ] 2curry in-thread
|
||||
2dup [ mailbox-get swap push ] 2curry in-thread
|
||||
<mailbox>
|
||||
[ mailbox-get swap push ] in-thread
|
||||
[ mailbox-get swap push ] in-thread
|
||||
[ mailbox-get swap push ] in-thread
|
||||
1 over mailbox-put
|
||||
2 over mailbox-put
|
||||
3 swap mailbox-put
|
||||
|
@ -21,10 +21,10 @@ IN: temporary
|
|||
|
||||
[ V{ 1 2 3 } ] [
|
||||
0 <vector>
|
||||
make-mailbox
|
||||
2dup [ [ integer? ] swap mailbox-get? swap push ] 2curry in-thread
|
||||
2dup [ [ integer? ] swap mailbox-get? swap push ] 2curry in-thread
|
||||
2dup [ [ integer? ] swap mailbox-get? swap push ] 2curry in-thread
|
||||
<mailbox>
|
||||
[ [ integer? ] swap mailbox-get? swap push ] in-thread
|
||||
[ [ integer? ] swap mailbox-get? swap push ] in-thread
|
||||
[ [ integer? ] swap mailbox-get? swap push ] in-thread
|
||||
1 over mailbox-put
|
||||
2 over mailbox-put
|
||||
3 swap mailbox-put
|
||||
|
@ -32,11 +32,11 @@ IN: temporary
|
|||
|
||||
[ V{ 1 "junk" 3 "junk2" } [ 456 ] ] [
|
||||
0 <vector>
|
||||
make-mailbox
|
||||
2dup [ [ integer? ] swap mailbox-get? swap push ] 2curry in-thread
|
||||
2dup [ [ integer? ] swap mailbox-get? swap push ] 2curry in-thread
|
||||
2dup [ [ string? ] swap mailbox-get? swap push ] 2curry in-thread
|
||||
2dup [ [ string? ] swap mailbox-get? swap push ] 2curry in-thread
|
||||
<mailbox>
|
||||
[ [ integer? ] swap mailbox-get? swap push ] in-thread
|
||||
[ [ integer? ] swap mailbox-get? swap push ] in-thread
|
||||
[ [ string? ] swap mailbox-get? swap push ] in-thread
|
||||
[ [ string? ] swap mailbox-get? swap push ] in-thread
|
||||
1 over mailbox-put
|
||||
"junk" over mailbox-put
|
||||
[ 456 ] over mailbox-put
|
||||
|
@ -45,17 +45,11 @@ IN: temporary
|
|||
mailbox-get
|
||||
] unit-test
|
||||
|
||||
[ "test" ] [
|
||||
[ self ] "test" with-process
|
||||
] unit-test
|
||||
|
||||
|
||||
[ "received" ] [
|
||||
[
|
||||
receive {
|
||||
{ { ?from ?tag _ } [ ?tag "received" 2array ?from send ] }
|
||||
} match-cond
|
||||
] spawn
|
||||
receive "received" swap reply-synchronous
|
||||
] "Synchronous test" spawn
|
||||
"sent" swap send-synchronous
|
||||
] unit-test
|
||||
|
||||
|
@ -68,74 +62,29 @@ IN: temporary
|
|||
receive
|
||||
] unit-test
|
||||
|
||||
|
||||
[
|
||||
[
|
||||
"crash" throw
|
||||
] spawn-link drop
|
||||
] "Linked test" spawn-linked drop
|
||||
receive
|
||||
] [ "crash" = ] must-fail-with
|
||||
] [ linked-error "crash" = ] must-fail-with
|
||||
|
||||
[ 50 ] [
|
||||
[ 50 ] future ?future
|
||||
] unit-test
|
||||
|
||||
[ V{ 50 50 50 } ] [
|
||||
0 <vector>
|
||||
<promise>
|
||||
2dup [ ?promise swap push ] 2curry spawn drop
|
||||
2dup [ ?promise swap push ] 2curry spawn drop
|
||||
2dup [ ?promise swap push ] 2curry spawn drop
|
||||
50 swap fulfill
|
||||
] unit-test
|
||||
|
||||
MATCH-VARS: ?value ;
|
||||
MATCH-VARS: ?from ?to ?value ;
|
||||
SYMBOL: increment
|
||||
SYMBOL: decrement
|
||||
SYMBOL: value
|
||||
|
||||
: counter ( value -- )
|
||||
: counter ( value -- value )
|
||||
receive {
|
||||
{ { increment ?value } [ ?value + counter ] }
|
||||
{ { decrement ?value } [ ?value - counter ] }
|
||||
{ { value ?from } [ dup ?from send counter ] }
|
||||
{ { increment ?value } [ ?value + ] }
|
||||
{ { decrement ?value } [ ?value - ] }
|
||||
{ { value ?from } [ dup ?from send ] }
|
||||
} match-cond ;
|
||||
|
||||
[ -5 ] [
|
||||
[ 0 counter ] spawn
|
||||
[ 0 [ t ] [ counter ] [ ] while ] "Counter" spawn
|
||||
{ increment 10 } over send
|
||||
{ decrement 15 } over send
|
||||
[ value , self , ] { } make swap send
|
||||
receive
|
||||
] unit-test
|
||||
|
||||
! The following unit test blocks forever if the
|
||||
! exception does not propogate. Uncomment when
|
||||
! this is fixed (via a timeout).
|
||||
[
|
||||
[ "this should propogate" throw ] future ?future
|
||||
] must-fail
|
||||
|
||||
[ ] [
|
||||
[ "this should not propogate" throw ] future drop
|
||||
] unit-test
|
||||
|
||||
[ f ] [
|
||||
[ 1 drop ] spawn 100 sleep process-pid get-process
|
||||
] unit-test
|
||||
|
||||
[ f ] [
|
||||
[ "testing unregistering on error" throw ] spawn
|
||||
100 sleep process-pid get-process
|
||||
] unit-test
|
||||
|
||||
! Race condition with futures
|
||||
[ 3 3 ] [
|
||||
[ 3 ] future
|
||||
dup ?future swap ?future
|
||||
] unit-test
|
||||
|
||||
! Another race
|
||||
[ 3 ] [
|
||||
[ 3 yield ] future ?future
|
||||
] unit-test
|
|
@ -26,12 +26,12 @@ TUPLE: mailbox threads data ;
|
|||
2over mailbox-data dlist-contains? [
|
||||
3drop
|
||||
] [
|
||||
2dup mailbox-threads wait block-unless-pred
|
||||
2dup >r mailbox-threads r> wait block-unless-pred
|
||||
] if ; inline
|
||||
|
||||
: block-if-empty ( mailbox timeout -- mailbox )
|
||||
over mailbox-empty? [
|
||||
2dup mailbox-threads wait block-if-empty
|
||||
2dup >r mailbox-threads r> wait block-if-empty
|
||||
] [
|
||||
drop
|
||||
] if ;
|
||||
|
@ -39,10 +39,10 @@ TUPLE: mailbox threads data ;
|
|||
PRIVATE>
|
||||
|
||||
: mailbox-peek ( mailbox -- obj )
|
||||
mailbox-data peek-front ;
|
||||
mailbox-data peek-back ;
|
||||
|
||||
: mailbox-get-timeout ( mailbox timeout -- obj )
|
||||
block-if-empty mailbox-data pop-front ;
|
||||
block-if-empty mailbox-data pop-back ;
|
||||
|
||||
: mailbox-get ( mailbox -- obj )
|
||||
f mailbox-get-timeout ;
|
||||
|
@ -68,13 +68,13 @@ PRIVATE>
|
|||
mailbox-data delete-node-if ; inline
|
||||
|
||||
: mailbox-get? ( pred mailbox -- obj )
|
||||
f mailbox-timeout-get? ;
|
||||
f mailbox-timeout-get? ; inline
|
||||
|
||||
TUPLE: linked error thread ;
|
||||
|
||||
: <linked> self linked construct-boa ;
|
||||
C: <linked> linked
|
||||
|
||||
GENERIC: send ( message thread -- )
|
||||
GENERIC: send ( message process -- )
|
||||
|
||||
: mailbox-of ( thread -- mailbox )
|
||||
dup thread-mailbox [ ] [
|
||||
|
@ -94,7 +94,7 @@ M: thread send ( message thread -- )
|
|||
: receive-if ( pred -- message )
|
||||
mailbox mailbox-get? ?linked ; inline
|
||||
|
||||
: rethrow-linked ( error supervisor -- )
|
||||
: rethrow-linked ( error process supervisor -- )
|
||||
>r <linked> r> send ;
|
||||
|
||||
: spawn-linked-to ( quot name mailbox -- thread )
|
||||
|
@ -115,9 +115,13 @@ TUPLE: reply data tag ;
|
|||
synchronous-tag \ reply construct-boa ;
|
||||
|
||||
: send-synchronous ( message thread -- reply )
|
||||
>r <synchronous> dup r> send
|
||||
[ over reply? [ reply-tag = ] [ 2drop f ] if ] curry
|
||||
receive-if reply-data ;
|
||||
>r <synchronous> dup r> send [
|
||||
over reply? [
|
||||
>r reply-tag r> synchronous-tag =
|
||||
] [
|
||||
2drop f
|
||||
] if
|
||||
] curry receive-if reply-data ;
|
||||
|
||||
: reply-synchronous ( message synchronous -- )
|
||||
[ <reply> ] keep synchronous-sender send ;
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
|
||||
! See http://factorcode.org/license.txt for BSD license.
|
||||
USING: concurrency.messaging kernel arrays
|
||||
continuations help.markup help.syntax quotations ;
|
||||
IN: concurrency.promises
|
||||
|
||||
HELP: promise
|
||||
{ $class-description "The class of write-once promises." } ;
|
||||
|
||||
HELP: promise-fulfilled?
|
||||
{ $values { "promise" promise } { "?" "a boolean" } }
|
||||
{ $description "Tests if " { $link fulfill } " has previously been called on the promise, in which case " { $link ?promise } " will return immediately without blocking." } ;
|
||||
|
||||
HELP: ?promise-timeout
|
||||
{ $values { "promise" promise } { "timeout" "a timeout in milliseconds or " { $link f } } { "value" object } }
|
||||
{ $description "Waits for another thread to fulfill a promise, returning immediately if the promise has already been fulfilled. A timeout of " { $link f } " indicates that the thread may block indefinitely, otherwise it will wait up to " { $snippet "timeout" } " milliseconds." }
|
||||
{ $errors "Throws an error if the timeout expires before the promise has been fulfilled." } ;
|
||||
|
||||
HELP: ?promise
|
||||
{ $values { "promise" promise } { "value" object } }
|
||||
{ $description "Waits for another thread to fulfill a promise, returning immediately if the promise has already been fulfilled." } ;
|
||||
|
||||
HELP: fulfill
|
||||
{ $values { "value" object } { "promise" promise } }
|
||||
{ $description "Fulfills a promise by writing a value to it. Any threads waiting for the value are notified." }
|
||||
{ $errors "Throws an error if the promise has already been fulfilled." } ;
|
||||
|
||||
ARTICLE: "concurrency.promises" "Promises"
|
||||
"The " { $vocab-link "concurrency.promises" } " vocabulary implements " { $emphasis "promises" } ", which are thread-safe write-once variables. Once a promise is created, threads may block waiting for it to be " { $emphasis "fulfilled" } "; at some point in the future, another thread may provide a value at which point all waiting threads are notified."
|
||||
{ $subsection promise }
|
||||
{ $subsection <promise> }
|
||||
{ $subsection fulfill }
|
||||
{ $subsection ?promise }
|
||||
{ $subsection ?promise-timeout } ;
|
||||
|
||||
ABOUT: "concurrency.promises"
|
|
@ -0,0 +1,12 @@
|
|||
IN: temporary
|
||||
USING: vectors concurrency.promises kernel threads sequences
|
||||
tools.test ;
|
||||
|
||||
[ V{ 50 50 50 } ] [
|
||||
0 <vector>
|
||||
<promise>
|
||||
[ ?promise swap push ] in-thread
|
||||
[ ?promise swap push ] in-thread
|
||||
[ ?promise swap push ] in-thread
|
||||
50 swap fulfill
|
||||
] unit-test
|
|
@ -0,0 +1,45 @@
|
|||
IN: concurrency.semaphores
|
||||
USING: help.markup help.syntax kernel quotations ;
|
||||
|
||||
HELP: semaphore
|
||||
{ $class-description "The class of counting semaphores." } ;
|
||||
|
||||
HELP: <semaphore>
|
||||
{ $values { "n" "a non-negative integer" } { "semaphore" semaphore } }
|
||||
{ $description "Creates a counting semaphore with the specified initial count." } ;
|
||||
|
||||
HELP: acquire
|
||||
{ $values { "semaphore" semaphore } { "timeout" "a timeout in milliseconds or " { $link f } } { "value" object } }
|
||||
{ $description "If the semaphore has a non-zero count, decrements it and returns immediately. Otherwise, if the timeout is " { $link f } ", waits indefinitely for the semaphore to be released. If the timeout is not " { $link f } ", waits up to that number of milliseconds for the semaphore to be released." } ;
|
||||
|
||||
HELP: release
|
||||
{ $values { "semaphore" semaphore } }
|
||||
{ $description "Increments a semaphore's count. If the count was previously zero, any threads waiting on the semaphore are woken up." } ;
|
||||
|
||||
HELP: with-semaphore
|
||||
{ $values { "semaphore" semaphore } { "quot" quotation } }
|
||||
{ $description "Calls the quotation with the semaphore held." } ;
|
||||
|
||||
ARTICLE: "concurrency.semaphores" "Counting semaphores"
|
||||
"Counting semaphores are used to ensure that no more than a fixed number of threads are executing in a critical section at a time; as such, they generalize " { $link "concurrency.locks.mutex" } ", since locks can be thought of as semaphores with an initial count of 1."
|
||||
$nl
|
||||
"A use-case would be a batch processing server which runs a large number of jobs which perform calculations but then need to fire off expensive external processes or perform heavy network I/O. While for most of the time, the threads can all run in parallel, it might be desired that the expensive operation is not run by more than 10 threads at once, to avoid thrashing swap space or saturating the network. This can be accomplished with a counting semaphore:"
|
||||
{ $code
|
||||
"SYMBOL: expensive-section"
|
||||
"10 <semaphore> expensive-section set-global"
|
||||
"requests ["
|
||||
" ..."
|
||||
" expensive-section [ do-expensive-stuff ] with-semaphore"
|
||||
" ..."
|
||||
"] parallel-map"
|
||||
}
|
||||
"Creating semaphores:"
|
||||
{ $subsection semaphore }
|
||||
{ $subsection <semaphore> }
|
||||
"Unlike locks, where acquisition and release are always paired by a combinator, semaphores expose these operations directly and there is no requirement that they be performed in the same thread:"
|
||||
{ $subsection acquire }
|
||||
{ $subsection release }
|
||||
"A combinator which pairs acquisition and release:"
|
||||
{ $subsection with-semaphore } ;
|
||||
|
||||
ABOUT: "concurrency.semaphores"
|
|
@ -1,23 +1,29 @@
|
|||
! Copyright (C) 2008 Slava Pestov.
|
||||
! See http://factorcode.org/license.txt for BSD license.
|
||||
USING: dlists kernel threads math ;
|
||||
USING: dlists kernel threads math concurrency.conditions
|
||||
continuations ;
|
||||
IN: concurrency.semaphores
|
||||
|
||||
TUPLE: semaphore count threads ;
|
||||
|
||||
: <semaphore> ( -- semaphore )
|
||||
: <semaphore> ( n -- semaphore )
|
||||
dup 0 < [ "Cannot have semaphore with negative count" throw ] when
|
||||
0 <dlist> semaphore construct-boa ;
|
||||
|
||||
: wait-to-acquire ( semaphore -- )
|
||||
[ semaphore-threads push-front ] suspend drop ;
|
||||
: wait-to-acquire ( semaphore timeout -- )
|
||||
>r semaphore-threads r> wait ;
|
||||
|
||||
: acquire ( semaphore -- )
|
||||
: acquire ( semaphore timeout -- )
|
||||
dup semaphore-count zero? [
|
||||
wait-to-acquire
|
||||
] [
|
||||
drop
|
||||
dup semaphore-count 1- swap set-semaphore-count
|
||||
] if ;
|
||||
|
||||
: release ( semaphore -- )
|
||||
dup semaphore-count 1+ over set-semaphore-count
|
||||
semaphore-threads notify-1 ;
|
||||
|
||||
: with-semaphore ( semaphore quot -- )
|
||||
over acquire [ release ] curry [ ] cleanup ; inline
|
||||
|
|
|
@ -78,8 +78,36 @@ ARTICLE: "dataflow" "Data and control flow"
|
|||
{ $subsection "conditionals" }
|
||||
{ $subsection "basic-combinators" }
|
||||
{ $subsection "combinators" }
|
||||
{ $subsection "continuations" }
|
||||
{ $subsection "threads" } ;
|
||||
{ $subsection "continuations" } ;
|
||||
|
||||
USING: concurrency.combinators
|
||||
concurrency.messaging
|
||||
concurrency.promises
|
||||
concurrency.futures
|
||||
concurrency.distributed
|
||||
concurrency.locks
|
||||
concurrency.semaphores
|
||||
concurrency.count-downs
|
||||
concurrency.exchangers ;
|
||||
|
||||
ARTICLE: "concurrency" "Concurrency"
|
||||
"Factor supports a variety of concurrency abstractions, however they are mostly used to multiplex input/output operations since the thread scheduling is co-operative and only one CPU is used at a time."
|
||||
$nl
|
||||
"Factor's concurrency support was insipired by Erlang, Termite, Scheme48 and Java's " { $snippet "java.util.concurrent" } " library."
|
||||
$nl
|
||||
"The basic building blocks:"
|
||||
{ $subsection "threads" }
|
||||
"High-level abstractions:"
|
||||
{ $subsection "concurrency.combinators" }
|
||||
{ $subsection "concurrency.promises" }
|
||||
{ $subsection "concurrency.futures" }
|
||||
{ $subsection "concurrency.messaging" }
|
||||
{ $subsection "concurrency.distributed" }
|
||||
"Shared-state abstractions:"
|
||||
{ $subsection "concurrency.locks" }
|
||||
{ $subsection "concurrency.semaphores" }
|
||||
{ $subsection "concurrency.count-downs" }
|
||||
{ $subsection "concurrency.exchangers" } ;
|
||||
|
||||
ARTICLE: "objects" "Objects"
|
||||
"An " { $emphasis "object" } " is any datum which may be identified. All values are objects in Factor. Each object carries type information, and types are checked at runtime; Factor is dynamically typed."
|
||||
|
@ -216,6 +244,7 @@ ARTICLE: "handbook" "Factor documentation"
|
|||
{ $subsection "numbers" }
|
||||
{ $subsection "collections" }
|
||||
{ $subsection "io" }
|
||||
{ $subsection "concurrency" }
|
||||
{ $subsection "os" }
|
||||
{ $subsection "alien" }
|
||||
{ $heading "Environment reference" }
|
||||
|
|
|
@ -56,7 +56,7 @@ SYMBOL: data-mode
|
|||
data-mode off
|
||||
"220 OK\r\n" write flush t
|
||||
] }
|
||||
{ [ data-mode get ] [ global [ print ] bind t ] }
|
||||
{ [ data-mode get ] [ dup global [ print ] bind t ] }
|
||||
{ [ t ] [
|
||||
"500 ERROR\r\n" write flush t
|
||||
] }
|
||||
|
|
|
@ -107,6 +107,7 @@ MEMO: all-vocabs-seq ( -- seq )
|
|||
{ [ "ui.windows" ?head ] [ t ] }
|
||||
{ [ "ui.cocoa" ?head ] [ t ] }
|
||||
{ [ "cocoa" ?head ] [ t ] }
|
||||
{ [ "core-foundation" ?head ] [ t ] }
|
||||
{ [ "vocabs.loader.test" ?head ] [ t ] }
|
||||
{ [ "editors." ?head ] [ t ] }
|
||||
{ [ ".windows" ?tail ] [ t ] }
|
||||
|
|
|
@ -16,7 +16,7 @@ io io.styles sequences assocs namespaces sorting ;
|
|||
[ [ write ] with-cell ] each
|
||||
] with-row
|
||||
|
||||
threads get-global >alist sort-keys values [
|
||||
threads >alist sort-keys values [
|
||||
[ thread. ] with-row
|
||||
] each
|
||||
] tabular-output ;
|
||||
|
|
Loading…
Reference in New Issue