Merge branch 'for-slava' of git://github.com/x6j8x/factor into x6

db4
Doug Coleman 2009-08-30 10:34:00 -05:00
commit b938123d94
3 changed files with 103 additions and 17 deletions

View File

@ -3,18 +3,34 @@
USING: kernel threads vectors arrays sequences namespaces make USING: kernel threads vectors arrays sequences namespaces make
tools.test continuations deques strings math words match tools.test continuations deques strings math words match
quotations concurrency.messaging concurrency.mailboxes quotations concurrency.messaging concurrency.mailboxes
concurrency.count-downs accessors ; concurrency.count-downs concurrency.conditions accessors calendar ;
IN: concurrency.messaging.tests IN: concurrency.messaging.tests
[ ] [ my-mailbox data>> clear-deque ] unit-test [ ] [ my-mailbox data>> clear-deque ] unit-test
[ "received" ] [ [ "received" ] [
[ [
receive "received" swap reply-synchronous [ drop "received" ] handle-synchronous
] "Synchronous test" spawn ] "Synchronous test" spawn
"sent" swap send-synchronous "sent" swap send-synchronous
] unit-test ] unit-test
[ "received" ] [
[
[ drop "received" ] handle-synchronous
] "Synchronous test" spawn
[ 100 milliseconds "sent" ] dip send-synchronous-timeout
] unit-test
[
[
100 milliseconds sleep
[ drop "received" ] handle-synchronous
] "Synchronous test" spawn
[ 5 milliseconds "sent" ] dip send-synchronous-timeout
] [ wait-timeout? ] must-fail-with
[ 1 3 2 ] [ [ 1 3 2 ] [
1 self send 1 self send
2 self send 2 self send
@ -64,3 +80,4 @@ SYMBOL: exit
! ] "Bad synchronous send" spawn "t" set ! ] "Bad synchronous send" spawn "t" set
! [ 3 "t" get send-synchronous ] must-fail ! [ 3 "t" get send-synchronous ] must-fail

View File

@ -1,32 +1,82 @@
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov. ! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
! See http://factorcode.org/license.txt for BSD license. ! See http://factorcode.org/license.txt for BSD license.
USING: kernel threads concurrency.mailboxes continuations USING: kernel threads concurrency.mailboxes continuations
namespaces assocs accessors summary fry ; namespaces assocs accessors summary fry calendar math sequences ;
IN: concurrency.messaging IN: concurrency.messaging
TUPLE: envelope data sender tag expiry ;
<PRIVATE
: new-envelope ( data class -- envelope )
new swap >>data self >>sender ;
: <envelope> ( data -- envelope )
dup envelope?
[ envelope new-envelope ] unless ;
: expired? ( message -- ? )
dup envelope?
[ expiry>>
[ now (time-) 0 < ]
[ f ] if*
] [ drop f ] if ; inline
: if-expired ( message quot -- message )
[ dup expired? ] dip
'[ drop _ call( -- message ) ] [ ] if ; inline
PRIVATE>
GENERIC: send ( message thread -- ) GENERIC: send ( message thread -- )
GENERIC: send-timeout ( timeout message thread -- )
: mailbox-of ( thread -- mailbox ) : mailbox-of ( thread -- mailbox )
dup mailbox>> [ ] [ dup mailbox>> [ ] [
<mailbox> [ >>mailbox drop ] keep <mailbox> [ >>mailbox drop ] keep
] ?if ; ] ?if ;
M: thread send ( message thread -- ) M: thread send ( message thread -- )
[ <envelope> ] dip
check-registered mailbox-of mailbox-put ; check-registered mailbox-of mailbox-put ;
M: thread send-timeout ( timeout message thread -- )
[ <envelope> swap hence >>expiry ] dip send ;
: my-mailbox ( -- mailbox ) self mailbox-of ; : my-mailbox ( -- mailbox ) self mailbox-of ;
: (receive) ( -- message )
my-mailbox mailbox-get ?linked
[ (receive) ] if-expired ;
: receive ( -- message ) : receive ( -- message )
my-mailbox mailbox-get ?linked ; (receive) data>> ;
: (receive-timeout) ( timeout -- message )
[ my-mailbox ] dip
[ mailbox-get-timeout ?linked ] keep
'[ _ (receive-timeout) ] if-expired ; inline
: receive-timeout ( timeout -- message ) : receive-timeout ( timeout -- message )
[ my-mailbox ] dip mailbox-get-timeout ?linked ; (receive-timeout) data>> ;
: (receive-if) ( pred -- message )
[ my-mailbox ] dip
[ mailbox-get? ?linked ] keep
'[ _ (receive-if) ] if-expired ; inline
: receive-if ( pred -- message ) : receive-if ( pred -- message )
[ my-mailbox ] dip mailbox-get? ?linked ; inline [ data>> ] prepend (receive-if) data>> ; inline
: (receive-if-timeout) ( timeout pred -- message )
[ my-mailbox ] 2dip
[ mailbox-get-timeout? ?linked ] 2keep
'[ _ _ (receive-if-timeout) ] if-expired ; inline
: receive-if-timeout ( timeout pred -- message ) : receive-if-timeout ( timeout pred -- message )
[ my-mailbox ] 2dip mailbox-get-timeout? ?linked ; inline [ data>> ] prepend
(receive-if-timeout) data>> ; inline
: rethrow-linked ( error process supervisor -- ) : rethrow-linked ( error process supervisor -- )
[ <linked-error> ] dip send ; [ <linked-error> ] dip send ;
@ -34,15 +84,17 @@ M: thread send ( message thread -- )
: spawn-linked ( quot name -- thread ) : spawn-linked ( quot name -- thread )
my-mailbox spawn-linked-to ; my-mailbox spawn-linked-to ;
TUPLE: synchronous data sender tag ; TUPLE: synchronous < envelope ;
: <synchronous> ( data -- sync ) : <synchronous> ( data -- sync )
self synchronous counter synchronous boa ; synchronous new-envelope
synchronous counter >>tag ;
TUPLE: reply data tag ; TUPLE: reply < envelope ;
: <reply> ( data synchronous -- reply ) : <reply> ( data synchronous -- reply )
tag>> \ reply boa ; [ reply new-envelope ] dip
tag>> >>tag ;
: synchronous-reply? ( response synchronous -- ? ) : synchronous-reply? ( response synchronous -- ? )
over reply? [ [ tag>> ] bi@ = ] [ 2drop f ] if ; over reply? [ [ tag>> ] bi@ = ] [ 2drop f ] if ;
@ -57,15 +109,28 @@ M: cannot-send-synchronous-to-self summary
cannot-send-synchronous-to-self cannot-send-synchronous-to-self
] [ ] [
[ <synchronous> dup ] dip send [ <synchronous> dup ] dip send
'[ _ synchronous-reply? ] receive-if '[ _ synchronous-reply? ] (receive-if) data>>
data>>
] if ; ] if ;
: send-synchronous-timeout ( timeout message thread -- reply )
dup self eq? [
cannot-send-synchronous-to-self
] [
[ <synchronous> 2dup ] dip send-timeout
'[ _ synchronous-reply? ] (receive-if-timeout) data>>
] if ;
<PRIVATE
: reply-synchronous ( message synchronous -- ) : reply-synchronous ( message synchronous -- )
[ <reply> ] keep sender>> send ; dup expired?
[ 2drop ]
[ [ <reply> ] keep sender>> send ] if ;
PRIVATE>
: handle-synchronous ( quot -- ) : handle-synchronous ( quot -- )
receive [ (receive) [
data>> swap call data>> swap call
] keep reply-synchronous ; inline ] keep reply-synchronous ; inline

View File

@ -1,7 +1,8 @@
USING: accessors arrays assocs bson.constants combinators USING: accessors arrays assocs bson.constants combinators
combinators.smart constructors destructors formatting fry hashtables combinators.smart constructors destructors formatting fry hashtables
io io.pools io.sockets kernel linked-assocs math mongodb.connection io io.pools io.sockets kernel linked-assocs math mongodb.connection
mongodb.msg parser prettyprint sequences sets splitting strings mongodb.msg parser prettyprint prettyprint.custom prettyprint.sections
sequences sets splitting strings
tools.continuations uuid memoize locals ; tools.continuations uuid memoize locals ;
IN: mongodb.driver IN: mongodb.driver
@ -32,6 +33,9 @@ CONSTANT: PARTIAL? "partial?"
ERROR: mdb-error msg ; ERROR: mdb-error msg ;
M: mdb-error pprint* ( obj -- )
msg>> text ;
: >pwd-digest ( user password -- digest ) : >pwd-digest ( user password -- digest )
"mongo" swap 3array ":" join md5-checksum ; "mongo" swap 3array ":" join md5-checksum ;