Split off concurrency.mailboxes, add timeout support to promises, locks, mailboxes, semaphores, count-downs

db4
Slava Pestov 2008-02-21 23:47:06 -06:00
parent 3eba715778
commit 727f91409d
30 changed files with 382 additions and 285 deletions

View File

@ -32,3 +32,7 @@ SYMBOL: type-numbers
: most-negative-fixnum ( -- n )
first-bignum neg ;
M: real >integer
dup most-negative-fixnum most-positive-fixnum between?
[ >fixnum ] [ >bignum ] if ;

View File

@ -14,6 +14,7 @@ $nl
{ $subsection fixnum? }
{ $subsection bignum? }
{ $subsection >fixnum }
{ $subsection >integer }
{ $subsection >bignum }
{ $see-also "prettyprint-numbers" "modular-arithmetic" "bitwise-arithmetic" "integer-functions" "syntax-integers" } ;

View File

@ -6,6 +6,7 @@ IN: math.integers.private
M: integer numerator ;
M: integer denominator drop 1 ;
M: integer >integer ;
M: fixnum >fixnum ;
M: fixnum >bignum fixnum>bignum ;

View File

@ -5,6 +5,7 @@ IN: math
GENERIC: >fixnum ( x -- y ) foldable
GENERIC: >bignum ( x -- y ) foldable
GENERIC: >integer ( x -- y ) foldable
GENERIC: >float ( x -- y ) foldable
MATH: number= ( x y -- ? ) foldable

View File

@ -133,12 +133,22 @@ PRIVATE>
: yield ( -- ) [ resume ] "yield" suspend drop ;
: nap ( ms/f -- ? )
[ >fixnum millis + [ schedule-sleep ] curry "sleep" ]
[ [ drop ] "interrupt" ] if*
suspend ;
GENERIC: nap-until ( time -- ? )
: sleep ( ms -- )
M: integer nap-until [ schedule-sleep ] curry "sleep" suspend ;
M: f nap-until drop [ drop ] "interrupt" suspend ;
GENERIC: nap ( time -- ? )
M: real nap millis + >integer nap-until ;
M: f nap nap-until ;
: sleep-until ( time -- )
nap-until [ "Sleep interrupted" throw ] when ;
: sleep ( time -- )
nap [ "Sleep interrupted" throw ] when ;
: interrupt ( thread -- )

View File

@ -5,9 +5,13 @@ HELP: alarm
{ $class-description "An alarm. Cancel passed to " { $link cancel-alarm } "." } ;
HELP: add-alarm
{ $values { "time" timestamp } { "frequency" "a " { $link dt } " or " { $link f } } { "quot" quotation } { "alarm" alarm } }
{ $values { "quot" quotation } { "time" timestamp } { "frequency" "a " { $link dt } " or " { $link f } } { "alarm" alarm } }
{ $description "Creates and registers an alarm. If " { $snippet "frequency" } " is " { $link f } ", this will be a one-time alarm, otherwise it will fire with the given frequency. The quotation will be called from the alarm thread." } ;
HELP: later
{ $values { "quot" quotation } { "time" dt } { "alarm" alarm } }
{ $description "Creates and registers an alarm which calls the quotation once at " { $snippet "time" } { $link from-now } "." } ;
HELP: cancel-alarm
{ $values { "alarm" alarm } }
{ $description "Cancels an alarm." }
@ -17,6 +21,7 @@ ARTICLE: "alarms" "Alarms"
"Alarms provide a lightweight way to schedule one-time and recurring tasks without spawning a new thread."
{ $subsection alarm }
{ $subsection add-alarm }
{ $subsection later }
{ $subsection cancel-alarm } ;
ABOUT: "alarms"

View File

@ -4,7 +4,7 @@ USING: arrays calendar combinators generic init kernel math
namespaces sequences heaps boxes threads debugger quotations ;
IN: alarms
TUPLE: alarm time interval quot entry ;
TUPLE: alarm quot time interval entry ;
<PRIVATE
@ -15,11 +15,11 @@ SYMBOL: alarm-thread
alarm-thread get-global interrupt ;
: check-alarm
pick timestamp? [ "Not a timestamp" throw ] unless
over dup dt? swap not or [ "Not a dt" throw ] unless
dup callable? [ "Not a quotation" throw ] unless ; inline
dup dt? over not or [ "Not a dt" throw ] unless
over timestamp? [ "Not a timestamp" throw ] unless
pick callable? [ "Not a quotation" throw ] unless ; inline
: <alarm> ( time delay quot -- alarm )
: <alarm> ( quot time frequency -- alarm )
check-alarm <box> alarm construct-boa ;
: register-alarm ( alarm -- )
@ -76,8 +76,11 @@ SYMBOL: alarm-thread
PRIVATE>
: add-alarm ( time frequency quot -- alarm )
: add-alarm ( quot time frequency -- alarm )
<alarm> [ register-alarm ] keep ;
: later ( quot dt -- alarm )
from-now f add-alarm ;
: cancel-alarm ( alarm -- )
alarm-entry box> alarms get-global heap-delete ;

View File

@ -5,7 +5,7 @@ USING: arrays hashtables io io.streams.string kernel math
math.vectors math.functions math.parser namespaces sequences
strings tuples system debugger combinators vocabs.loader
calendar.backend structs alien.c-types math.vectors
math.ranges shuffle ;
shuffle threads ;
IN: calendar
TUPLE: timestamp year month day hour minute second gmt-offset ;
@ -473,6 +473,10 @@ M: timestamp year. ( timestamp -- )
: seconds-since-midnight ( timestamp -- x )
dup beginning-of-day timestamp- ;
M: timestamp nap-until timestamp>millis nap-until ;
M: dt nap from-now nap-until ;
{
{ [ unix? ] [ "calendar.unix" ] }
{ [ windows? ] [ "calendar.windows" ] }

View File

@ -1,6 +1,6 @@
IN: temporary
USING: concurrency.combinators tools.test random kernel math
concurrency.messaging threads sequences ;
concurrency.mailboxes threads sequences ;
[ [ drop ] parallel-each ] must-infer
[ [ ] parallel-map ] must-infer

View File

@ -1,14 +1,27 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: dlists threads kernel arrays sequences ;
USING: dlists dlists.private threads kernel arrays sequences
alarms ;
IN: concurrency.conditions
: notify-1 ( dlist -- )
dup dlist-empty?
[ drop ] [ pop-back second resume-now ] if ;
dup dlist-empty? [ drop ] [ pop-back resume-now ] if ;
: notify-all ( dlist -- )
[ second resume-now ] dlist-slurp yield ;
[ resume-now ] dlist-slurp yield ;
: queue-timeout ( queue timeout -- alarm )
#! Add an alarm which removes the current thread from the
#! queue, and resumes it, passing it a value of t.
>r self over push-front* [
tuck delete-node
dlist-node-obj t swap resume-with
] 2curry r> later ;
: wait ( queue timeout status -- )
>r [ 2array swap push-front ] r> suspend 3drop ; inline
over [
>r queue-timeout [ drop ] r> suspend
[ "Timeout" throw ] [ cancel-alarm ] if
] [
>r drop [ push-front ] curry r> suspend drop
] if ;

View File

@ -1,6 +1,6 @@
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: concurrency.promises concurrency.messaging kernel arrays
USING: concurrency.promises concurrency.mailboxes kernel arrays
continuations ;
IN: concurrency.futures
@ -11,7 +11,7 @@ IN: concurrency.futures
] keep ; inline
: ?future-timeout ( future timeout -- value )
?promise-timeout ;
?promise-timeout ?linked ;
: ?future ( future -- value )
?promise ;
?promise ?linked ;

View File

@ -1,6 +1,7 @@
IN: temporary
USING: tools.test concurrency.locks concurrency.count-downs
locals kernel threads sequences ;
concurrency.messaging concurrency.mailboxes locals kernel
threads sequences calendar ;
:: lock-test-0 | |
[let | v [ V{ } clone ]
@ -32,7 +33,7 @@ locals kernel threads sequences ;
c [ 2 <count-down> ] |
[
l f [
l [
yield
1 v push
yield
@ -42,7 +43,7 @@ locals kernel threads sequences ;
] "Lock test 1" spawn drop
[
l f [
l [
yield
3 v push
yield
@ -59,8 +60,8 @@ locals kernel threads sequences ;
[ V{ 1 2 3 4 } ] [ lock-test-1 ] unit-test
[ 3 ] [
<reentrant-lock> dup f [
f [
<reentrant-lock> dup [
[
3
] with-lock
] with-lock
@ -68,15 +69,15 @@ locals kernel threads sequences ;
[ ] [ <rw-lock> drop ] unit-test
[ ] [ <rw-lock> f [ ] with-read-lock ] unit-test
[ ] [ <rw-lock> [ ] with-read-lock ] unit-test
[ ] [ <rw-lock> dup f [ f [ ] with-read-lock ] with-read-lock ] unit-test
[ ] [ <rw-lock> dup [ [ ] with-read-lock ] with-read-lock ] unit-test
[ ] [ <rw-lock> f [ ] with-write-lock ] unit-test
[ ] [ <rw-lock> [ ] with-write-lock ] unit-test
[ ] [ <rw-lock> dup f [ f [ ] with-write-lock ] with-write-lock ] unit-test
[ ] [ <rw-lock> dup [ [ ] with-write-lock ] with-write-lock ] unit-test
[ ] [ <rw-lock> dup f [ f [ ] with-read-lock ] with-write-lock ] unit-test
[ ] [ <rw-lock> dup [ [ ] with-read-lock ] with-write-lock ] unit-test
:: rw-lock-test-1 | |
[let | l [ <rw-lock> ]
@ -86,7 +87,7 @@ locals kernel threads sequences ;
v [ V{ } clone ] |
[
l f [
l [
1 v push
c count-down
yield
@ -97,7 +98,7 @@ locals kernel threads sequences ;
[
c await
l f [
l [
4 v push
1000 sleep
5 v push
@ -107,7 +108,7 @@ locals kernel threads sequences ;
[
c await
l f [
l [
2 v push
c' count-down
] with-read-lock
@ -116,7 +117,7 @@ locals kernel threads sequences ;
[
c' await
l f [
l [
6 v push
] with-write-lock
c'' count-down
@ -135,7 +136,7 @@ locals kernel threads sequences ;
v [ V{ } clone ] |
[
l f [
l [
1 v push
c count-down
1000 sleep
@ -146,7 +147,7 @@ locals kernel threads sequences ;
[
c await
l f [
l [
3 v push
] with-read-lock
c' count-down
@ -157,3 +158,21 @@ locals kernel threads sequences ;
] ;
[ V{ 1 2 3 } ] [ rw-lock-test-2 ] unit-test
! Test lock timeouts
:: lock-timeout-test | |
[let | l [ <lock> ] |
[
l [ 1 seconds sleep ] with-lock
] "Lock holder" spawn drop
[
l 1/10 seconds [ ] with-lock-timeout
] "Lock timeout-er" spawn-linked drop
receive
] ;
[ lock-timeout-test ] [
linked-thread thread-name "Lock timeout-er" =
] must-fail-with

View File

@ -25,15 +25,15 @@ TUPLE: lock threads owner reentrant? ;
lock-threads notify-1 ;
: do-lock ( lock timeout quot acquire release -- )
>r swap compose pick >r 2curry r> r> curry [ ] cleanup ;
inline
>r >r pick rot r> call ! use up timeout acquire
swap r> curry [ ] cleanup ; inline
: (with-lock) ( lock timeout quot -- )
[ acquire-lock ] [ release-lock ] do-lock ; inline
PRIVATE>
: with-lock ( lock timeout quot -- )
: with-lock-timeout ( lock timeout quot -- )
pick lock-reentrant? [
pick lock-owner self eq? [
2nip call
@ -44,6 +44,9 @@ PRIVATE>
(with-lock)
] if ; inline
: with-lock ( lock quot -- )
f swap with-lock-timeout ; inline
! Many-reader/single-writer locks
TUPLE: rw-lock readers writers reader# writer ;
@ -79,12 +82,18 @@ TUPLE: rw-lock readers writers reader# writer ;
PRIVATE>
: with-read-lock ( lock timeout quot -- )
: with-read-lock-timeout ( lock timeout quot -- )
[
[ acquire-read-lock ] [ release-read-lock ] do-lock
] do-reentrant-rw-lock ; inline
: with-write-lock ( lock timeout quot -- )
: with-read-lock ( lock quot -- )
f swap with-read-lock-timeout ; inline
: with-write-lock-timeout ( lock timeout quot -- )
[
[ acquire-write-lock ] [ release-write-lock ] do-lock
] do-reentrant-rw-lock ; inline
: with-write-lock ( lock quot -- )
f swap with-write-lock-timeout ; inline

View File

@ -0,0 +1,75 @@
USING: help.markup help.syntax kernel arrays ;
IN: concurrency.mailboxes
HELP: <mailbox>
{ $values { "mailbox" mailbox } }
{ $description "A mailbox is an object that can be used for safe thread communication. Items can be put in the mailbox and retrieved in a FIFO order. If the mailbox is empty when a get operation is performed then the thread will block until another thread places something in the mailbox. If multiple threads are waiting on the same mailbox, only one of the waiting threads will be unblocked to thread the get operation." } ;
HELP: mailbox-empty?
{ $values { "mailbox" mailbox }
{ "bool" "a boolean" }
}
{ $description "Return true if the mailbox is empty." } ;
HELP: mailbox-put
{ $values { "obj" object }
{ "mailbox" mailbox }
}
{ $description "Put the object into the mailbox. Any threads that have a blocking get on the mailbox are resumed. Only one of those threads will successfully get the object, the rest will immediately block waiting for the next item in the mailbox." } ;
HELP: block-unless-pred
{ $values { "pred" "a quotation with stack effect " { $snippet "( X -- bool )" } }
{ "mailbox" mailbox }
{ "timeout" "a timeout in milliseconds, or " { $link f } }
}
{ $description "Block the thread if there are no items in the mailbox that return true when the predicate is called with the item on the stack." } ;
HELP: block-if-empty
{ $values { "mailbox" mailbox }
{ "timeout" "a timeout in milliseconds, or " { $link f } }
}
{ $description "Block the thread if the mailbox is empty." } ;
HELP: mailbox-get
{ $values { "mailbox" mailbox }
{ "obj" object }
}
{ $description "Get the first item put into the mailbox. If it is empty the thread blocks until an item is put into it. The thread then resumes, leaving the item on the stack." } ;
HELP: mailbox-get-all
{ $values { "mailbox" mailbox }
{ "array" array }
}
{ $description "Blocks the thread if the mailbox is empty, otherwise removes all objects in the mailbox and returns an array containing the objects." } ;
HELP: while-mailbox-empty
{ $values { "mailbox" mailbox }
{ "quot" "a quotation with stack effect " { $snippet "( -- )" } }
}
{ $description "Repeatedly call the quotation while there are no items in the mailbox." } ;
HELP: mailbox-get?
{ $values { "pred" "a quotation with stack effect " { $snippet "( X -- bool )" } }
{ "mailbox" mailbox }
{ "obj" object }
}
{ $description "Get the first item in the mailbox which satisfies the predicate. 'pred' will be called repeatedly for each item in the mailbox. When 'pred' returns true that item will be returned. If nothing in the mailbox satisfies the predicate then the thread will block until something does." } ;
ARTICLE: "concurrency.mailboxes" "Mailboxes"
"A " { $emphasis "mailbox" } " is a first-in-first-out queue where the operation of removing an element blocks if the queue is empty, instead of throwing an error."
{ $subsection mailbox }
{ $subsection <mailbox> }
"Removing the first element:"
{ $subsection mailbox-get }
{ $subsection mailbox-get-timeout }
"Removing the first element matching a predicate:"
{ $subsection mailbox-get? }
{ $subsection mailbox-get-timeout? }
"Emptying out a mailbox:"
{ $subsection mailbox-get-all }
"Adding an element:"
{ $subsection mailbox-put }
"Testing if a mailbox is empty:"
{ $subsection mailbox-empty? }
{ $subsection while-mailbox-empty } ;

View File

@ -0,0 +1,40 @@
IN: temporary
USING: concurrency.mailboxes vectors sequences threads
tools.test math kernel strings ;
[ V{ 1 2 3 } ] [
0 <vector>
<mailbox>
[ mailbox-get swap push ] in-thread
[ mailbox-get swap push ] in-thread
[ mailbox-get swap push ] in-thread
1 over mailbox-put
2 over mailbox-put
3 swap mailbox-put
] unit-test
[ V{ 1 2 3 } ] [
0 <vector>
<mailbox>
[ [ integer? ] swap mailbox-get? swap push ] in-thread
[ [ integer? ] swap mailbox-get? swap push ] in-thread
[ [ integer? ] swap mailbox-get? swap push ] in-thread
1 over mailbox-put
2 over mailbox-put
3 swap mailbox-put
] unit-test
[ V{ 1 "junk" 3 "junk2" } [ 456 ] ] [
0 <vector>
<mailbox>
[ [ integer? ] swap mailbox-get? swap push ] in-thread
[ [ integer? ] swap mailbox-get? swap push ] in-thread
[ [ string? ] swap mailbox-get? swap push ] in-thread
[ [ string? ] swap mailbox-get? swap push ] in-thread
1 over mailbox-put
"junk" over mailbox-put
[ 456 ] over mailbox-put
3 over mailbox-put
"junk2" over mailbox-put
mailbox-get
] unit-test

View File

@ -0,0 +1,76 @@
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
IN: concurrency.mailboxes
USING: dlists threads sequences continuations
namespaces random math quotations words kernel arrays assocs
init system concurrency.conditions ;
TUPLE: mailbox threads data ;
: <mailbox> ( -- mailbox )
<dlist> <dlist> mailbox construct-boa ;
: mailbox-empty? ( mailbox -- bool )
mailbox-data dlist-empty? ;
: mailbox-put ( obj mailbox -- )
[ mailbox-data push-front ] keep
mailbox-threads notify-all ;
: block-unless-pred ( pred mailbox timeout -- )
2over mailbox-data dlist-contains? [
3drop
] [
2dup >r mailbox-threads r> "mailbox" wait
block-unless-pred
] if ; inline
: block-if-empty ( mailbox timeout -- mailbox )
over mailbox-empty? [
2dup >r mailbox-threads r> "mailbox" wait
block-if-empty
] [
drop
] if ;
: mailbox-peek ( mailbox -- obj )
mailbox-data peek-back ;
: mailbox-get-timeout ( mailbox timeout -- obj )
block-if-empty mailbox-data pop-back ;
: mailbox-get ( mailbox -- obj )
f mailbox-get-timeout ;
: mailbox-get-all-timeout ( mailbox timeout -- array )
block-if-empty
[ dup mailbox-empty? ]
[ dup mailbox-data pop-back ]
[ ] unfold nip ;
: mailbox-get-all ( mailbox -- array )
f mailbox-get-all-timeout ;
: while-mailbox-empty ( mailbox quot -- )
over mailbox-empty? [
dup >r swap slip r> while-mailbox-empty
] [
2drop
] if ; inline
: mailbox-get-timeout? ( pred mailbox timeout -- obj )
[ block-unless-pred ] 3keep drop
mailbox-data delete-node-if ; inline
: mailbox-get? ( pred mailbox -- obj )
f mailbox-get-timeout? ; inline
TUPLE: linked error thread ;
C: <linked> linked
: ?linked dup linked? [ rethrow ] when ;
: spawn-linked-to ( quot name mailbox -- thread )
[ >r <linked> r> mailbox-put ] curry <thread>
[ (spawn) ] keep ;

View File

@ -4,70 +4,6 @@ USING: help.syntax help.markup concurrency.messaging.private
threads kernel arrays quotations ;
IN: concurrency.messaging
HELP: <mailbox>
{ $values { "mailbox" mailbox }
}
{ $description "A mailbox is an object that can be used for safe thread communication. Items can be put in the mailbox and retrieved in a FIFO order. If the mailbox is empty when a get operation is performed then the thread will block until another thread places something in the mailbox. If multiple threads are waiting on the same mailbox, only one of the waiting threads will be unblocked to thread the get operation." }
{ $see-also mailbox-empty? mailbox-put mailbox-get mailbox-get-all while-mailbox-empty mailbox-get? } ;
HELP: mailbox-empty?
{ $values { "mailbox" mailbox }
{ "bool" "a boolean" }
}
{ $description "Return true if the mailbox is empty." }
{ $see-also <mailbox> mailbox-put mailbox-get mailbox-get-all while-mailbox-empty mailbox-get? } ;
HELP: mailbox-put
{ $values { "obj" object }
{ "mailbox" mailbox }
}
{ $description "Put the object into the mailbox. Any threads that have a blocking get on the mailbox are resumed. Only one of those threads will successfully get the object, the rest will immediately block waiting for the next item in the mailbox." }
{ $see-also <mailbox> mailbox-empty? mailbox-get mailbox-get-all while-mailbox-empty mailbox-get? } ;
HELP: block-unless-pred
{ $values { "pred" "a quotation with stack effect " { $snippet "( X -- bool )" } }
{ "mailbox" mailbox }
{ "timeout" "a timeout in milliseconds, or " { $link f } }
}
{ $description "Block the thread if there are no items in the mailbox that return true when the predicate is called with the item on the stack." }
{ $see-also <mailbox> mailbox-empty? mailbox-put mailbox-get mailbox-get-all while-mailbox-empty mailbox-get? } ;
HELP: block-if-empty
{ $values { "mailbox" mailbox }
{ "timeout" "a timeout in milliseconds, or " { $link f } }
}
{ $description "Block the thread if the mailbox is empty." }
{ $see-also <mailbox> mailbox-empty? mailbox-put mailbox-get mailbox-get-all while-mailbox-empty mailbox-get? } ;
HELP: mailbox-get
{ $values { "mailbox" mailbox }
{ "obj" object }
}
{ $description "Get the first item put into the mailbox. If it is empty the thread blocks until an item is put into it. The thread then resumes, leaving the item on the stack." }
{ $see-also <mailbox> mailbox-empty? mailbox-put while-mailbox-empty mailbox-get-all mailbox-get? } ;
HELP: mailbox-get-all
{ $values { "mailbox" mailbox }
{ "array" array }
}
{ $description "Blocks the thread if the mailbox is empty, otherwise removes all objects in the mailbox and returns an array containing the objects." }
{ $see-also <mailbox> mailbox-empty? mailbox-put while-mailbox-empty mailbox-get-all mailbox-get? } ;
HELP: while-mailbox-empty
{ $values { "mailbox" mailbox }
{ "quot" "a quotation with stack effect " { $snippet "( -- )" } }
}
{ $description "Repeatedly call the quotation while there are no items in the mailbox." }
{ $see-also <mailbox> mailbox-empty? mailbox-put mailbox-get mailbox-get-all mailbox-get? } ;
HELP: mailbox-get?
{ $values { "pred" "a quotation with stack effect " { $snippet "( X -- bool )" } }
{ "mailbox" mailbox }
{ "obj" object }
}
{ $description "Get the first item in the mailbox which satisfies the predicate. 'pred' will be called repeatedly for each item in the mailbox. When 'pred' returns true that item will be returned. If nothing in the mailbox satisfies the predicate then the thread will block until something does." }
{ $see-also <mailbox> mailbox-empty? mailbox-put mailbox-get mailbox-get-all while-mailbox-empty } ;
HELP: send
{ $values { "message" object }
{ "thread" "a thread object" }
@ -95,8 +31,8 @@ HELP: spawn-linked
{ $description "Start a thread which runs the given quotation. If that quotation throws an error which is not caught then the error will get propagated to the thread that spawned it. This can be used to set up 'supervisor' threades that restart child threades that crash due to uncaught errors.\n" }
{ $see-also spawn } ;
ARTICLE: { "concurrency" "mailboxes" } "Mailboxes"
"Each thread has an associated message queue. Other threads can place items on this queue by sending the thread a message. A thread can check its queue for messages, blocking if none are pending, and thread them as they are queued."
ARTICLE: { "concurrency" "messaging" } "Mailboxes"
"Each thread has an associated mailbox. Other threads can place items on this queue by sending the thread a message. A thread can check its mailbox for messages, blocking if none are pending, and thread them as they are queued."
$nl
"The messages that are sent from thread to thread are any Factor value. Factor tuples are ideal for this sort of thing as you can send a tuple to a thread and the generic word dispatch mechanism can be used to perform actions depending on what the type of the tuple is."
$nl
@ -104,14 +40,9 @@ $nl
{ $subsection send }
"A thread can get a message from its queue:"
{ $subsection receive }
{ $subsection receive }
{ $subsection receive-timeout }
{ $subsection receive-if }
"Mailboxes can be created and used directly:"
{ $subsection mailbox }
{ $subsection <mailbox> }
{ $subsection mailbox-get }
{ $subsection mailbox-put }
{ $subsection mailbox-empty? } ;
{ $subsection receive-if-timeout } ;
ARTICLE: { "concurrency" "synchronous-sends" } "Synchronous sends"
"The " { $link send } " word sends a message asynchronously, and the sending thread continues immediately. It is also possible to send a message to a thread and block until a response is received:"
@ -133,8 +64,6 @@ ARTICLE: { "concurrency" "exceptions" } "Linked exceptions"
{ $code "[ 1 0 / \"This will not print\" print ] spawn" }
"Processes can be linked so that a parent thread can receive the exception that caused the child thread to terminate. In this way 'supervisor' threades can be created that are notified when child threades terminate and possibly restart them."
{ $subsection spawn-linked }
"A more flexible version of the above deposits the error in an arbitary mailbox:"
{ $subsection spawn-linked-to }
"This will create a unidirectional link, such that if an uncaught exception causes the child to terminate, the parent thread can catch it:"
{ $code "["
" [ 1 0 / \"This will not print\" print ] spawn-linked drop"
@ -148,7 +77,7 @@ $nl
"A concurrency oriented program is one in which multiple threades run simultaneously in a single Factor image or across multiple running Factor instances. The threades can communicate with each other by asynchronous message sends."
$nl
"Although threades can share data via Factor's mutable data structures it is not recommended to mix shared state with message passing as it can lead to confusing code."
{ $subsection { "concurrency" "mailboxes" } }
{ $subsection { "concurrency" "messaging" } }
{ $subsection { "concurrency" "synchronous-sends" } }
{ $subsection { "concurrency" "exceptions" } } ;

View File

@ -3,48 +3,10 @@
!
USING: kernel threads vectors arrays sequences
namespaces tools.test continuations dlists strings math words
match quotations concurrency.messaging ;
match quotations concurrency.messaging concurrency.mailboxes ;
IN: temporary
[ ] [ mailbox mailbox-data dlist-delete-all ] unit-test
[ V{ 1 2 3 } ] [
0 <vector>
<mailbox>
[ mailbox-get swap push ] in-thread
[ mailbox-get swap push ] in-thread
[ mailbox-get swap push ] in-thread
1 over mailbox-put
2 over mailbox-put
3 swap mailbox-put
] unit-test
[ V{ 1 2 3 } ] [
0 <vector>
<mailbox>
[ [ integer? ] swap mailbox-get? swap push ] in-thread
[ [ integer? ] swap mailbox-get? swap push ] in-thread
[ [ integer? ] swap mailbox-get? swap push ] in-thread
1 over mailbox-put
2 over mailbox-put
3 swap mailbox-put
] unit-test
[ V{ 1 "junk" 3 "junk2" } [ 456 ] ] [
0 <vector>
<mailbox>
[ [ integer? ] swap mailbox-get? swap push ] in-thread
[ [ integer? ] swap mailbox-get? swap push ] in-thread
[ [ string? ] swap mailbox-get? swap push ] in-thread
[ [ string? ] swap mailbox-get? swap push ] in-thread
1 over mailbox-put
"junk" over mailbox-put
[ 456 ] over mailbox-put
3 over mailbox-put
"junk2" over mailbox-put
mailbox-get
] unit-test
[ ] [ my-mailbox mailbox-data dlist-delete-all ] unit-test
[ "received" ] [
[

View File

@ -1,80 +1,11 @@
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
!
! Concurrency library for Factor based on Erlang/Termite style
! Concurrency library for Factor, based on Erlang/Termite style
! concurrency.
USING: kernel threads concurrency.mailboxes continuations
namespaces assocs random ;
IN: concurrency.messaging
USING: dlists threads sequences continuations
namespaces random math quotations words kernel arrays assocs
init system concurrency.conditions ;
TUPLE: mailbox threads data ;
: <mailbox> ( -- mailbox )
<dlist> <dlist> \ mailbox construct-boa ;
: mailbox-empty? ( mailbox -- bool )
mailbox-data dlist-empty? ;
: mailbox-put ( obj mailbox -- )
[ mailbox-data push-front ] keep
mailbox-threads notify-all ;
<PRIVATE
: block-unless-pred ( pred mailbox timeout -- )
2over mailbox-data dlist-contains? [
3drop
] [
2dup >r mailbox-threads r> "mailbox" wait
block-unless-pred
] if ; inline
: block-if-empty ( mailbox timeout -- mailbox )
over mailbox-empty? [
2dup >r mailbox-threads r> "mailbox" wait
block-if-empty
] [
drop
] if ;
PRIVATE>
: mailbox-peek ( mailbox -- obj )
mailbox-data peek-back ;
: mailbox-get-timeout ( mailbox timeout -- obj )
block-if-empty mailbox-data pop-back ;
: mailbox-get ( mailbox -- obj )
f mailbox-get-timeout ;
: mailbox-get-all-timeout ( mailbox timeout -- array )
block-if-empty
[ dup mailbox-empty? ]
[ dup mailbox-data pop-back ]
[ ] unfold nip ;
: mailbox-get-all ( mailbox -- array )
f mailbox-get-all-timeout ;
: while-mailbox-empty ( mailbox quot -- )
over mailbox-empty? [
dup >r swap slip r> while-mailbox-empty
] [
2drop
] if ; inline
: mailbox-timeout-get? ( pred mailbox timeout -- obj )
[ block-unless-pred ] 3keep drop
mailbox-data delete-node-if ; inline
: mailbox-get? ( pred mailbox -- obj )
f mailbox-timeout-get? ; inline
TUPLE: linked error thread ;
C: <linked> linked
GENERIC: send ( message process -- )
@ -86,25 +17,25 @@ GENERIC: send ( message process -- )
M: thread send ( message thread -- )
check-registered mailbox-of mailbox-put ;
: ?linked dup linked? [ rethrow ] when ;
: mailbox self mailbox-of ;
: my-mailbox self mailbox-of ;
: receive ( -- message )
mailbox mailbox-get ?linked ;
my-mailbox mailbox-get ?linked ;
: receive-timeout ( timeout -- message )
my-mailbox swap mailbox-get-timeout ?linked ;
: receive-if ( pred -- message )
mailbox mailbox-get? ?linked ; inline
my-mailbox mailbox-get? ?linked ; inline
: receive-if-timeout ( pred timeout -- message )
my-mailbox swap mailbox-get-timeout? ?linked ; inline
: rethrow-linked ( error process supervisor -- )
>r <linked> r> send ;
: spawn-linked-to ( quot name mailbox -- thread )
[ >r <linked> r> mailbox-put ] curry <thread>
[ (spawn) ] keep ;
: spawn-linked ( quot name -- thread )
mailbox spawn-linked-to ;
my-mailbox spawn-linked-to ;
TUPLE: synchronous data sender tag ;
@ -116,17 +47,18 @@ TUPLE: reply data tag ;
: <reply> ( data synchronous -- reply )
synchronous-tag \ reply construct-boa ;
: synchronous-reply? ( response synchronous -- ? )
over reply?
[ >r reply-tag r> synchronous-tag = ]
[ 2drop f ] if ;
: send-synchronous ( message thread -- reply )
dup self eq? [
"Cannot synchronous send to myself" throw
] [
>r <synchronous> dup r> send [
over reply? [
>r reply-tag r> synchronous-tag =
] [
2drop f
] if
] curry receive-if reply-data
>r <synchronous> dup r> send
[ synchronous-reply? ] curry receive-if
reply-data
] if ;
: reply-synchronous ( message synchronous -- )
@ -139,18 +71,18 @@ TUPLE: reply data tag ;
<PRIVATE
: remote-processes ( -- hash )
\ remote-processes get-global ;
: registered-processes ( -- hash )
\ registered-processes get-global ;
PRIVATE>
: register-process ( name process -- )
swap remote-processes set-at ;
swap registered-processes set-at ;
: unregister-process ( name -- )
remote-processes delete-at ;
registered-processes delete-at ;
: get-process ( name -- process )
dup remote-processes at [ ] [ thread ] ?if ;
dup registered-processes at [ ] [ thread ] ?if ;
\ remote-processes global [ H{ } assoc-like ] change-at
\ registered-processes global [ H{ } assoc-like ] change-at

View File

@ -1,7 +1,6 @@
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: concurrency.messaging concurrency.messaging.private
kernel ;
USING: concurrency.mailboxes kernel continuations ;
IN: concurrency.promises
TUPLE: promise mailbox ;
@ -20,8 +19,7 @@ TUPLE: promise mailbox ;
] if ;
: ?promise-timeout ( promise timeout -- result )
>r promise-mailbox r> block-if-empty
mailbox-peek ?linked ;
>r promise-mailbox r> block-if-empty mailbox-peek ;
: ?promise ( promise -- result )
f ?promise-timeout ;

View File

@ -1,5 +1,5 @@
IN: concurrency.semaphores
USING: help.markup help.syntax kernel quotations ;
USING: help.markup help.syntax kernel quotations calendar ;
HELP: semaphore
{ $class-description "The class of counting semaphores." } ;
@ -8,14 +8,23 @@ HELP: <semaphore>
{ $values { "n" "a non-negative integer" } { "semaphore" semaphore } }
{ $description "Creates a counting semaphore with the specified initial count." } ;
HELP: acquire-timeout
{ $values { "semaphore" semaphore } { "timeout" "a " { $link dt } " or " { $link f } } { "value" object } }
{ $description "If the semaphore has a non-zero count, decrements it and returns immediately. Otherwise, if the timeout is " { $link f } ", waits indefinitely for the semaphore to be released. If the timeout is not " { $link f } ", waits a certain period of time, and if the semaphore still has not been released, throws an error." }
{ $errors "Throws an error if the timeout expires before the semaphore is released." } ;
HELP: acquire
{ $values { "semaphore" semaphore } { "timeout" "a timeout in milliseconds or " { $link f } } { "value" object } }
{ $description "If the semaphore has a non-zero count, decrements it and returns immediately. Otherwise, if the timeout is " { $link f } ", waits indefinitely for the semaphore to be released. If the timeout is not " { $link f } ", waits up to that number of milliseconds for the semaphore to be released." } ;
{ $values { "semaphore" semaphore } { "value" object } }
{ $description "If the semaphore has a non-zero count, decrements it and returns immediately. Otherwise, waits for it to be released." } ;
HELP: release
{ $values { "semaphore" semaphore } }
{ $description "Increments a semaphore's count. If the count was previously zero, any threads waiting on the semaphore are woken up." } ;
HELP: with-semaphore-timeout
{ $values { "semaphore" semaphore } { "timeout" "a " { $link dt } " or " { $link f } } { "quot" quotation } }
{ $description "Calls the quotation with the semaphore held." } ;
HELP: with-semaphore
{ $values { "semaphore" semaphore } { "quot" quotation } }
{ $description "Calls the quotation with the semaphore held." } ;
@ -38,8 +47,10 @@ $nl
{ $subsection <semaphore> }
"Unlike locks, where acquisition and release are always paired by a combinator, semaphores expose these operations directly and there is no requirement that they be performed in the same thread:"
{ $subsection acquire }
{ $subsection acquire-timeout }
{ $subsection release }
"A combinator which pairs acquisition and release:"
{ $subsection with-semaphore } ;
"Combinators which pair acquisition and release:"
{ $subsection with-semaphore }
{ $subsection with-semaphore-timeout } ;
ABOUT: "concurrency.semaphores"

View File

@ -13,17 +13,21 @@ TUPLE: semaphore count threads ;
: wait-to-acquire ( semaphore timeout -- )
>r semaphore-threads r> "semaphore" wait ;
: acquire ( semaphore timeout -- )
dup semaphore-count zero? [
wait-to-acquire
] [
drop
dup semaphore-count 1- swap set-semaphore-count
] if ;
: acquire-timeout ( semaphore timeout -- )
over semaphore-count zero?
[ dupd wait-to-acquire ] [ drop ] if
dup semaphore-count 1- swap set-semaphore-count ;
: acquire ( semaphore -- )
f acquire-timeout ;
: release ( semaphore -- )
dup semaphore-count 1+ over set-semaphore-count
semaphore-threads notify-1 ;
: with-semaphore-timeout ( semaphore timeout quot -- )
pick rot acquire-timeout swap
[ release ] curry [ ] cleanup ; inline
: with-semaphore ( semaphore quot -- )
over acquire [ release ] curry [ ] cleanup ; inline
over acquire swap [ release ] curry [ ] cleanup ; inline

View File

@ -99,6 +99,7 @@ $nl
{ $subsection "concurrency.combinators" }
{ $subsection "concurrency.promises" }
{ $subsection "concurrency.futures" }
{ $subsection "concurrency.mailboxes" }
{ $subsection "concurrency.messaging" }
"Shared-state abstractions:"
{ $subsection "concurrency.locks" }

View File

@ -1,8 +1,8 @@
! Copyright (C) 2005, 2007 Slava Pestov.
! Copyright (C) 2005, 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: assocs http kernel math math.parser namespaces sequences
io io.sockets io.streams.string io.files io.timeouts strings
splitting continuations assocs.lib ;
splitting continuations assocs.lib calendar ;
IN: http.client
: parse-host ( url -- host port )

View File

@ -2,7 +2,7 @@
! See http://factorcode.org/license.txt for BSD license.
USING: assocs kernel namespaces io io.timeouts strings splitting
threads http http.server.responders sequences prettyprint
io.server logging ;
io.server logging calendar ;
IN: http.server

View File

@ -17,7 +17,7 @@ GENERIC: timed-out ( obj -- )
M: object timed-out drop ;
: queue-timeout ( obj timeout -- alarm )
from-now f rot [ timed-out ] curry add-alarm ;
>r [ timed-out ] curry r> later ;
: with-timeout ( obj quot -- )
over dup timeout dup [

4
extra/math/ranges/ranges.factor Normal file → Executable file
View File

@ -1,10 +1,6 @@
USING: kernel layouts math namespaces sequences sequences.private ;
IN: math.ranges
: >integer ( n -- i )
dup most-negative-fixnum most-positive-fixnum between?
[ >fixnum ] [ >bignum ] if ;
TUPLE: range from length step ;
: <range> ( from to step -- range )

View File

@ -190,9 +190,9 @@ TUPLE: delay model timeout alarm ;
delay-alarm [ cancel-alarm ] when* ;
: start-delay ( delay -- )
now over delay-timeout +dt f
pick [ f over set-delay-alarm update-delay-model ] curry
add-alarm swap set-delay-alarm ;
dup [ f over set-delay-alarm update-delay-model ] curry
over delay-timeout later
swap set-delay-alarm ;
M: delay model-changed nip dup cancel-delay start-delay ;

View File

@ -1,4 +1,4 @@
! Copyright (C) 2005, 2007 Slava Pestov.
! Copyright (C) 2005, 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: arrays ui.commands ui.gadgets ui.gadgets.borders
ui.gadgets.labels ui.gadgets.theme
@ -88,6 +88,7 @@ TUPLE: repeat-button ;
repeat-button H{
{ T{ drag } [ button-clicked ] }
{ T{ button-down } [ button-clicked ] }
} set-gestures
: <repeat-button> ( label quot -- button )

View File

@ -114,8 +114,10 @@ SYMBOL: drag-timer
: start-drag-timer ( -- )
hand-buttons get-global empty? [
now 300 milliseconds +dt 100 milliseconds
[ drag-gesture ] add-alarm drag-timer get-global >box
[ drag-gesture ]
300 milliseconds from-now
100 milliseconds
add-alarm drag-timer get-global >box
] when ;
: stop-drag-timer ( -- )