Use new threaded-server features in concurrency.distributed

db4
Doug Coleman 2010-09-19 21:39:14 -05:00
parent 8aa22487da
commit f20ee7a53b
3 changed files with 29 additions and 47 deletions

View File

@ -1,22 +1,9 @@
USING: help.markup help.syntax concurrency.messaging threads ;
IN: concurrency.distributed
HELP: local-node
{ $var-description "A variable containing the node the current thread is running on." } ;
HELP: start-node
{ $values { "port" "a port number between 0 and 65535" } { "threaded-server" "a threaded-server tuple" } }
{ $description "Starts a node server for receiving messages from remote Factor instances." } ;
ARTICLE: "concurrency.distributed.example" "Distributed Concurrency Example"
"For a Factor instance to be able to send and receive distributed "
"concurrency messages it must first have " { $link start-node } " called."
$nl
"In one factor instance call " { $link start-node } " with the port 9000, "
"and in another with the port 9001."
$nl
"In this example the Factor instance associated with port 9000 will run "
"a thread that sits receiving messages and printing the received message "
"a thread that receives and prints messages "
"in the listener. The code to start the thread is: "
{ $examples
{ $unchecked-example
@ -51,11 +38,9 @@ $nl
ARTICLE: "concurrency.distributed" "Distributed message passing"
"The " { $vocab-link "concurrency.distributed" } " implements transparent distributed message passing, inspired by Erlang and Termite."
{ $subsections start-node }
"Instances of " { $link thread } " can be sent to remote threads, at which point they are converted to objects holding the thread ID and the current node's host name:"
{ $subsections remote-thread }
"The " { $vocab-link "serialize" } " vocabulary is used to convert Factor objects to byte arrays for transfer over a socket."
{ $subsections "concurrency.distributed.example" } ;
ABOUT: "concurrency.distributed"

View File

@ -1,33 +1,39 @@
USING: tools.test concurrency.distributed kernel io.files
io.files.temp io.directories arrays io.sockets system
io.files.temp io.directories arrays io.sockets system calendar
combinators threads math sequences concurrency.messaging
continuations accessors prettyprint ;
continuations accessors prettyprint io.servers.connection ;
FROM: concurrency.messaging => receive send ;
IN: concurrency.distributed.tests
: test-node ( -- addrspec )
CONSTANT: test-ip "127.0.0.1"
: test-node-server ( -- threaded-server )
{
{ [ os unix? ] [ "distributed-concurrency-test" temp-file <local> ] }
{ [ os windows? ] [ "127.0.0.1" 1238 <inet4> ] }
{ [ os windows? ] [ test-ip 0 <inet4> ] }
} cond <node-server> ;
: test-node-client ( -- addrspec )
{
{ [ os unix? ] [ "distributed-concurrency-test" temp-file <local> ] }
{ [ os windows? ] [ test-ip insecure-port <inet4> ] }
} cond ;
[ ] [ [ "distributed-concurrency-test" temp-file delete-file ] ignore-errors ] unit-test
[ ] [ test-node dup (start-node) drop ] unit-test
test-node-server [
[ ] [
[
receive first2 [ 3 + ] dip send
"thread-a" unregister-remote-thread
] "Thread A" spawn
"thread-a" register-remote-thread
] unit-test
[ ] [
[
receive first2 [ 3 + ] dip send
"thread-a" unregister-remote-thread
] "Thread A" spawn
"thread-a" register-remote-thread
] unit-test
[ 8 ] [
5 self 2array
test-node "thread-a" <remote-thread> send
receive
] unit-test
[ ] [ test-node stop-node ] unit-test
[ 8 ] [
5 self 2array
test-node-client "thread-a" <remote-thread> send
100 seconds receive-timeout
] unit-test
] with-threaded-server

View File

@ -22,8 +22,6 @@ PRIVATE>
: get-remote-thread ( name -- thread )
dup registered-remote-threads at [ ] [ threads at ] ?if ;
SYMBOL: local-node
: handle-node-client ( -- )
deserialize
[ first2 get-remote-thread send ] [ stop-this-server ] if* ;
@ -34,12 +32,6 @@ SYMBOL: local-node
"concurrency.distributed" >>name
[ handle-node-client ] >>handler ;
: (start-node) ( addrspec addrspec -- threaded-server )
local-node set-global <node-server> start-server ;
: start-node ( port -- threaded-server )
host-name over <inet> (start-node) ;
TUPLE: remote-thread node id ;
C: <remote-thread> remote-thread
@ -52,8 +44,7 @@ M: remote-thread send ( message thread -- )
send-remote-message ;
M: thread (serialize) ( obj -- )
id>> [ local-node get-global ] dip <remote-thread>
(serialize) ;
id>> [ insecure-addr ] dip <remote-thread> (serialize) ;
: stop-node ( node -- )
f swap send-remote-message ;