change synchronous concurrency send to use match

chris.double 2006-09-06 13:29:41 +00:00
parent 2e59e8b488
commit d97f261f24
1 changed files with 29 additions and 100 deletions

View File

@ -4,7 +4,8 @@
! Concurrency library for Factor based on Erlang/Termite style
! concurrency.
USING: kernel generic threads io namespaces errors words arrays
math sequences hashtables strings vectors dlists serialize ;
math sequences hashtables strings vectors dlists serialize
match ;
IN: concurrency
#! Debug
@ -175,18 +176,6 @@ M: process send ( message process -- )
[ catch [ rethrow-linked ] when* ] curry
[ self dup process-pid swap register-process call self process-pid unregister-process ] curry (spawn-link) ;
#! A common operation is to send a message to a process containing
#! the sending process so the receiver can send a reply back. A 'tag'
#! is also sent so that the sender can match the reply with the
#! original request. The 'tagged-message' tuple ecapsulates this.
TUPLE: tagged-message data from tag ;
: >tagged-message< ( tagged-message -- data from tag )
#! Explode a message tuple.
dup tagged-message-data swap
dup tagged-message-from swap
tagged-message-tag ;
: (recv) ( msg form -- )
#! Process a form with the following format:
#! [ pred match-quot ]
@ -215,33 +204,22 @@ TUPLE: tagged-message data from tag ;
receive swap [ dupd (recv) ] each drop ;
: tag-message ( message -- tagged-message )
#! Given a message, wrap it with a tagged message.
self random-64 <tagged-message> ;
: tag-match? ( message tag -- bool )
#! Return true if the message is a tagged message and
#! its tag matches the given tag.
swap dup tagged-message? [
tagged-message-tag =
] [
2drop f
] if ;
#! Given a message, wrap it with the sending process and a unique tag.
>r self random-64 r> 3array ;
: send-synchronous ( message process -- reply )
#! Sends a message to the process using the 'message'
#! protocol and waits for a reply to that message. The reply
#! is matched up with the request by generating a message tag
#! which should be sent back with the reply.
>r tag-message [ tagged-message-tag ] keep r> send
unit [ first tag-match? ] curry receive-if tagged-message-data ;
#! Sends a message to the process synchronously. The
#! message will be wrapped to include the process of the sender
#! and a unique tag. After being sent the sending process will
#! block for a reply tagged with the same unique tag.
>r tag-message dup r> send second _ 2array [ match ] curry receive-if second ;
: reply ( tagged-message message -- )
#! Replies to the tagged-message which should have been a result of a
#! 'send-synchronous' call. It will send 'message' back to the process
: reply ( from tag message -- )
#! Replies to a message received via a 'send-synchronous' call.
#! It will send 'message' back to the process
#! that originally sent the tagged message, and will have the same tag
#! as that in 'tagged-message'.
swap >tagged-message< rot drop ! message from tag
swap >r >r self r> <tagged-message> r> send ;
2array swap send ;
: forever ( quot -- )
#! Loops forever executing the quotation.
@ -275,46 +253,6 @@ SYMBOL: quit-cc
"Exiting process: " write self process-pid print
] curry spawn-link ;
: send-reply ( message pred quot -- )
#! The intent of this word is to provde an easy way to
#! check the data contained in a message, process it, and
#! return a result to the original sender.
#! Given a message tuple, call 'pred' given the
#! message data from that tuple on the top of the stack.
#! 'pred' should have stack effect ( data -- boolean ).
#! If 'pred' returns true, call 'quot' with the message
#! data from the message tuple on the stack. 'quot' has
#! stack effect ( data -- result ).
#! The result of that call will be sent back to the
#! messages original caller with the same tag as the
#! original message.
>r >r >tagged-message< rot ! from tag data r: quot pred )
dup r> call [ ! from tag data r: quot
r> call ! from tag result
self ! from tag result self
rot ! from self tag result
<tagged-message> swap send
] [
r> drop 3drop
] if ;
: maybe-send-reply ( message pred quot -- )
#! Same as !result but if false is returned from
#! quot then nothing is sent back to the caller.
>r >r >tagged-message< rot ! from tag data r: quot pred )
dup r> call [ ! from tag data r: quot
r> call ! from tag result
[
self ! from tag result self
rot ! from self tag result
<tagged-message> swap send
] [
2drop
] if*
] [
r> drop 3drop
] if ;
: server-cc ( -- cc | process )
#! Captures the current continuation and returns the value.
#! If that CC is called with a process on the stack it will
@ -375,13 +313,13 @@ C: promise ( -- <promise> )
! Experimental code below
! ******************************
: (lazy) ( v -- )
receive over reply (lazy) ;
receive first2 pick reply (lazy) ;
: lazy ( quot -- lazy )
#! Spawn a process that immediately blocks and return it.
#! When '?lazy' is called on the returned process, call the quotation
#! and return the result. The quotation must have stack effect ( -- X ).
[ receive >r call r> over reply (lazy) ] spawn nip ;
[ receive >r call r> first2 pick reply (lazy) ] spawn nip ;
: ?lazy ( lazy -- result )
#! Given a process spawned using 'lazy', evaluate it and return the result.
@ -390,37 +328,28 @@ C: promise ( -- <promise> )
! ******************************
! Standard Processes
! ******************************
TUPLE: register-msg name process ;
TUPLE: unregister-msg name ;
TUPLE: get-msg name ;
PREDICATE: tagged-message (get-msg) ( obj -- ? )
tagged-message-data get-msg? ;
: handle-register-process ( register-msg table -- )
>r [ register-msg-process ] keep register-msg-name r> set-hash ;
: handle-unregister-process ( unregister-msg table -- )
>r unregister-msg-name r> remove-hash ;
: handle-get-process ( get-msg table -- )
over tagged-message-data get-msg-name swap hash reply ;
SYMBOL: ?from
SYMBOL: ?tag
SYMBOL: ?process
SYMBOL: ?name
SYMBOL: register
SYMBOL: unregister
: process-registry ( table -- )
receive {
{ [ dup register-msg? ] [ over handle-register-process ] }
{ [ dup unregister-msg? ] [ over handle-unregister-process ] }
{ [ dup (get-msg)? ] [ over handle-get-process ] }
} cond process-registry ;
{ { register ?name ?process } [ ?process get ?name get pick set-hash ] }
{ { unregister ?name } [ ?name get over remove-hash ] }
{ { ?from ?tag { process ?name } } [ ?name get over hash ?from get ?tag get rot reply ] }
} match-cond process-registry ;
: register-process ( name process -- )
<register-msg> \ process-registry get send ;
[ register , swap , , ] { } make \ process-registry get send ;
: unregister-process ( name -- )
<unregister-msg> \ process-registry get send ;
[ unregister , , ] { } make \ process-registry get send ;
: get-process ( name -- )
<get-msg> \ process-registry get send-synchronous ;
[ process , , ] { } make \ process-registry get send-synchronous ;
[ H{ } clone process-registry ] (spawn) \ process-registry set-global
@ -451,7 +380,7 @@ M: process serialize ( obj -- )
localnode swap process-pid <remote-process> serialize ;
: (test-node1)
receive "ack" reply (test-node1) ;
receive first2 "ack" reply (test-node1) ;
: test-node1 ( -- )
[ (test-node1) ] spawn