concurrency.distributed: implement with-connection combinator

This allows to send multiple messages over the same socket connection. The
old implementation used with-client, which sent a single message and then
closed the socket. The connection stream is stored in the new remote-thread
tuple slot named connection.
factor-shell
Alexander Iljin 2018-01-10 07:21:54 +03:00 committed by Doug Coleman
parent 72ed5bf73d
commit 762268f2fe
1 changed files with 29 additions and 10 deletions

View File

@ -1,9 +1,10 @@
! Copyright (C) 2005 Chris Double. All Rights Reserved.
! See http://factorcode.org/license.txt for BSD license.
USING: serialize sequences concurrency.messaging threads io
io.servers io.encodings.binary assocs init
arrays namespaces kernel accessors ;
FROM: io.sockets => host-name <inet> with-client ;
USING: accessors arrays assocs concurrency.messaging
continuations destructors fry init io io.encodings.binary
io.servers io.sockets io.streams.duplex kernel namespaces
sequences serialize threads ;
FROM: concurrency.messaging => send ;
IN: concurrency.distributed
<PRIVATE
@ -25,8 +26,9 @@ PRIVATE>
SYMBOL: local-node
: handle-node-client ( -- )
deserialize
[ first2 get-remote-thread send ] [ stop-this-server ] if* ;
deserialize [
first2 get-remote-thread send handle-node-client
] [ stop-this-server ] if* ;
: <node-server> ( addrspec -- threaded-server )
binary <threaded-server>
@ -37,16 +39,33 @@ SYMBOL: local-node
: start-node ( addrspec -- )
<node-server> start-server local-node set-global ;
TUPLE: remote-thread node id ;
TUPLE: remote-thread node id connection ;
C: <remote-thread> remote-thread
: <remote-thread> ( node id -- remote-thread )
f remote-thread boa ;
TUPLE: connection remote stream local ;
C: <connection> connection
: connect ( remote-thread -- )
dup node>> dup binary <client> <connection> >>connection drop ;
: disconnect ( remote-thread -- )
dup connection>> [ stream>> dispose ] when* f >>connection drop ;
: with-connection ( remote-thread quot -- )
'[ connect @ ] over [ disconnect ] curry [ ] cleanup ; inline
: send-remote-message ( message node -- )
binary [ serialize ] with-client ;
: send-to-connection ( message connection -- )
stream>> [ serialize flush ] with-stream* ;
M: remote-thread send ( message thread -- )
[ id>> 2array ] [ node>> ] bi
send-remote-message ;
[ id>> 2array ] [ node>> ] [ connection>> ] tri
[ nip send-to-connection ] [ send-remote-message ] if* ;
M: thread (serialize) ( obj -- )
id>> [ local-node get insecure>> ] dip <remote-thread> (serialize) ;