Merge branch 'master' of git://factorcode.org/git/factor
						commit
						b318741572
					
				| 
						 | 
				
			
			@ -53,11 +53,11 @@ $nl
 | 
			
		|||
" to be accessed remotely. " { $link publish } " returns an id which a remote node "
 | 
			
		||||
"needs to know to access the channel."
 | 
			
		||||
$nl
 | 
			
		||||
{ $snippet "channel [ from . ] spawn drop dup publish" }
 | 
			
		||||
{ $snippet "<channel> dup [ from . flush ] curry \"test\" spawn drop publish" }
 | 
			
		||||
$nl
 | 
			
		||||
"Given the id from the snippet above, a remote node can put items in the channel."
 | 
			
		||||
"Given the id from the snippet above, a remote node can put items in the channel (where 123456 is the id):"
 | 
			
		||||
$nl
 | 
			
		||||
{ $snippet "\"myhost.com\" 9001 <node> \"ID123456\" <remote-channel>\n\"hello\" over to" } 
 | 
			
		||||
{ $snippet "\"myhost.com\" 9001 <node> 123456 <remote-channel>\n\"hello\" over to" } 
 | 
			
		||||
;
 | 
			
		||||
 | 
			
		||||
ABOUT: { "remote-channels" "remote-channels" }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,7 +2,7 @@
 | 
			
		|||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
!
 | 
			
		||||
! Remote Channels
 | 
			
		||||
USING: kernel init namespaces make assocs arrays random
 | 
			
		||||
USING: kernel init namespaces assocs arrays random
 | 
			
		||||
sequences channels match concurrency.messaging
 | 
			
		||||
concurrency.distributed threads accessors ;
 | 
			
		||||
IN: channels.remote
 | 
			
		||||
| 
						 | 
				
			
			@ -27,39 +27,44 @@ PRIVATE>
 | 
			
		|||
MATCH-VARS: ?from ?tag ?id ?value ;
 | 
			
		||||
 | 
			
		||||
SYMBOL: no-channel
 | 
			
		||||
TUPLE: to-message id value ;
 | 
			
		||||
TUPLE: from-message id ;
 | 
			
		||||
 | 
			
		||||
: channel-process ( -- )
 | 
			
		||||
: channel-thread ( -- )
 | 
			
		||||
    [
 | 
			
		||||
        {
 | 
			
		||||
            { { to ?id ?value  }
 | 
			
		||||
            { T{ to-message f ?id ?value  }
 | 
			
		||||
            [ ?value ?id get-channel dup [ to f ] [ 2drop no-channel ] if ] }
 | 
			
		||||
            { { from ?id }
 | 
			
		||||
            { T{ from-message f ?id }
 | 
			
		||||
            [ ?id get-channel [ from ] [ no-channel ] if* ] }
 | 
			
		||||
        } match-cond
 | 
			
		||||
    ] handle-synchronous ;
 | 
			
		||||
 | 
			
		||||
PRIVATE>
 | 
			
		||||
 | 
			
		||||
: start-channel-node ( -- )
 | 
			
		||||
    "remote-channels" get-process [
 | 
			
		||||
        "remote-channels" 
 | 
			
		||||
        [ channel-process t ] "Remote channels" spawn-server
 | 
			
		||||
        register-process 
 | 
			
		||||
    "remote-channels" get-remote-thread [
 | 
			
		||||
        [ channel-thread t ] "Remote channels" spawn-server
 | 
			
		||||
        "remote-channels" register-remote-thread 
 | 
			
		||||
    ] unless ;
 | 
			
		||||
 | 
			
		||||
PRIVATE>
 | 
			
		||||
 | 
			
		||||
TUPLE: remote-channel node id ;
 | 
			
		||||
 | 
			
		||||
C: <remote-channel> remote-channel 
 | 
			
		||||
 | 
			
		||||
<PRIVATE
 | 
			
		||||
 | 
			
		||||
: send-message ( message remote-channel -- value )
 | 
			
		||||
    node>> "remote-channels" <remote-thread> 
 | 
			
		||||
    send-synchronous dup no-channel = [ no-channel throw ] when* ;
 | 
			
		||||
    
 | 
			
		||||
PRIVATE>
 | 
			
		||||
 | 
			
		||||
M: remote-channel to ( value remote-channel -- )
 | 
			
		||||
    [ [ \ to , id>> , , ] { } make ] keep
 | 
			
		||||
    node>> "remote-channels" <remote-process> 
 | 
			
		||||
    send-synchronous no-channel = [ no-channel throw ] when ;
 | 
			
		||||
    [ id>> swap to-message boa ] keep send-message drop ;
 | 
			
		||||
 | 
			
		||||
M: remote-channel from ( remote-channel -- value )
 | 
			
		||||
    [ [ \ from , id>> , ] { } make ] keep
 | 
			
		||||
    node>> "remote-channels" <remote-process> 
 | 
			
		||||
    send-synchronous dup no-channel = [ no-channel throw ] when* ;
 | 
			
		||||
    [ id>> from-message boa ] keep send-message ;
 | 
			
		||||
 | 
			
		||||
[
 | 
			
		||||
    H{ } clone \ remote-channels set-global
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,11 +8,54 @@ HELP: start-node
 | 
			
		|||
{ $values { "port" "a port number between 0 and 65535" } }
 | 
			
		||||
{ $description "Starts a node server for receiving messages from remote Factor instances." } ;
 | 
			
		||||
 | 
			
		||||
ARTICLE: "concurrency.distributed.example" "Distributed Concurrency Example"
 | 
			
		||||
"For a Factor instance to be able to send and receive distributed "
 | 
			
		||||
"concurrency messages it must first have " { $link start-node } " called."
 | 
			
		||||
$nl
 | 
			
		||||
"In one factor instance call " { $link start-node } " with the port 9000, "
 | 
			
		||||
"and in another with the port 9001."
 | 
			
		||||
$nl
 | 
			
		||||
"In this example the Factor instance associated with port 9000 will run "
 | 
			
		||||
"a thread that sits receiving messages and printing the received message "
 | 
			
		||||
"in the listener. The code to start the thread is: "
 | 
			
		||||
{ $examples
 | 
			
		||||
    { $unchecked-example
 | 
			
		||||
        ": log-message ( -- ) receive . flush log-message ;"
 | 
			
		||||
        "[ log-message ] \"logger\" spawn dup name>> register-remote-thread"
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
"This spawns a thread waits for the messages. It registers that thread as a "
 | 
			
		||||
"able to be accessed remotely using " { $link register-remote-thread } "."
 | 
			
		||||
$nl
 | 
			
		||||
"The second Factor instance, the one associated with port 9001, can send "
 | 
			
		||||
"messages to the 'logger' thread by name:"
 | 
			
		||||
{ $examples
 | 
			
		||||
    { $unchecked-example
 | 
			
		||||
        "USING: io.sockets concurrency.messaging concurrency.distributed ;"
 | 
			
		||||
        "\"hello\" \"127.0.0.1\" 9000 <inet4> \"logger\" <remote-thread> send"
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
"The " { $link send } " word is used to send messages to other threads. If an "
 | 
			
		||||
"instance of " { $link remote-thread } " is provided instead of a thread then "
 | 
			
		||||
"the message is marshalled to the named thread on the given machine using the "
 | 
			
		||||
{ $vocab-link "serialize" } " vocabulary."
 | 
			
		||||
$nl
 | 
			
		||||
"Running this code should show the message \"hello\" in the first Factor "
 | 
			
		||||
"instance."
 | 
			
		||||
$nl
 | 
			
		||||
"It is also possible to use " { $link send-synchronous } " to receive a "
 | 
			
		||||
"response to a distributed message. When an instance of " { $link thread } " "
 | 
			
		||||
"is marshalled it is converted into an instance of " { $link remote-thread }
 | 
			
		||||
". The receiver of this can use it as the target of a " { $link send }
 | 
			
		||||
" or " { $link reply } " call." ;
 | 
			
		||||
 | 
			
		||||
ARTICLE: "concurrency.distributed" "Distributed message passing"
 | 
			
		||||
"The " { $vocab-link "concurrency.distributed" } " implements transparent distributed message passing, inspired by Erlang and Termite."
 | 
			
		||||
{ $subsections start-node }
 | 
			
		||||
"Instances of " { $link thread } " can be sent to remote processes, at which point they are converted to objects holding the thread ID and the current node's host name:"
 | 
			
		||||
{ $subsections remote-process }
 | 
			
		||||
"The " { $vocab-link "serialize" } " vocabulary is used to convert Factor objects to byte arrays for transfer over a socket." ;
 | 
			
		||||
"Instances of " { $link thread } " can be sent to remote threads, at which point they are converted to objects holding the thread ID and the current node's host name:"
 | 
			
		||||
{ $subsections remote-thread }
 | 
			
		||||
"The " { $vocab-link "serialize" } " vocabulary is used to convert Factor objects to byte arrays for transfer over a socket." 
 | 
			
		||||
{ $subsections "concurrency.distributed.example" } ;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
ABOUT: "concurrency.distributed"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -18,14 +18,14 @@ IN: concurrency.distributed.tests
 | 
			
		|||
[ ] [
 | 
			
		||||
    [
 | 
			
		||||
        receive first2 [ 3 + ] dip send
 | 
			
		||||
        "thread-a" unregister-process
 | 
			
		||||
        "thread-a" unregister-remote-thread
 | 
			
		||||
    ] "Thread A" spawn
 | 
			
		||||
    "thread-a" swap register-process
 | 
			
		||||
    "thread-a" register-remote-thread
 | 
			
		||||
] unit-test
 | 
			
		||||
 | 
			
		||||
[ 8 ] [
 | 
			
		||||
    5 self 2array
 | 
			
		||||
    "thread-a" test-node <remote-process> send
 | 
			
		||||
    test-node "thread-a" <remote-thread> send
 | 
			
		||||
 | 
			
		||||
    receive
 | 
			
		||||
] unit-test
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,16 +1,32 @@
 | 
			
		|||
! Copyright (C) 2005 Chris Double. All Rights Reserved.
 | 
			
		||||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
USING: serialize sequences concurrency.messaging threads io
 | 
			
		||||
io.servers.connection io.encodings.binary
 | 
			
		||||
io.servers.connection io.encodings.binary assocs init
 | 
			
		||||
arrays namespaces kernel accessors ;
 | 
			
		||||
FROM: io.sockets => host-name <inet> with-client ;
 | 
			
		||||
IN: concurrency.distributed
 | 
			
		||||
 | 
			
		||||
<PRIVATE
 | 
			
		||||
 | 
			
		||||
: registered-remote-threads ( -- hash )
 | 
			
		||||
   \ registered-remote-threads get-global ;
 | 
			
		||||
 | 
			
		||||
PRIVATE>
 | 
			
		||||
 | 
			
		||||
: register-remote-thread ( thread name -- )
 | 
			
		||||
    registered-remote-threads set-at ;
 | 
			
		||||
 | 
			
		||||
: unregister-remote-thread ( name -- )
 | 
			
		||||
    registered-remote-threads delete-at ;
 | 
			
		||||
 | 
			
		||||
: get-remote-thread ( name -- thread )
 | 
			
		||||
    dup registered-remote-threads at [ ] [ thread ] ?if ;
 | 
			
		||||
 | 
			
		||||
SYMBOL: local-node
 | 
			
		||||
 | 
			
		||||
: handle-node-client ( -- )
 | 
			
		||||
    deserialize
 | 
			
		||||
    [ first2 get-process send ] [ stop-this-server ] if* ;
 | 
			
		||||
    [ first2 get-remote-thread send ] [ stop-this-server ] if* ;
 | 
			
		||||
 | 
			
		||||
: <node-server> ( addrspec -- threaded-server )
 | 
			
		||||
    binary <threaded-server>
 | 
			
		||||
| 
						 | 
				
			
			@ -24,20 +40,26 @@ SYMBOL: local-node
 | 
			
		|||
: start-node ( port -- )
 | 
			
		||||
    host-name over <inet> (start-node) ;
 | 
			
		||||
 | 
			
		||||
TUPLE: remote-process id node ;
 | 
			
		||||
TUPLE: remote-thread node id ;
 | 
			
		||||
 | 
			
		||||
C: <remote-process> remote-process
 | 
			
		||||
C: <remote-thread> remote-thread
 | 
			
		||||
 | 
			
		||||
: send-remote-message ( message node -- )
 | 
			
		||||
    binary [ serialize ] with-client ;
 | 
			
		||||
 | 
			
		||||
M: remote-process send ( message thread -- )
 | 
			
		||||
M: remote-thread send ( message thread -- )
 | 
			
		||||
    [ id>> 2array ] [ node>> ] bi
 | 
			
		||||
    send-remote-message ;
 | 
			
		||||
 | 
			
		||||
M: thread (serialize) ( obj -- )
 | 
			
		||||
    id>> local-node get-global <remote-process>
 | 
			
		||||
    id>> [ local-node get-global ] dip <remote-thread>
 | 
			
		||||
    (serialize) ;
 | 
			
		||||
 | 
			
		||||
: stop-node ( node -- )
 | 
			
		||||
    f swap send-remote-message ;
 | 
			
		||||
 | 
			
		||||
[
 | 
			
		||||
    H{ } clone \ registered-remote-threads set-global
 | 
			
		||||
] "remote-thread-registry" add-init-hook
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,6 @@
 | 
			
		|||
! Copyright (C) 2006 Chris Double.
 | 
			
		||||
! See http://factorcode.org/license.txt for BSD license.
 | 
			
		||||
USING: help.syntax help.markup concurrency.messaging.private
 | 
			
		||||
USING: help.syntax help.markup 
 | 
			
		||||
threads kernel arrays quotations strings ;
 | 
			
		||||
IN: concurrency.messaging
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -68,21 +68,3 @@ M: cannot-send-synchronous-to-self summary
 | 
			
		|||
    receive [
 | 
			
		||||
        data>> swap call
 | 
			
		||||
    ] keep reply-synchronous ; inline
 | 
			
		||||
 | 
			
		||||
<PRIVATE
 | 
			
		||||
 | 
			
		||||
: registered-processes ( -- hash )
 | 
			
		||||
   \ registered-processes get-global ;
 | 
			
		||||
 | 
			
		||||
PRIVATE>
 | 
			
		||||
 | 
			
		||||
: register-process ( name process -- )
 | 
			
		||||
    swap registered-processes set-at ;
 | 
			
		||||
 | 
			
		||||
: unregister-process ( name -- )
 | 
			
		||||
    registered-processes delete-at ;
 | 
			
		||||
 | 
			
		||||
: get-process ( name -- process )
 | 
			
		||||
    dup registered-processes at [ ] [ thread ] ?if ;
 | 
			
		||||
 | 
			
		||||
\ registered-processes [ H{ } clone ] initialize
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue