Rename distributed process registry stuff to remote-thread
parent
72ae46e72b
commit
536a4a3932
|
@ -28,7 +28,7 @@ MATCH-VARS: ?from ?tag ?id ?value ;
|
|||
|
||||
SYMBOL: no-channel
|
||||
|
||||
: channel-process ( -- )
|
||||
: channel-thread ( -- )
|
||||
[
|
||||
{
|
||||
{ { to ?id ?value }
|
||||
|
@ -41,10 +41,9 @@ SYMBOL: no-channel
|
|||
PRIVATE>
|
||||
|
||||
: start-channel-node ( -- )
|
||||
"remote-channels" get-process [
|
||||
"remote-channels"
|
||||
[ channel-process t ] "Remote channels" spawn-server
|
||||
register-process
|
||||
"remote-channels" get-remote-thread [
|
||||
[ channel-thread t ] "Remote channels" spawn-server
|
||||
"remote-channels" register-remote-thread
|
||||
] unless ;
|
||||
|
||||
TUPLE: remote-channel node id ;
|
||||
|
@ -53,12 +52,12 @@ C: <remote-channel> remote-channel
|
|||
|
||||
M: remote-channel to ( value remote-channel -- )
|
||||
[ [ \ to , id>> , , ] { } make ] keep
|
||||
node>> "remote-channels" swap <remote-process>
|
||||
node>> "remote-channels" <remote-thread>
|
||||
send-synchronous no-channel = [ no-channel throw ] when ;
|
||||
|
||||
M: remote-channel from ( remote-channel -- value )
|
||||
[ [ \ from , id>> , ] { } make ] keep
|
||||
node>> "remote-channels" swap <remote-process>
|
||||
node>> "remote-channels" <remote-thread>
|
||||
send-synchronous dup no-channel = [ no-channel throw ] when* ;
|
||||
|
||||
[
|
||||
|
|
|
@ -21,23 +21,23 @@ $nl
|
|||
{ $examples
|
||||
{ $unchecked-example
|
||||
": log-message ( -- ) receive . flush log-message ;"
|
||||
"[ log-message ] \"logger\" spawn [ name>> ] keep register-process"
|
||||
"[ log-message ] \"logger\" spawn dup name>> register-remote-thread"
|
||||
}
|
||||
}
|
||||
"This spawns a thread waits for the messages. It registers that thread as a "
|
||||
"able to be accessed remotely using " { $link register-process } "."
|
||||
"able to be accessed remotely using " { $link register-remote-thread } "."
|
||||
$nl
|
||||
"The second Factor instance, the one associated with port 9001, can send "
|
||||
"messages to the 'logger' process by name:"
|
||||
"messages to the 'logger' thread by name:"
|
||||
{ $examples
|
||||
{ $unchecked-example
|
||||
"USING: io.sockets concurrency.messaging concurrency.distributed ;"
|
||||
"\"hello\" \"logger\" \"127.0.0.1\" 9000 <inet4> <remote-process> send"
|
||||
"\"hello\" \"127.0.0.1\" 9000 <inet4> \"logger\" <remote-thread> send"
|
||||
}
|
||||
}
|
||||
"The " { $link send } " word is used to send messages to other threads. If an "
|
||||
"instance of " { $link remote-process } " is provided instead of a thread then "
|
||||
"the message is marshalled to the named process on the given machine using the "
|
||||
"instance of " { $link remote-thread } " is provided instead of a thread then "
|
||||
"the message is marshalled to the named thread on the given machine using the "
|
||||
{ $vocab-link "serialize" } " vocabulary."
|
||||
$nl
|
||||
"Running this code should show the message \"hello\" in the first Factor "
|
||||
|
@ -45,15 +45,15 @@ $nl
|
|||
$nl
|
||||
"It is also possible to use " { $link send-synchronous } " to receive a "
|
||||
"response to a distributed message. When an instance of " { $link thread } " "
|
||||
"is marshalled it is converted into an instance of " { $link remote-process }
|
||||
"is marshalled it is converted into an instance of " { $link remote-thread }
|
||||
". The receiver of this can use it as the target of a " { $link send }
|
||||
" or " { $link reply } " call." ;
|
||||
|
||||
ARTICLE: "concurrency.distributed" "Distributed message passing"
|
||||
"The " { $vocab-link "concurrency.distributed" } " implements transparent distributed message passing, inspired by Erlang and Termite."
|
||||
{ $subsections start-node }
|
||||
"Instances of " { $link thread } " can be sent to remote processes, at which point they are converted to objects holding the thread ID and the current node's host name:"
|
||||
{ $subsections remote-process }
|
||||
"Instances of " { $link thread } " can be sent to remote threads, at which point they are converted to objects holding the thread ID and the current node's host name:"
|
||||
{ $subsections remote-thread }
|
||||
"The " { $vocab-link "serialize" } " vocabulary is used to convert Factor objects to byte arrays for transfer over a socket."
|
||||
{ $subsections "concurrency.distributed.example" } ;
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ IN: concurrency.distributed.tests
|
|||
|
||||
[ 8 ] [
|
||||
5 self 2array
|
||||
"thread-a" test-node <remote-process> send
|
||||
test-node "thread-a" <remote-thread> send
|
||||
|
||||
receive
|
||||
] unit-test
|
||||
|
|
|
@ -8,25 +8,25 @@ IN: concurrency.distributed
|
|||
|
||||
<PRIVATE
|
||||
|
||||
: registered-processes ( -- hash )
|
||||
\ registered-processes get-global ;
|
||||
: registered-remote-threads ( -- hash )
|
||||
\ registered-remote-threads get-global ;
|
||||
|
||||
PRIVATE>
|
||||
|
||||
: register-process ( name process -- )
|
||||
swap registered-processes set-at ;
|
||||
: register-remote-thread ( thread name -- )
|
||||
registered-remote-threads set-at ;
|
||||
|
||||
: unregister-process ( name -- )
|
||||
registered-processes delete-at ;
|
||||
: unregister-remote-thread ( name -- )
|
||||
registered-remote-threads delete-at ;
|
||||
|
||||
: get-process ( name -- process )
|
||||
dup registered-processes at [ ] [ thread ] ?if ;
|
||||
: get-remote-thread ( name -- thread )
|
||||
dup registered-remote-threads at [ ] [ thread ] ?if ;
|
||||
|
||||
SYMBOL: local-node
|
||||
|
||||
: handle-node-client ( -- )
|
||||
deserialize
|
||||
[ first2 get-process send ] [ stop-this-server ] if* ;
|
||||
[ first2 get-remote-thread send ] [ stop-this-server ] if* ;
|
||||
|
||||
: <node-server> ( addrspec -- threaded-server )
|
||||
binary <threaded-server>
|
||||
|
@ -40,26 +40,26 @@ SYMBOL: local-node
|
|||
: start-node ( port -- )
|
||||
host-name over <inet> (start-node) ;
|
||||
|
||||
TUPLE: remote-process id node ;
|
||||
TUPLE: remote-thread node id ;
|
||||
|
||||
C: <remote-process> remote-process
|
||||
C: <remote-thread> remote-thread
|
||||
|
||||
: send-remote-message ( message node -- )
|
||||
binary [ serialize ] with-client ;
|
||||
|
||||
M: remote-process send ( message thread -- )
|
||||
M: remote-thread send ( message thread -- )
|
||||
[ id>> 2array ] [ node>> ] bi
|
||||
send-remote-message ;
|
||||
|
||||
M: thread (serialize) ( obj -- )
|
||||
id>> local-node get-global <remote-process>
|
||||
id>> [ local-node get-global ] dip <remote-thread>
|
||||
(serialize) ;
|
||||
|
||||
: stop-node ( node -- )
|
||||
f swap send-remote-message ;
|
||||
|
||||
[
|
||||
H{ } clone \ registered-processes set-global
|
||||
H{ } clone \ registered-remote-threads set-global
|
||||
] "remote-thread-registry" add-init-hook
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue