Add unit tests for distributed concurrency

db4
Slava Pestov 2008-03-11 22:42:32 -05:00
parent 6bc5a174b4
commit 6e599f0191
2 changed files with 53 additions and 11 deletions

View File

@ -0,0 +1,31 @@
IN: concurrency.distributed.tests
USING: tools.test concurrency.distributed kernel io.files
arrays io.sockets system combinators threads math sequences
concurrency.messaging ;
: test-node
{
{ [ unix? ] [ "distributed-concurrency-test" temp-file <local> ] }
{ [ windows? ] [ "127.0.0.1" 1238 <inet4> ] }
} cond ;
[ ] [ test-node dup 1array swap (start-node) ] unit-test
[ ] [ yield ] unit-test
[ ] [
[
receive first2 >r 3 + r> send
"thread-a" unregister-process
] "Thread A" spawn
"thread-a" swap register-process
] unit-test
[ 8 ] [
5 self 2array
"thread-a" test-node <remote-process> send
receive
] unit-test
[ ] [ test-node stop-node ] unit-test

View File

@ -2,35 +2,46 @@
! See http://factorcode.org/license.txt for BSD license. ! See http://factorcode.org/license.txt for BSD license.
USING: serialize sequences concurrency.messaging USING: serialize sequences concurrency.messaging
threads io io.server qualified arrays threads io io.server qualified arrays
namespaces kernel io.encodings.binary ; namespaces kernel io.encodings.binary combinators.cleave
new-slots accessors ;
QUALIFIED: io.sockets QUALIFIED: io.sockets
IN: concurrency.distributed IN: concurrency.distributed
SYMBOL: local-node SYMBOL: local-node
: handle-node-client ( -- ) : handle-node-client ( -- )
deserialize first2 get-process send ; deserialize
[ first2 get-process send ]
[ stop-server ] if* ;
: (start-node) ( addrspecs addrspec -- ) : (start-node) ( addrspecs addrspec -- )
local-node set-global
[ [
local-node set-global
"concurrency.distributed" "concurrency.distributed"
binary [ handle-node-client ] with-server binary
] 2curry f spawn drop ; [ handle-node-client ] with-server
] curry "Distributed concurrency server" spawn drop ;
: start-node ( port -- ) : start-node ( port -- )
dup internet-server io.sockets:host-name [ internet-server ]
rot io.sockets:<inet> (start-node) ; [ io.sockets:host-name swap io.sockets:<inet> ] bi
(start-node) ;
TUPLE: remote-process id node ; TUPLE: remote-process id node ;
C: <remote-process> remote-process C: <remote-process> remote-process
: send-remote-message ( message node -- )
binary io.sockets:<client>
[ serialize ] with-stream ;
M: remote-process send ( message thread -- ) M: remote-process send ( message thread -- )
{ remote-process-id remote-process-node } get-slots [ id>> 2array ] [ node>> ] bi
binary io.sockets:<client> [ 2array serialize ] with-stream ; send-remote-message ;
M: thread (serialize) ( obj -- ) M: thread (serialize) ( obj -- )
thread-id local-node get-global thread-id local-node get-global <remote-process>
<remote-process>
(serialize) ; (serialize) ;
: stop-node ( node -- )
f swap send-remote-message ;