From dc6bc9d32750ffd6edda3fcd40bf86a243748918 Mon Sep 17 00:00:00 2001 From: Chris Double <chris.double@double.co.nz> Date: Thu, 29 Oct 2009 16:15:26 +1300 Subject: [PATCH 1/6] Add example of usage to concurrency.distributed help --- .../distributed/distributed-docs.factor | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/basis/concurrency/distributed/distributed-docs.factor b/basis/concurrency/distributed/distributed-docs.factor index 76c9918cca..4672043b36 100644 --- a/basis/concurrency/distributed/distributed-docs.factor +++ b/basis/concurrency/distributed/distributed-docs.factor @@ -8,11 +8,54 @@ HELP: start-node { $values { "port" "a port number between 0 and 65535" } } { $description "Starts a node server for receiving messages from remote Factor instances." } ; +ARTICLE: "concurrency.distributed.example" "Distributed Concurrency Example" +"For a Factor instance to be able to send and receive distributed " +"concurrency messages it must first have " { $link start-node } " called." +$nl +"In one factor instance call " { $link start-node } " with the port 9000, " +"and in another with the port 9001." +$nl +"In this example the Factor instance associated with port 9000 will run " +"a thread that sits receiving messages and printing the received message " +"in the listener. The code to start the thread is: " +{ $examples + { $unchecked-example + ": log-message ( -- ) receive . flush log-message ;" + "[ log-message ] \"logger\" spawn [ name>> ] keep register-process" + } +} +"This spawns a thread waits for the messages. It registers that thread as a " +"able to be accessed remotely using " { $link register-process } "." +$nl +"The second Factor instance, the one associated with port 9001, can send " +"messages to the 'logger' process by name:" +{ $examples + { $unchecked-example + "USING: io.sockets concurrency.messaging concurrency.distributed ;" + "\"hello\" \"logger\" \"127.0.0.1\" 9000 <inet4> <remote-process> send" + } +} +"The " { $link send } " word is used to send messages to other threads. If an " +"instance of " { $link remote-process } " is provided instead of a thread then " +"the message is marshalled to the named process on the given machine using the " +{ $vocab-link "serialize" } " vocabulary." +$nl +"Running this code should show the message \"hello\" in the first Factor " +"instance." +$nl +"It is also possible to use " { $link send-synchronous } " to receive a " +"response to a distributed message. When an instance of " { $link thread } " " +"is marshalled it is converted into an instance of " { $link remote-process } +". The receiver of this can use it as the target of a " { $link send } +" or " { $link reply } " call." ; + ARTICLE: "concurrency.distributed" "Distributed message passing" "The " { $vocab-link "concurrency.distributed" } " implements transparent distributed message passing, inspired by Erlang and Termite." { $subsections start-node } "Instances of " { $link thread } " can be sent to remote processes, at which point they are converted to objects holding the thread ID and the current node's host name:" { $subsections remote-process } -"The " { $vocab-link "serialize" } " vocabulary is used to convert Factor objects to byte arrays for transfer over a socket." ; +"The " { $vocab-link "serialize" } " vocabulary is used to convert Factor objects to byte arrays for transfer over a socket." +{ $subsections "concurrency.distributed.example" } ; + ABOUT: "concurrency.distributed" From 6b7d723982ed181cf98decabd290e6e95dd545d8 Mon Sep 17 00:00:00 2001 From: Chris Double <chris.double@double.co.nz> Date: Thu, 29 Oct 2009 18:01:45 +1300 Subject: [PATCH 2/6] Fix channels.remote to/from words --- basis/channels/remote/remote.factor | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/basis/channels/remote/remote.factor b/basis/channels/remote/remote.factor index 6e10b23407..6f63b9bd4a 100644 --- a/basis/channels/remote/remote.factor +++ b/basis/channels/remote/remote.factor @@ -53,12 +53,12 @@ C: <remote-channel> remote-channel M: remote-channel to ( value remote-channel -- ) [ [ \ to , id>> , , ] { } make ] keep - node>> "remote-channels" <remote-process> + node>> "remote-channels" swap <remote-process> send-synchronous no-channel = [ no-channel throw ] when ; M: remote-channel from ( remote-channel -- value ) [ [ \ from , id>> , ] { } make ] keep - node>> "remote-channels" <remote-process> + node>> "remote-channels" swap <remote-process> send-synchronous dup no-channel = [ no-channel throw ] when* ; [ From 72ae46e72b7afaba7b9042b2f5cde7945f304136 Mon Sep 17 00:00:00 2001 From: Chris Double <chris.double@double.co.nz> Date: Thu, 29 Oct 2009 18:02:07 +1300 Subject: [PATCH 3/6] Move distributed concurrency specific stuff from messaging to distributed --- .../distributed/distributed.factor | 24 ++++++++++++++++++- .../messaging/messaging-docs.factor | 2 +- basis/concurrency/messaging/messaging.factor | 18 -------------- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/basis/concurrency/distributed/distributed.factor b/basis/concurrency/distributed/distributed.factor index 52627f2ed9..325e8e3cc9 100644 --- a/basis/concurrency/distributed/distributed.factor +++ b/basis/concurrency/distributed/distributed.factor @@ -1,11 +1,27 @@ ! Copyright (C) 2005 Chris Double. All Rights Reserved. ! See http://factorcode.org/license.txt for BSD license. USING: serialize sequences concurrency.messaging threads io -io.servers.connection io.encodings.binary +io.servers.connection io.encodings.binary assocs init arrays namespaces kernel accessors ; FROM: io.sockets => host-name <inet> with-client ; IN: concurrency.distributed +<PRIVATE + +: registered-processes ( -- hash ) + \ registered-processes get-global ; + +PRIVATE> + +: register-process ( name process -- ) + swap registered-processes set-at ; + +: unregister-process ( name -- ) + registered-processes delete-at ; + +: get-process ( name -- process ) + dup registered-processes at [ ] [ thread ] ?if ; + SYMBOL: local-node : handle-node-client ( -- ) @@ -41,3 +57,9 @@ M: thread (serialize) ( obj -- ) : stop-node ( node -- ) f swap send-remote-message ; + +[ + H{ } clone \ registered-processes set-global +] "remote-thread-registry" add-init-hook + + diff --git a/basis/concurrency/messaging/messaging-docs.factor b/basis/concurrency/messaging/messaging-docs.factor index 17f05e20fb..85870db4df 100644 --- a/basis/concurrency/messaging/messaging-docs.factor +++ b/basis/concurrency/messaging/messaging-docs.factor @@ -1,6 +1,6 @@ ! Copyright (C) 2006 Chris Double. ! See http://factorcode.org/license.txt for BSD license. -USING: help.syntax help.markup concurrency.messaging.private +USING: help.syntax help.markup threads kernel arrays quotations strings ; IN: concurrency.messaging diff --git a/basis/concurrency/messaging/messaging.factor b/basis/concurrency/messaging/messaging.factor index ce7f7d6110..37965309e8 100644 --- a/basis/concurrency/messaging/messaging.factor +++ b/basis/concurrency/messaging/messaging.factor @@ -68,21 +68,3 @@ M: cannot-send-synchronous-to-self summary receive [ data>> swap call ] keep reply-synchronous ; inline - -<PRIVATE - -: registered-processes ( -- hash ) - \ registered-processes get-global ; - -PRIVATE> - -: register-process ( name process -- ) - swap registered-processes set-at ; - -: unregister-process ( name -- ) - registered-processes delete-at ; - -: get-process ( name -- process ) - dup registered-processes at [ ] [ thread ] ?if ; - -\ registered-processes [ H{ } clone ] initialize From 536a4a39325386c5ccd79f932140882d8f46e3cb Mon Sep 17 00:00:00 2001 From: Chris Double <chris.double@double.co.nz> Date: Thu, 29 Oct 2009 18:39:25 +1300 Subject: [PATCH 4/6] Rename distributed process registry stuff to remote-thread --- basis/channels/remote/remote.factor | 13 ++++----- .../distributed/distributed-docs.factor | 18 ++++++------ .../distributed/distributed-tests.factor | 2 +- .../distributed/distributed.factor | 28 +++++++++---------- 4 files changed, 30 insertions(+), 31 deletions(-) diff --git a/basis/channels/remote/remote.factor b/basis/channels/remote/remote.factor index 6f63b9bd4a..59dec91859 100644 --- a/basis/channels/remote/remote.factor +++ b/basis/channels/remote/remote.factor @@ -28,7 +28,7 @@ MATCH-VARS: ?from ?tag ?id ?value ; SYMBOL: no-channel -: channel-process ( -- ) +: channel-thread ( -- ) [ { { { to ?id ?value } @@ -41,10 +41,9 @@ SYMBOL: no-channel PRIVATE> : start-channel-node ( -- ) - "remote-channels" get-process [ - "remote-channels" - [ channel-process t ] "Remote channels" spawn-server - register-process + "remote-channels" get-remote-thread [ + [ channel-thread t ] "Remote channels" spawn-server + "remote-channels" register-remote-thread ] unless ; TUPLE: remote-channel node id ; @@ -53,12 +52,12 @@ C: <remote-channel> remote-channel M: remote-channel to ( value remote-channel -- ) [ [ \ to , id>> , , ] { } make ] keep - node>> "remote-channels" swap <remote-process> + node>> "remote-channels" <remote-thread> send-synchronous no-channel = [ no-channel throw ] when ; M: remote-channel from ( remote-channel -- value ) [ [ \ from , id>> , ] { } make ] keep - node>> "remote-channels" swap <remote-process> + node>> "remote-channels" <remote-thread> send-synchronous dup no-channel = [ no-channel throw ] when* ; [ diff --git a/basis/concurrency/distributed/distributed-docs.factor b/basis/concurrency/distributed/distributed-docs.factor index 4672043b36..8ea7153b0b 100644 --- a/basis/concurrency/distributed/distributed-docs.factor +++ b/basis/concurrency/distributed/distributed-docs.factor @@ -21,23 +21,23 @@ $nl { $examples { $unchecked-example ": log-message ( -- ) receive . flush log-message ;" - "[ log-message ] \"logger\" spawn [ name>> ] keep register-process" + "[ log-message ] \"logger\" spawn dup name>> register-remote-thread" } } "This spawns a thread waits for the messages. It registers that thread as a " -"able to be accessed remotely using " { $link register-process } "." +"able to be accessed remotely using " { $link register-remote-thread } "." $nl "The second Factor instance, the one associated with port 9001, can send " -"messages to the 'logger' process by name:" +"messages to the 'logger' thread by name:" { $examples { $unchecked-example "USING: io.sockets concurrency.messaging concurrency.distributed ;" - "\"hello\" \"logger\" \"127.0.0.1\" 9000 <inet4> <remote-process> send" + "\"hello\" \"127.0.0.1\" 9000 <inet4> \"logger\" <remote-thread> send" } } "The " { $link send } " word is used to send messages to other threads. If an " -"instance of " { $link remote-process } " is provided instead of a thread then " -"the message is marshalled to the named process on the given machine using the " +"instance of " { $link remote-thread } " is provided instead of a thread then " +"the message is marshalled to the named thread on the given machine using the " { $vocab-link "serialize" } " vocabulary." $nl "Running this code should show the message \"hello\" in the first Factor " @@ -45,15 +45,15 @@ $nl $nl "It is also possible to use " { $link send-synchronous } " to receive a " "response to a distributed message. When an instance of " { $link thread } " " -"is marshalled it is converted into an instance of " { $link remote-process } +"is marshalled it is converted into an instance of " { $link remote-thread } ". The receiver of this can use it as the target of a " { $link send } " or " { $link reply } " call." ; ARTICLE: "concurrency.distributed" "Distributed message passing" "The " { $vocab-link "concurrency.distributed" } " implements transparent distributed message passing, inspired by Erlang and Termite." { $subsections start-node } -"Instances of " { $link thread } " can be sent to remote processes, at which point they are converted to objects holding the thread ID and the current node's host name:" -{ $subsections remote-process } +"Instances of " { $link thread } " can be sent to remote threads, at which point they are converted to objects holding the thread ID and the current node's host name:" +{ $subsections remote-thread } "The " { $vocab-link "serialize" } " vocabulary is used to convert Factor objects to byte arrays for transfer over a socket." { $subsections "concurrency.distributed.example" } ; diff --git a/basis/concurrency/distributed/distributed-tests.factor b/basis/concurrency/distributed/distributed-tests.factor index b2a2851926..96955ac94b 100644 --- a/basis/concurrency/distributed/distributed-tests.factor +++ b/basis/concurrency/distributed/distributed-tests.factor @@ -25,7 +25,7 @@ IN: concurrency.distributed.tests [ 8 ] [ 5 self 2array - "thread-a" test-node <remote-process> send + test-node "thread-a" <remote-thread> send receive ] unit-test diff --git a/basis/concurrency/distributed/distributed.factor b/basis/concurrency/distributed/distributed.factor index 325e8e3cc9..244f1d95a3 100644 --- a/basis/concurrency/distributed/distributed.factor +++ b/basis/concurrency/distributed/distributed.factor @@ -8,25 +8,25 @@ IN: concurrency.distributed <PRIVATE -: registered-processes ( -- hash ) - \ registered-processes get-global ; +: registered-remote-threads ( -- hash ) + \ registered-remote-threads get-global ; PRIVATE> -: register-process ( name process -- ) - swap registered-processes set-at ; +: register-remote-thread ( thread name -- ) + registered-remote-threads set-at ; -: unregister-process ( name -- ) - registered-processes delete-at ; +: unregister-remote-thread ( name -- ) + registered-remote-threads delete-at ; -: get-process ( name -- process ) - dup registered-processes at [ ] [ thread ] ?if ; +: get-remote-thread ( name -- thread ) + dup registered-remote-threads at [ ] [ thread ] ?if ; SYMBOL: local-node : handle-node-client ( -- ) deserialize - [ first2 get-process send ] [ stop-this-server ] if* ; + [ first2 get-remote-thread send ] [ stop-this-server ] if* ; : <node-server> ( addrspec -- threaded-server ) binary <threaded-server> @@ -40,26 +40,26 @@ SYMBOL: local-node : start-node ( port -- ) host-name over <inet> (start-node) ; -TUPLE: remote-process id node ; +TUPLE: remote-thread node id ; -C: <remote-process> remote-process +C: <remote-thread> remote-thread : send-remote-message ( message node -- ) binary [ serialize ] with-client ; -M: remote-process send ( message thread -- ) +M: remote-thread send ( message thread -- ) [ id>> 2array ] [ node>> ] bi send-remote-message ; M: thread (serialize) ( obj -- ) - id>> local-node get-global <remote-process> + id>> [ local-node get-global ] dip <remote-thread> (serialize) ; : stop-node ( node -- ) f swap send-remote-message ; [ - H{ } clone \ registered-processes set-global + H{ } clone \ registered-remote-threads set-global ] "remote-thread-registry" add-init-hook From 628a0ba5307639ddf7dbd2fb0ec805996352c293 Mon Sep 17 00:00:00 2001 From: Chris Double <chris.double@double.co.nz> Date: Fri, 30 Oct 2009 14:19:34 +1300 Subject: [PATCH 5/6] Refactor some remote channels code --- basis/channels/remote/remote.factor | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/basis/channels/remote/remote.factor b/basis/channels/remote/remote.factor index 59dec91859..0a88875544 100644 --- a/basis/channels/remote/remote.factor +++ b/basis/channels/remote/remote.factor @@ -2,7 +2,7 @@ ! See http://factorcode.org/license.txt for BSD license. ! ! Remote Channels -USING: kernel init namespaces make assocs arrays random +USING: kernel init namespaces assocs arrays random sequences channels match concurrency.messaging concurrency.distributed threads accessors ; IN: channels.remote @@ -27,38 +27,44 @@ PRIVATE> MATCH-VARS: ?from ?tag ?id ?value ; SYMBOL: no-channel +TUPLE: to-message id value ; +TUPLE: from-message id ; : channel-thread ( -- ) [ { - { { to ?id ?value } + { T{ to-message f ?id ?value } [ ?value ?id get-channel dup [ to f ] [ 2drop no-channel ] if ] } - { { from ?id } + { T{ from-message f ?id } [ ?id get-channel [ from ] [ no-channel ] if* ] } } match-cond ] handle-synchronous ; -PRIVATE> - : start-channel-node ( -- ) "remote-channels" get-remote-thread [ [ channel-thread t ] "Remote channels" spawn-server "remote-channels" register-remote-thread ] unless ; +PRIVATE> + TUPLE: remote-channel node id ; C: <remote-channel> remote-channel -M: remote-channel to ( value remote-channel -- ) - [ [ \ to , id>> , , ] { } make ] keep - node>> "remote-channels" <remote-thread> - send-synchronous no-channel = [ no-channel throw ] when ; +<PRIVATE -M: remote-channel from ( remote-channel -- value ) - [ [ \ from , id>> , ] { } make ] keep +: send-message ( message remote-channel -- value ) node>> "remote-channels" <remote-thread> send-synchronous dup no-channel = [ no-channel throw ] when* ; + +PRIVATE> + +M: remote-channel to ( value remote-channel -- ) + [ id>> swap to-message boa ] keep send-message drop ; + +M: remote-channel from ( remote-channel -- value ) + [ id>> from-message boa ] keep send-message ; [ H{ } clone \ remote-channels set-global From afec4842401b0f1e101c0b98aac0cf6da46c88a0 Mon Sep 17 00:00:00 2001 From: Chris Double <chris.double@double.co.nz> Date: Fri, 30 Oct 2009 14:25:10 +1300 Subject: [PATCH 6/6] Update remote channels help --- basis/channels/remote/remote-docs.factor | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/basis/channels/remote/remote-docs.factor b/basis/channels/remote/remote-docs.factor index 309f764d2d..c612b4256a 100644 --- a/basis/channels/remote/remote-docs.factor +++ b/basis/channels/remote/remote-docs.factor @@ -53,11 +53,11 @@ $nl " to be accessed remotely. " { $link publish } " returns an id which a remote node " "needs to know to access the channel." $nl -{ $snippet "channel [ from . ] spawn drop dup publish" } +{ $snippet "<channel> dup [ from . flush ] curry \"test\" spawn drop publish" } $nl -"Given the id from the snippet above, a remote node can put items in the channel." +"Given the id from the snippet above, a remote node can put items in the channel (where 123456 is the id):" $nl -{ $snippet "\"myhost.com\" 9001 <node> \"ID123456\" <remote-channel>\n\"hello\" over to" } +{ $snippet "\"myhost.com\" 9001 <node> 123456 <remote-channel>\n\"hello\" over to" } ; ABOUT: { "remote-channels" "remote-channels" }