get send-synchronous working across distributed nodes

release
chris.double 2006-08-31 04:37:53 +00:00
parent 429a09f390
commit cc41d2b3ef
1 changed files with 37 additions and 14 deletions

View File

@ -140,10 +140,7 @@ TUPLE: node hostname port ;
: localnode ( -- node )
#! Return the current node
\ localnode get ;
: start-node ( host port -- )
<node> \ localnode set-global ;
#! Processes run in nodes. Each process has a mailbox that is
#! used for receiving messages sent to that process.
TUPLE: process links pid mailbox ;
@ -151,18 +148,22 @@ TUPLE: remote-process node pid ;
GENERIC: send ( message process -- )
: random-64 ( -- id )
#! Generate a random id to use for pids
[ "ID" % 64 [ 9 random-int CHAR: 0 + , ] times ] "" make ;
: make-process ( -- process )
#! Return a process set to run on the local node. A process is
#! similar to a thread but can send and receive messages to and
#! from other processes. It may also be linked to other processes so
#! that it receives a message if that process terminates.
[ ] gensym unparse make-mailbox <process> ;
[ ] random-64 make-mailbox <process> ;
: make-linked-process ( process -- process )
#! Return a process set to run on the local node. That process is
#! linked to the process on the stack. It will receive a message if
#! that process terminates.
unit gensym unparse make-mailbox <process> ;
unit random-64 make-mailbox <process> ;
: self ( -- process )
#! Returns the contents of the 'self-process' variables which
@ -202,10 +203,6 @@ TUPLE: linked-exception error ;
#! ( -- ).
>r self process-mailbox r> while-mailbox-empty ; inline
M: remote-process send ( message process -- )
#! Send the message via the inter-node protocol
"remote-send not implemented" throw ;
M: process send ( message process -- )
#! Send the message to the process by placing it in the
#! processes mailbox.
@ -282,7 +279,7 @@ TUPLE: tagged-message data from tag ;
: tag-message ( message -- tagged-message )
#! Given a message, wrap it with a tagged message.
self gensym <tagged-message> ;
self random-64 <tagged-message> ;
: tag-match? ( message tag -- bool )
#! Return true if the message is a tagged message and
@ -479,8 +476,6 @@ PREDICATE: tagged-message (get-msg) ( obj -- ? )
{ [ dup (get-msg)? ] [ over handle-get-process ] }
} cond process-registry ;
[ H{ } clone process-registry ] (spawn) \ process-registry set-global
: register-process ( name process -- )
<register-msg> \ process-registry get send ;
@ -490,6 +485,8 @@ PREDICATE: tagged-message (get-msg) ( obj -- ? )
: get-process ( name -- )
<get-msg> \ process-registry get send-synchronous ;
[ H{ } clone process-registry ] (spawn) \ process-registry set-global
: handle-node-client ( stream -- )
[ [ deserialize ] with-serialized ] with-stream dup . first2 get-process send ;
@ -500,4 +497,30 @@ PREDICATE: tagged-message (get-msg) ( obj -- ? )
<server> (node-server) ;
: send-to-node ( msg pid host port -- )
<client> [ 2array [ serialize ] with-serialized ] with-stream ;
<client> [ 2array [ serialize ] with-serialized ] with-stream ;
: start-node ( port -- )
[ node-server ] in-thread
"localhost" swap <node> \ localnode set-global ;
M: remote-process send ( message process -- )
#! Send the message via the inter-node protocol
[ remote-process-pid ] keep
remote-process-node
[ node-hostname ] keep
node-port send-to-node ;
M: process serialize ( obj -- )
localnode swap process-pid <remote-process> serialize ;
: (test-node1)
receive "ack" reply (test-node1) ;
: test-node1
9000 start-node
[ (test-node1) ] spawn
"test1" swap register-process ;
: test-node2
localnode [ 9001 start-node ] unless
[ "abcd" "localhost" 9000 <node> "test1" <remote-process> send-synchronous . ] spawn ;