From 7f0f3c3c1ae2eb0e2cf5390586d9a08beca59c68 Mon Sep 17 00:00:00 2001 From: Sascha Matzke Date: Sun, 30 Aug 2009 17:26:23 +0200 Subject: [PATCH] reworked messaging, now always use an envelope around the message to handle expiration correctly in all use-cases --- .../messaging/messaging-tests.factor | 14 ++- basis/concurrency/messaging/messaging.factor | 92 +++++++++++++++---- 2 files changed, 83 insertions(+), 23 deletions(-) diff --git a/basis/concurrency/messaging/messaging-tests.factor b/basis/concurrency/messaging/messaging-tests.factor index 7cbe2b21ff..f3e26f9b5d 100644 --- a/basis/concurrency/messaging/messaging-tests.factor +++ b/basis/concurrency/messaging/messaging-tests.factor @@ -10,21 +10,24 @@ IN: concurrency.messaging.tests [ "received" ] [ [ - receive "received" swap reply-synchronous + [ drop "received" ] handle-synchronous ] "Synchronous test" spawn "sent" swap send-synchronous ] unit-test [ "received" ] [ [ - receive "received" swap reply-synchronous + [ drop "received" ] handle-synchronous ] "Synchronous test" spawn [ 100 milliseconds "sent" ] dip send-synchronous-timeout ] unit-test -[ [ 100 milliseconds sleep - receive "received" swap reply-synchronous ] "Synchronous test" spawn - [ 50 milliseconds "sent" ] dip send-synchronous-timeout +[ + [ + 100 milliseconds sleep + [ drop "received" ] handle-synchronous + ] "Synchronous test" spawn + [ 5 milliseconds "sent" ] dip send-synchronous-timeout ] [ wait-timeout? ] must-fail-with @@ -77,3 +80,4 @@ SYMBOL: exit ! ] "Bad synchronous send" spawn "t" set ! [ 3 "t" get send-synchronous ] must-fail + diff --git a/basis/concurrency/messaging/messaging.factor b/basis/concurrency/messaging/messaging.factor index 8438f7effe..9046604282 100644 --- a/basis/concurrency/messaging/messaging.factor +++ b/basis/concurrency/messaging/messaging.factor @@ -1,32 +1,82 @@ ! Copyright (C) 2005, 2008 Chris Double, Slava Pestov. ! See http://factorcode.org/license.txt for BSD license. USING: kernel threads concurrency.mailboxes continuations -namespaces assocs accessors summary fry ; +namespaces assocs accessors summary fry calendar math sequences ; IN: concurrency.messaging +TUPLE: envelope data sender tag expiry ; + +>data self >>sender ; + +: ( data -- envelope ) + dup envelope? + [ envelope new-envelope ] unless ; + +: expired? ( message -- ? ) + dup envelope? + [ expiry>> + [ now (time-) 0 < ] + [ f ] if* + ] [ drop f ] if ; inline + +: if-expired ( message quot -- message ) + [ dup expired? ] dip + '[ drop _ call( -- message ) ] [ ] if ; inline + +PRIVATE> + GENERIC: send ( message thread -- ) +GENERIC: send-timeout ( timeout message thread -- ) + : mailbox-of ( thread -- mailbox ) dup mailbox>> [ ] [ [ >>mailbox drop ] keep ] ?if ; M: thread send ( message thread -- ) + [ ] dip check-registered mailbox-of mailbox-put ; +M: thread send-timeout ( timeout message thread -- ) + [ swap hence >>expiry ] dip send ; + : my-mailbox ( -- mailbox ) self mailbox-of ; +: (receive) ( -- message ) + my-mailbox mailbox-get ?linked + [ (receive) ] if-expired ; + : receive ( -- message ) - my-mailbox mailbox-get ?linked ; + (receive) data>> ; + +: (receive-timeout) ( timeout -- message ) + [ my-mailbox ] dip + [ mailbox-get-timeout ?linked ] keep + '[ _ (receive-timeout) ] if-expired ; inline : receive-timeout ( timeout -- message ) - [ my-mailbox ] dip mailbox-get-timeout ?linked ; + (receive-timeout) data>> ; + +: (receive-if) ( pred -- message ) + [ my-mailbox ] dip + [ mailbox-get? ?linked ] keep + '[ _ (receive-if) ] if-expired ; inline : receive-if ( pred -- message ) - [ my-mailbox ] dip mailbox-get? ?linked ; inline + [ data>> ] prepend (receive-if) data>> ; inline + +: (receive-if-timeout) ( timeout pred -- message ) + [ my-mailbox ] 2dip + [ mailbox-get-timeout? ?linked ] 2keep + '[ _ _ (receive-if-timeout) ] if-expired ; inline : receive-if-timeout ( timeout pred -- message ) - [ my-mailbox ] 2dip mailbox-get-timeout? ?linked ; inline + [ data>> ] prepend + (receive-if-timeout) data>> ; inline : rethrow-linked ( error process supervisor -- ) [ ] dip send ; @@ -34,15 +84,17 @@ M: thread send ( message thread -- ) : spawn-linked ( quot name -- thread ) my-mailbox spawn-linked-to ; -TUPLE: synchronous data sender tag ; +TUPLE: synchronous < envelope ; : ( data -- sync ) - self synchronous counter synchronous boa ; + synchronous new-envelope + synchronous counter >>tag ; -TUPLE: reply data tag ; +TUPLE: reply < envelope ; : ( data synchronous -- reply ) - tag>> \ reply boa ; + [ reply new-envelope ] dip + tag>> >>tag ; : synchronous-reply? ( response synchronous -- ? ) over reply? [ [ tag>> ] bi@ = ] [ 2drop f ] if ; @@ -57,24 +109,28 @@ M: cannot-send-synchronous-to-self summary cannot-send-synchronous-to-self ] [ [ dup ] dip send - '[ _ synchronous-reply? ] receive-if - data>> - ] if ; + '[ _ synchronous-reply? ] (receive-if) data>> + ] if ; : send-synchronous-timeout ( timeout message thread -- reply ) dup self eq? [ cannot-send-synchronous-to-self ] [ - [ dup ] dip send - '[ _ synchronous-reply? ] receive-if-timeout - data>> + [ 2dup ] dip send-timeout + '[ _ synchronous-reply? ] (receive-if-timeout) data>> ] if ; - + + ] keep sender>> send ; + dup expired? + [ 2drop ] + [ [ ] keep sender>> send ] if ; + +PRIVATE> : handle-synchronous ( quot -- ) - receive [ + (receive) [ data>> swap call ] keep reply-synchronous ; inline