diff --git a/contrib/concurrency/concurrency.factor b/contrib/concurrency/concurrency.factor index 7139551d56..2f0963bc8c 100644 --- a/contrib/concurrency/concurrency.factor +++ b/contrib/concurrency/concurrency.factor @@ -4,7 +4,8 @@ ! Concurrency library for Factor based on Erlang/Termite style ! concurrency. USING: kernel generic threads io namespaces errors words arrays - math sequences hashtables strings vectors dlists serialize ; + math sequences hashtables strings vectors dlists serialize + match ; IN: concurrency #! Debug @@ -175,18 +176,6 @@ M: process send ( message process -- ) [ catch [ rethrow-linked ] when* ] curry [ self dup process-pid swap register-process call self process-pid unregister-process ] curry (spawn-link) ; -#! A common operation is to send a message to a process containing -#! the sending process so the receiver can send a reply back. A 'tag' -#! is also sent so that the sender can match the reply with the -#! original request. The 'tagged-message' tuple ecapsulates this. -TUPLE: tagged-message data from tag ; - -: >tagged-message< ( tagged-message -- data from tag ) - #! Explode a message tuple. - dup tagged-message-data swap - dup tagged-message-from swap - tagged-message-tag ; - : (recv) ( msg form -- ) #! Process a form with the following format: #! [ pred match-quot ] @@ -215,33 +204,22 @@ TUPLE: tagged-message data from tag ; receive swap [ dupd (recv) ] each drop ; : tag-message ( message -- tagged-message ) - #! Given a message, wrap it with a tagged message. - self random-64 ; - -: tag-match? ( message tag -- bool ) - #! Return true if the message is a tagged message and - #! its tag matches the given tag. - swap dup tagged-message? [ - tagged-message-tag = - ] [ - 2drop f - ] if ; + #! Given a message, wrap it with the sending process and a unique tag. + >r self random-64 r> 3array ; : send-synchronous ( message process -- reply ) - #! Sends a message to the process using the 'message' - #! protocol and waits for a reply to that message. The reply - #! is matched up with the request by generating a message tag - #! which should be sent back with the reply. - >r tag-message [ tagged-message-tag ] keep r> send - unit [ first tag-match? ] curry receive-if tagged-message-data ; + #! Sends a message to the process synchronously. The + #! message will be wrapped to include the process of the sender + #! and a unique tag. After being sent the sending process will + #! block for a reply tagged with the same unique tag. + >r tag-message dup r> send second _ 2array [ match ] curry receive-if second ; -: reply ( tagged-message message -- ) - #! Replies to the tagged-message which should have been a result of a - #! 'send-synchronous' call. It will send 'message' back to the process +: reply ( from tag message -- ) + #! Replies to a message received via a 'send-synchronous' call. + #! It will send 'message' back to the process #! that originally sent the tagged message, and will have the same tag #! as that in 'tagged-message'. - swap >tagged-message< rot drop ! message from tag - swap >r >r self r> r> send ; + 2array swap send ; : forever ( quot -- ) #! Loops forever executing the quotation. @@ -275,46 +253,6 @@ SYMBOL: quit-cc "Exiting process: " write self process-pid print ] curry spawn-link ; -: send-reply ( message pred quot -- ) - #! The intent of this word is to provde an easy way to - #! check the data contained in a message, process it, and - #! return a result to the original sender. - #! Given a message tuple, call 'pred' given the - #! message data from that tuple on the top of the stack. - #! 'pred' should have stack effect ( data -- boolean ). - #! If 'pred' returns true, call 'quot' with the message - #! data from the message tuple on the stack. 'quot' has - #! stack effect ( data -- result ). - #! The result of that call will be sent back to the - #! messages original caller with the same tag as the - #! original message. - >r >r >tagged-message< rot ! from tag data r: quot pred ) - dup r> call [ ! from tag data r: quot - r> call ! from tag result - self ! from tag result self - rot ! from self tag result - swap send - ] [ - r> drop 3drop - ] if ; - -: maybe-send-reply ( message pred quot -- ) - #! Same as !result but if false is returned from - #! quot then nothing is sent back to the caller. - >r >r >tagged-message< rot ! from tag data r: quot pred ) - dup r> call [ ! from tag data r: quot - r> call ! from tag result - [ - self ! from tag result self - rot ! from self tag result - swap send - ] [ - 2drop - ] if* - ] [ - r> drop 3drop - ] if ; - : server-cc ( -- cc | process ) #! Captures the current continuation and returns the value. #! If that CC is called with a process on the stack it will @@ -375,13 +313,13 @@ C: promise ( -- ) ! Experimental code below ! ****************************** : (lazy) ( v -- ) - receive over reply (lazy) ; + receive first2 pick reply (lazy) ; : lazy ( quot -- lazy ) #! Spawn a process that immediately blocks and return it. #! When '?lazy' is called on the returned process, call the quotation #! and return the result. The quotation must have stack effect ( -- X ). - [ receive >r call r> over reply (lazy) ] spawn nip ; + [ receive >r call r> first2 pick reply (lazy) ] spawn nip ; : ?lazy ( lazy -- result ) #! Given a process spawned using 'lazy', evaluate it and return the result. @@ -390,37 +328,28 @@ C: promise ( -- ) ! ****************************** ! Standard Processes ! ****************************** -TUPLE: register-msg name process ; -TUPLE: unregister-msg name ; -TUPLE: get-msg name ; - -PREDICATE: tagged-message (get-msg) ( obj -- ? ) - tagged-message-data get-msg? ; - -: handle-register-process ( register-msg table -- ) - >r [ register-msg-process ] keep register-msg-name r> set-hash ; - -: handle-unregister-process ( unregister-msg table -- ) - >r unregister-msg-name r> remove-hash ; - -: handle-get-process ( get-msg table -- ) - over tagged-message-data get-msg-name swap hash reply ; +SYMBOL: ?from +SYMBOL: ?tag +SYMBOL: ?process +SYMBOL: ?name +SYMBOL: register +SYMBOL: unregister : process-registry ( table -- ) receive { - { [ dup register-msg? ] [ over handle-register-process ] } - { [ dup unregister-msg? ] [ over handle-unregister-process ] } - { [ dup (get-msg)? ] [ over handle-get-process ] } - } cond process-registry ; + { { register ?name ?process } [ ?process get ?name get pick set-hash ] } + { { unregister ?name } [ ?name get over remove-hash ] } + { { ?from ?tag { process ?name } } [ ?name get over hash ?from get ?tag get rot reply ] } + } match-cond process-registry ; : register-process ( name process -- ) - \ process-registry get send ; + [ register , swap , , ] { } make \ process-registry get send ; : unregister-process ( name -- ) - \ process-registry get send ; + [ unregister , , ] { } make \ process-registry get send ; : get-process ( name -- ) - \ process-registry get send-synchronous ; + [ process , , ] { } make \ process-registry get send-synchronous ; [ H{ } clone process-registry ] (spawn) \ process-registry set-global @@ -451,7 +380,7 @@ M: process serialize ( obj -- ) localnode swap process-pid serialize ; : (test-node1) - receive "ack" reply (test-node1) ; + receive first2 "ack" reply (test-node1) ; : test-node1 ( -- ) [ (test-node1) ] spawn