more work on distributed concurrency

release
chris.double 2006-08-31 03:38:45 +00:00
parent e170d3e85d
commit 429a09f390
2 changed files with 68 additions and 14 deletions

View File

@ -23,8 +23,8 @@
!
! Concurrency library for Factor based on Erlang/Termite style
! concurrency.
USING: kernel generic threads io namespaces errors words
math sequences hashtables strings vectors dlists ;
USING: kernel generic threads io namespaces errors words arrays
math sequences hashtables strings vectors dlists serialize ;
IN: concurrency
#! Debug
@ -146,24 +146,23 @@ TUPLE: node hostname port ;
#! Processes run in nodes. Each process has a mailbox that is
#! used for receiving messages sent to that process.
TUPLE: process node links pid mailbox ;
TUPLE: process links pid mailbox ;
TUPLE: remote-process node pid ;
GENERIC: send ( message process -- )
: local-process? ( process -- boolean )
#! Is the process running on the local node
process-node [ localnode = ] [ t ] if* ;
: make-process ( -- process )
#! Return a process set to run on the local node. A process is
#! similar to a thread but can send and receive messages to and
#! from other processes. It may also be linked to other processes so
#! that it receives a message if that process terminates.
localnode [ ] gensym unparse make-mailbox <process> ;
[ ] gensym unparse make-mailbox <process> ;
: make-linked-process ( process -- process )
#! Return a process set to run on the local node. That process is
#! linked to the process on the stack. It will receive a message if
#! that process terminates.
localnode swap unit gensym unparse make-mailbox <process> ;
unit gensym unparse make-mailbox <process> ;
: self ( -- process )
#! Returns the contents of the 'self-process' variables which
@ -184,10 +183,17 @@ init-main-process
] make-hash
swap bind ;
: spawn ( quot -- process )
DEFER: register-process
DEFER: unregister-process
: (spawn) ( quot -- process )
#! Start a process which runs the given quotation.
[ in-thread ] make-process [ with-process ] over slip ;
: spawn ( quot -- process )
#! Start a process which runs the given quotation.
[ self dup process-pid swap register-process call self process-pid unregister-process ] curry (spawn) ;
TUPLE: linked-exception error ;
: while-no-messages ( quot -- )
@ -196,14 +202,14 @@ TUPLE: linked-exception error ;
#! ( -- ).
>r self process-mailbox r> while-mailbox-empty ; inline
: remote-send ( message process -- )
M: remote-process send ( message process -- )
#! Send the message via the inter-node protocol
"remote-send not implemented" throw ;
: send ( message process -- )
M: process send ( message process -- )
#! Send the message to the process by placing it in the
#! processes mailbox.
dup local-process? [ process-mailbox mailbox-put ] [ remote-send ] if ;
process-mailbox mailbox-put ;
: receive ( -- message )
#! Return a message from the current processes mailbox.
@ -447,3 +453,51 @@ C: promise ( -- <promise> )
#! Given a process spawned using 'lazy', evaluate it and return the result.
f swap send-synchronous ;
! ******************************
! Standard Processes
! ******************************
TUPLE: register-msg name process ;
TUPLE: unregister-msg name ;
TUPLE: get-msg name ;
PREDICATE: tagged-message (get-msg) ( obj -- ? )
tagged-message-data get-msg? ;
: handle-register-process ( register-msg table -- )
>r [ register-msg-process ] keep register-msg-name r> set-hash ;
: handle-unregister-process ( unregister-msg table -- )
>r unregister-msg-name r> remove-hash ;
: handle-get-process ( get-msg table -- )
over tagged-message-data get-msg-name swap hash reply ;
: process-registry ( table -- )
receive {
{ [ dup register-msg? ] [ over handle-register-process ] }
{ [ dup unregister-msg? ] [ over handle-unregister-process ] }
{ [ dup (get-msg)? ] [ over handle-get-process ] }
} cond process-registry ;
[ H{ } clone process-registry ] (spawn) \ process-registry set-global
: register-process ( name process -- )
<register-msg> \ process-registry get send ;
: unregister-process ( name -- )
<unregister-msg> \ process-registry get send ;
: get-process ( name -- )
<get-msg> \ process-registry get send-synchronous ;
: handle-node-client ( stream -- )
[ [ deserialize ] with-serialized ] with-stream dup . first2 get-process send ;
: (node-server) ( server -- )
dup accept handle-node-client (node-server) ;
: node-server ( port -- )
<server> (node-server) ;
: send-to-node ( msg pid host port -- )
<client> [ 2array [ serialize ] with-serialized ] with-stream ;

View File

@ -1,4 +1,4 @@
REQUIRES: dlists ;
REQUIRES: dlists serialize ;
PROVIDE: concurrency
{ "concurrency.factor" }