diff --git a/contrib/concurrency/concurrency.factor b/contrib/concurrency/concurrency.factor index 2f0963bc8c..771583ba3a 100644 --- a/contrib/concurrency/concurrency.factor +++ b/contrib/concurrency/concurrency.factor @@ -203,6 +203,8 @@ M: process send ( message process -- ) #! may be run against the message. receive swap [ dupd (recv) ] each drop ; +MATCH-VARS: ?from ?tag ; + : tag-message ( message -- tagged-message ) #! Given a message, wrap it with the sending process and a unique tag. >r self random-64 r> 3array ; @@ -214,13 +216,6 @@ M: process send ( message process -- ) #! block for a reply tagged with the same unique tag. >r tag-message dup r> send second _ 2array [ match ] curry receive-if second ; -: reply ( from tag message -- ) - #! Replies to a message received via a 'send-synchronous' call. - #! It will send 'message' back to the process - #! that originally sent the tagged message, and will have the same tag - #! as that in 'tagged-message'. - 2array swap send ; - : forever ( quot -- ) #! Loops forever executing the quotation. dup >r call r> forever ; @@ -313,13 +308,19 @@ C: promise ( -- ) ! Experimental code below ! ****************************** : (lazy) ( v -- ) - receive first2 pick reply (lazy) ; + receive { + { { ?from ?tag _ } [ ?tag over 2array ?from send (lazy) ] } + } match-cond ; : lazy ( quot -- lazy ) #! Spawn a process that immediately blocks and return it. #! When '?lazy' is called on the returned process, call the quotation #! and return the result. The quotation must have stack effect ( -- X ). - [ receive >r call r> first2 pick reply (lazy) ] spawn nip ; + [ + receive { + { { ?from ?tag _ } [ call ?tag over 2array ?from send (lazy) ] } + } match-cond + ] spawn nip ; : ?lazy ( lazy -- result ) #! Given a process spawned using 'lazy', evaluate it and return the result. @@ -328,18 +329,15 @@ C: promise ( -- ) ! ****************************** ! Standard Processes ! ****************************** -SYMBOL: ?from -SYMBOL: ?tag -SYMBOL: ?process -SYMBOL: ?name +MATCH-VARS: ?process ?name ; SYMBOL: register SYMBOL: unregister : process-registry ( table -- ) receive { - { { register ?name ?process } [ ?process get ?name get pick set-hash ] } - { { unregister ?name } [ ?name get over remove-hash ] } - { { ?from ?tag { process ?name } } [ ?name get over hash ?from get ?tag get rot reply ] } + { { register ?name ?process } [ ?process ?name pick set-hash ] } + { { unregister ?name } [ ?name over remove-hash ] } + { { ?from ?tag { process ?name } } [ ?tag ?name pick hash 2array ?from send ] } } match-cond process-registry ; : register-process ( name process -- ) @@ -380,7 +378,9 @@ M: process serialize ( obj -- ) localnode swap process-pid serialize ; : (test-node1) - receive first2 "ack" reply (test-node1) ; + receive { + { { ?from ?tag _ } [ ?tag "ack" 2array ?from send (test-node1) ] } + } match-cond ; : test-node1 ( -- ) [ (test-node1) ] spawn