From 6e599f01915000229890201e9df8b70c31ba4308 Mon Sep 17 00:00:00 2001 From: Slava Pestov Date: Tue, 11 Mar 2008 22:42:32 -0500 Subject: [PATCH] Add unit tests for distributed concurrency --- .../distributed/distributed-tests.factor | 31 +++++++++++++++++ .../distributed/distributed.factor | 33 ++++++++++++------- 2 files changed, 53 insertions(+), 11 deletions(-) create mode 100755 extra/concurrency/distributed/distributed-tests.factor diff --git a/extra/concurrency/distributed/distributed-tests.factor b/extra/concurrency/distributed/distributed-tests.factor new file mode 100755 index 0000000000..0941eb4251 --- /dev/null +++ b/extra/concurrency/distributed/distributed-tests.factor @@ -0,0 +1,31 @@ +IN: concurrency.distributed.tests +USING: tools.test concurrency.distributed kernel io.files +arrays io.sockets system combinators threads math sequences +concurrency.messaging ; + +: test-node + { + { [ unix? ] [ "distributed-concurrency-test" temp-file ] } + { [ windows? ] [ "127.0.0.1" 1238 ] } + } cond ; + +[ ] [ test-node dup 1array swap (start-node) ] unit-test + +[ ] [ yield ] unit-test + +[ ] [ + [ + receive first2 >r 3 + r> send + "thread-a" unregister-process + ] "Thread A" spawn + "thread-a" swap register-process +] unit-test + +[ 8 ] [ + 5 self 2array + "thread-a" test-node send + + receive +] unit-test + +[ ] [ test-node stop-node ] unit-test diff --git a/extra/concurrency/distributed/distributed.factor b/extra/concurrency/distributed/distributed.factor index 4c5816b6cf..c0787a96a2 100755 --- a/extra/concurrency/distributed/distributed.factor +++ b/extra/concurrency/distributed/distributed.factor @@ -2,35 +2,46 @@ ! See http://factorcode.org/license.txt for BSD license. USING: serialize sequences concurrency.messaging threads io io.server qualified arrays -namespaces kernel io.encodings.binary ; +namespaces kernel io.encodings.binary combinators.cleave +new-slots accessors ; QUALIFIED: io.sockets IN: concurrency.distributed SYMBOL: local-node : handle-node-client ( -- ) - deserialize first2 get-process send ; + deserialize + [ first2 get-process send ] + [ stop-server ] if* ; : (start-node) ( addrspecs addrspec -- ) + local-node set-global [ - local-node set-global "concurrency.distributed" - binary [ handle-node-client ] with-server - ] 2curry f spawn drop ; + binary + [ handle-node-client ] with-server + ] curry "Distributed concurrency server" spawn drop ; : start-node ( port -- ) - dup internet-server io.sockets:host-name - rot io.sockets: (start-node) ; + [ internet-server ] + [ io.sockets:host-name swap io.sockets: ] bi + (start-node) ; TUPLE: remote-process id node ; C: remote-process +: send-remote-message ( message node -- ) + binary io.sockets: + [ serialize ] with-stream ; + M: remote-process send ( message thread -- ) - { remote-process-id remote-process-node } get-slots - binary io.sockets: [ 2array serialize ] with-stream ; + [ id>> 2array ] [ node>> ] bi + send-remote-message ; M: thread (serialize) ( obj -- ) - thread-id local-node get-global - + thread-id local-node get-global (serialize) ; + +: stop-node ( node -- ) + f swap send-remote-message ;