Refactoring and additions to the concurrency library.

cvs
Chris Double 2005-08-07 02:10:32 +00:00
parent a7496b5742
commit 7aaacb19dd
5 changed files with 893 additions and 408 deletions

View File

@ -23,7 +23,7 @@
! !
! Examples of using the concurrency library. ! Examples of using the concurrency library.
IN: concurrency-examples IN: concurrency-examples
USING: concurrency kernel io lists threads math sequences namespaces unparser prettyprint errors ; USING: concurrency kernel io lists threads math sequences namespaces unparser prettyprint errors dlists ;
: (logger) ( mailbox -- ) : (logger) ( mailbox -- )
#! Using the given mailbox, start a thread which #! Using the given mailbox, start a thread which
@ -35,203 +35,151 @@ USING: concurrency kernel io lists threads math sequences namespaces unparser pr
#! console that are put in the returned mailbox. #! console that are put in the returned mailbox.
make-mailbox dup [ (logger) ] cons in-thread ; make-mailbox dup [ (logger) ] cons in-thread ;
: pong-server ( -- server ) : (pong-server0) ( -- )
#! A server that responds to a 'ping' message receive uncons "ping" = [
#! by sending a 'pong' message to the caller. "pong" swap send (pong-server0)
[ ] [
[ "Pong server shutting down" swap send
[ message? [ [ "ping" = ] [ drop "pong" ] send-reply ] ] ] ifte ;
[ message? [ [ "shutdown" = ] [ drop "shutdown" ] send-reply ] ]
[ message? [ message-data "shutdown" = [ exit-server ] when ] ] : pong-server0 ( -- process)
] recv [ (pong-server0) ] spawn ;
] spawn-server ;
: rpc-server ( -- server ) TUPLE: ping-message from ;
#! Process RPC requests where the message data TUPLE: shutdown-message from ;
#! is a list. The first item of the list is the function
#! to execute. The remainder of the list are the arguments
#! to that function.
[
[
[ message? [ [ car "add" = ] [ cdr 0 [ + ] reduce ] send-reply ] ]
[ message? [ [ car "product" = ] [ cdr 1 [ * ] reduce ] send-reply ] ]
[ message? [ [ car "shutdown" = ] [ drop "shutdown" ] send-reply ] ]
[ message? [ message-data car "shutdown" = [ exit-server ] when ] ]
] recv
] spawn-server ;
: original ( -- server ) GENERIC: handle-message
#! A server that responds to a clone request. This will
#! send back to the caller a continuation that when called
#! will effectively be a clone of the original server.
[
"original waiting for message: " write self get process-pid print
[
[ message? [ [ "clone" = ] [ drop server-cc ] maybe-send-reply ] ]
[ message? [ [ "shutdown" = ] [ drop "shutdown" ] send-reply ] ]
[ message? [ message-data "shutdown" = [ exit-server ] when ] ]
] recv
] spawn-server ;
: do-clone ( process -- ) M: ping-message handle-message ( message -- bool )
#! Given a server that responder to the 'clone' message, request ping-message-from "pong" swap send t ;
#! a clone and execute it.
[ "clone" swap send-message call-server-cc ] cons spawn ;
TUPLE: update k ; M: shutdown-message handle-message ( message -- bool )
shutdown-message-from "Pong server shutdown commenced" swap send f ;
: old-server ( -- server ) : (pong-server1) ( -- )
[ "pong-server1 waiting for message..." print
"old-server waiting for message: " write self get process-pid print receive handle-message [ (pong-server1) ] when ;
[
[ message? [ [ "clone" = ] [ drop server-cc ] maybe-send-reply ] ]
[ message? [ [ "ping" = ] [ drop "gnop" ] send-reply ] ]
[ update? [ update-k call-server-cc ] ]
] recv
] spawn-server ;
: new-server ( -- server ) : pong-server1 ( -- process )
[ [
"new-server waiting for message: " write self get process-pid unparse print (pong-server1)
[ "pong-server1 exiting..." print
[ message? [ [ "clone" = ] [ drop server-cc ] maybe-send-reply ] ]
[ message? [ [ "ping" = ] [ drop "pong" ] send-reply ] ]
[ update? [ update-k call-server-cc ] ]
] recv
] spawn-server ;
: test-server-replacement ( -- )
old-server
"Old Server is: " write dup process-pid print
"Old Server result from ping is: " write "ping" over send-message .
new-server
"New Server is: " write dup process-pid print
"New Server result from ping is: " write "ping" over send-message .
"Old Server result from ping is: " write "ping" pick send-message .
"Sending code update to old server..." print
"clone" over send-message <update> pick send
"Old Server is: " write dup process-pid print
"Old Server result from ping is: " write "ping" pick send-message .
2drop ;
! ***********************************
! Ignore code below...for testing
! ***********************************
: start-pong-server ( -- )
[
[
[ message? [ [ "crash" = ] [ drop 1 0 / ] send-reply ] ]
[ message? [ [ "ping" = ] [ drop "pong" ] send-reply ] ]
] recv
] forever ;
: fragile-server ( -- server)
[ start-pong-server ] spawn ;
SYMBOL: worker
: robust-server ( -- server )
[
[
[
[ start-pong-server ] spawn-link worker set
[
receive dup message? [
worker get !
] [
drop
] ifte
] forever
]
[
[
"Worker crashed, restarting: " write print
] when*
]
catch
] forever
] spawn ; ] spawn ;
SYMBOL: set-next TUPLE: echo-message from text ;
: ring-process ( next -- server ) M: echo-message handle-message ( message -- bool )
#! A process that can receive a single message, dup echo-message-text swap echo-message-from send t ;
#! an integer number. That number is decremented then
#! sent to the 'next' process. If the number is 0 it is GENERIC: handle-message2
#! relayed to the next process and this process exits. PREDICATE: tagged-message ping-message2 ( obj -- ? ) tagged-message-data "ping" = ;
PREDICATE: tagged-message shutdown-message2 ( obj -- ? ) tagged-message-data "shutdown" = ;
M: ping-message2 handle-message2 ( message -- bool )
"pong" reply t ;
M: shutdown-message2 handle-message2 ( message -- bool )
"Pong server shutdown commenced" reply f ;
: (pong-server2) ( -- )
"pong-server2 waiting for message..." print
receive handle-message2 [ (pong-server2) ] when ;
: pong-server2 ( -- process )
[
(pong-server2)
"pong-server2 exiting..." print
] spawn ;
: pong-server3 ( -- process )
[ handle-message2 ] spawn-server ;
GENERIC: handle-rpc-message
GENERIC: run-rpc-command
TUPLE: rpc-command op args ;
PREDICATE: rpc-command add-command ( msg -- bool )
rpc-command-op "add" = ;
PREDICATE: rpc-command product-command ( msg -- bool )
rpc-command-op "product" = ;
PREDICATE: rpc-command shutdown-command ( msg -- bool )
rpc-command-op "shutdown" = ;
PREDICATE: rpc-command crash-command ( msg -- bool )
rpc-command-op "crash" = ;
M: tagged-message handle-rpc-message ( message -- bool )
dup tagged-message-data run-rpc-command -rot reply not ;
M: add-command run-rpc-command ( command -- shutdown? result )
rpc-command-args sum f ;
M: product-command run-rpc-command ( command -- shutdown? result )
rpc-command-args product f ;
M: shutdown-command run-rpc-command ( command -- shutdown? result )
drop t t ;
M: crash-command run-rpc-command ( command -- shutdown? result )
drop 1 0 / f ;
: fragile-rpc-server ( -- process )
[ handle-rpc-message ] spawn-server ;
: (robust-rpc-server) ( worker -- )
[ [
[ receive over send
quit-cc set
[
receive dup process? [
"Setting next for " write self get process-pid print
nip
] [
dup 0 = [ ( next 0 -- )
"0 received for " write self get process-pid print
swap [ send ] when*
quit-cc get call
] [
dup unparse write " received for " write self get process-pid print
1 - over [ send ] when*
] ifte
] ifte
] forever
] callcc0
"Exiting process " write self get process-pid print
] cons spawn ;
: create-ring ( n -- process )
#! Create a ring of n processes, returning one
f ring-process dup rot 1 -
[
ring-process
] times over send ;
: fib ( n -- )
yield
dup 2 < [
] [ ] [
dup 1 - >r 2 - fib r> fib + [
"Worker died, Starting a new worker" print
drop [ handle-rpc-message ] spawn-linked-server
] when
] catch
(robust-rpc-server) ;
: robust-rpc-server ( -- process )
[
[ handle-rpc-message ] spawn-linked-server
(robust-rpc-server)
] spawn ;
: test-add ( process -- )
[
"add" [ 1 2 3 ] <rpc-command> swap send-synchronous .
] cons spawn drop ;
: test-crash ( process -- )
[
"crash" f <rpc-command> swap send-synchronous .
] cons spawn drop ;
! ******************************
! Experimental code below
! ******************************
USE: gadgets
USE: generic
TUPLE: promised-label promise ;
C: promised-label ( promise -- promised-label )
<gadget> over set-delegate [ set-promised-label-promise ] keep
[ [ dup promised-label-promise ?promise drop relayout ] cons spawn drop ] keep ;
: promised-label-text ( promised-label -- text )
promised-label-promise dup promise-fulfilled? [
?promise
] [
drop "Unfulfilled Promise"
] ifte ; ] ifte ;
TUPLE: fib-message number ; M: promised-label pref-dim ( promised-label - dim )
dup promised-label-text label-size ;
: fib-server ( -- server ) M: promised-label draw-gadget* ( promised-label -- )
[ dup delegate draw-gadget*
"fib-server waiting for message: " write self get process-pid unparse print dup promised-label-text draw-string ;
[
[ message? [ [ fib-message? ] [ fib-message-number fib ] send-reply ] ]
] recv
] spawn-server ;
: t1 : fib ( n -- n )
f ring-process dup ring-process over send ; yield dup 2 < [ drop 1 ] [ dup 1 - fib swap 2 - fib + ] ifte ;
: abcd : test-promise-ui ( -- )
[ <promise> dup <promised-label> gadget. [ 12 fib unparse swap fulfill ] cons spawn drop ;
"here" print
receive
"there" print
drop quit-cc call
] spawn-server ;
: pong-server1 ( -- process)
[
receive uncons "ping" = [
"pong" swap send
] [
"Pong server shutdown commenced" swap send
exit-server
] ifte
] spawn-server ;
: pong-server2 ( -- process)
[
receive
dup [ "ping" = ] [ drop "pong" ] send-reply
dup [ "shutdown" = ] [ drop "Pong server shutdown commenced" ] send-reply
message-data "shutdown" = [ exit-server ] when
] spawn-server ;

View File

@ -23,7 +23,60 @@
! !
IN: concurrency IN: concurrency
USING: kernel concurrency concurrency-examples threads vectors USING: kernel concurrency concurrency-examples threads vectors
sequences lists namespaces test errors ; sequences lists namespaces test errors dlists strings
math words ;
[ "junk" ] [
<dlist>
5 over dlist-push-end
"junk" over dlist-push-end
20 over dlist-push-end
[ string? ] swap dlist-pop?
] unit-test
[ 5 20 ] [
<dlist>
5 over dlist-push-end
"junk" over dlist-push-end
20 over dlist-push-end
[ string? ] over dlist-pop? drop
[ ] dlist-each
] unit-test
[ "junk" ] [
<dlist>
5 over dlist-push-end
"junk" over dlist-push-end
20 over dlist-push-end
[ integer? ] over dlist-pop? drop
[ integer? ] over dlist-pop? drop
[ ] dlist-each
] unit-test
[ t ] [
<dlist>
5 over dlist-push-end
"junk" over dlist-push-end
20 over dlist-push-end
[ string? ] swap dlist-pred?
] unit-test
[ t ] [
<dlist>
5 over dlist-push-end
"junk" over dlist-push-end
20 over dlist-push-end
[ integer? ] swap dlist-pred?
] unit-test
[ f ] [
<dlist>
5 over dlist-push-end
"junk" over dlist-push-end
20 over dlist-push-end
[ string? ] over dlist-pop? drop
[ string? ] swap dlist-pred?
] unit-test
[ { 1 2 3 } ] [ [ { 1 2 3 } ] [
0 <vector> 0 <vector>
@ -36,53 +89,74 @@ USING: kernel concurrency concurrency-examples threads vectors
3 swap mailbox-put 3 swap mailbox-put
] unit-test ] unit-test
[ { 1 2 3 } ] [
0 <vector>
make-mailbox
2dup [ [ integer? ] swap mailbox-get? swap push ] cons cons in-thread
2dup [ [ integer? ] swap mailbox-get? swap push ] cons cons in-thread
2dup [ [ integer? ] swap mailbox-get? swap push ] cons cons in-thread
1 over mailbox-put
2 over mailbox-put
3 swap mailbox-put
] unit-test
[ { 1 "junk" 3 "junk2" } [ 456 ] ] [
0 <vector>
make-mailbox
2dup [ [ integer? ] swap mailbox-get? swap push ] cons cons in-thread
2dup [ [ integer? ] swap mailbox-get? swap push ] cons cons in-thread
2dup [ [ string? ] swap mailbox-get? swap push ] cons cons in-thread
2dup [ [ string? ] swap mailbox-get? swap push ] cons cons in-thread
1 over mailbox-put
"junk" over mailbox-put
[ 456 ] over mailbox-put
3 over mailbox-put
"junk2" over mailbox-put
mailbox-get
] unit-test
[ f ] [ 1 2 gensym <tagged-message> gensym tag-match? ] unit-test
[ f ] [ "junk" gensym tag-match? ] unit-test
[ t ] [ 1 2 gensym <tagged-message> dup tagged-message-tag tag-match? ] unit-test
[ "test" ] [ [ "test" ] [
[ self get ] "test" with-process [ self ] "test" with-process
] unit-test ] unit-test
[ "received" ] [ [ "received" ] [
[ [
[ receive dup tagged-message? [
[ message? [ [ drop ] [ "received" ] send-reply ] ] "received" reply
] recv ] [
drop f
] ifte
] spawn ] spawn
"sent" swap send-message "sent" swap send-synchronous
] unit-test ] unit-test
[ "pong" "shutdown" ] [ [ 1 3 2 ] [
pong-server "ping" over send-message 1 self send
swap "shutdown" swap send-message 2 self send
3 self send
receive
[ 2 mod 0 = not ] receive-if
receive
] unit-test ] unit-test
[ "shutdown" 20 6 ] [ [ "pong" "Pong server shutdown commenced" ] [
rpc-server pong-server3 "ping" over send-synchronous
[ "add" 1 2 3 ] over send-message >r swap "shutdown" swap send-synchronous
[ "product" 4 5 ] over send-message >r
[ "shutdown" ] swap send-message
r> r>
] unit-test ] unit-test
[ "pong" "gnop" "pong" "gnop" ] [ [ t 60 120 ] [
old-server "ping" over send-message >r fragile-rpc-server
new-server "ping" over send-message >r << rpc-command f "product" [ 4 5 6 ] >> over send-synchronous >r
"ping" pick send-message >r << rpc-command f "add" [ 10 20 30 ] >> over send-synchronous >r
"clone" over send-message <update> pick send << rpc-command f "shutdown" [ ] >> swap send-synchronous
"ping" pick send-message >r r> r>
3drop ] unit-test
r> r> r> r>
] unit-test
[ f ] [
[
[
"crash" throw
] spawn drop
]
[
] catch
] unit-test
[ "crash" ] [ [ "crash" ] [
[ [
[ [
@ -93,6 +167,16 @@ USING: kernel concurrency concurrency-examples threads vectors
[ [
] catch ] catch
] unit-test ] unit-test
[ 55 ] [ [ 10 fib ] future ?future ] unit-test [ 50 ] [
[ 5 ] [ [ 5 fib ] lazy ?lazy ] unit-test [ 50 ] future ?future
] unit-test
[ { 50 50 50 } ] [
0 <vector>
<promise>
2dup [ ?promise swap push ] cons cons spawn drop
2dup [ ?promise swap push ] cons cons spawn drop
2dup [ ?promise swap push ] cons cons spawn drop
50 swap fulfill
] unit-test

View File

@ -24,12 +24,49 @@
! Concurrency library for Factor based on Erlang/Termite style ! Concurrency library for Factor based on Erlang/Termite style
! concurrency. ! concurrency.
USING: kernel lists generic threads io namespaces errors words USING: kernel lists generic threads io namespaces errors words
math sequences hashtables unparser strings vectors ; math sequences hashtables unparser strings vectors dlists ;
IN: concurrency IN: concurrency
#! Debug #! Debug
USE: prettyprint USE: prettyprint
: (dlist-pop?) ( dlist pred dnode -- obj | f )
[
[ dlist-node-data swap call ] 2keep rot [
swapd [ (dlist-unlink) ] keep dlist-node-data nip
] [
dlist-node-next (dlist-pop?)
] ifte
] [
2drop f
] ifte* ;
: dlist-pop? ( pred dlist -- obj | f )
#! Return first item in the dlist that when passed to the
#! predicate quotation, true is left on the stack. The
#! item is removed from the dlist. The 'pred' quotation
#! must have stack effect ( obj -- bool ).
#! TODO: needs a better name and should be moved to dlists.
dup dlist-first swapd (dlist-pop?) ;
: (dlist-pred?) ( pred dnode -- bool )
[
[ dlist-node-data swap call ] 2keep rot [
2drop t
] [
dlist-node-next (dlist-pred?)
] ifte
] [
drop f
] ifte* ;
: dlist-pred? ( pred dlist -- obj | f )
#! Return true if any item in the dlist that when passed to the
#! predicate quotation, true is left on the stack.
#! The 'pred' quotation must have stack effect ( obj -- bool ).
#! TODO: needs a better name and should be moved to dlists.
dlist-first (dlist-pred?) ;
TUPLE: mailbox threads data ; TUPLE: mailbox threads data ;
: make-mailbox ( -- mailbox ) : make-mailbox ( -- mailbox )
@ -40,26 +77,31 @@ TUPLE: mailbox threads data ;
#! something in the mailbox. If multiple threads are waiting on the #! something in the mailbox. If multiple threads are waiting on the
#! same mailbox, only one of the waiting threads will be unblocked #! same mailbox, only one of the waiting threads will be unblocked
#! to process the get operation. #! to process the get operation.
0 <vector> <queue> <mailbox> ; 0 <vector> <dlist> <mailbox> ;
: mailbox-empty? ( mailbox -- bool ) : mailbox-empty? ( mailbox -- bool )
#! Return true if the mailbox is empty #! Return true if the mailbox is empty
mailbox-data queue-empty? ; mailbox-data dlist-empty? ;
: mailbox-put ( obj mailbox -- ) : mailbox-put ( obj mailbox -- )
#! Put the object into the mailbox. If the mailbox #! Put the object into the mailbox. Any threads that have
#! is empty and a thread has a blocking get on it #! a blocking get on the mailbox are resumed.
#! then that thread is resumed. If more than one thread [ mailbox-data dlist-push-end ] keep
#! is waiting, then only one of those threads will be [ mailbox-threads ] keep 0 <vector> swap set-mailbox-threads
#! resumed. [ schedule-thread ] each yield ;
dup mailbox-empty? -rot
swap over mailbox-data enque over set-mailbox-data swap [
dup mailbox-threads 0 <vector> rot set-mailbox-threads [
[ schedule-thread ] each yield
] when*
] when ;
: (mailbox-block-if-empty) ( mailbox -- obj ) : (mailbox-block-unless-pred) ( pred mailbox -- pred mailbox )
#! Block the thread if there are not items in the mailbox
#! that return true when the predicate is called with the item
#! on the stack. The predicate must have stack effect ( X -- bool ).
dup mailbox-data pick swap dlist-pred? [
[
swap mailbox-threads push stop
] callcc0
(mailbox-block-unless-pred)
] unless ;
: (mailbox-block-if-empty) ( mailbox -- mailbox )
#! Block the thread if the mailbox is empty #! Block the thread if the mailbox is empty
dup mailbox-empty? [ dup mailbox-empty? [
[ [
@ -73,8 +115,15 @@ TUPLE: mailbox threads data ;
#! empty the thread blocks until an item is put into it. #! empty the thread blocks until an item is put into it.
#! The thread then resumes, leaving the item on the stack. #! The thread then resumes, leaving the item on the stack.
(mailbox-block-if-empty) (mailbox-block-if-empty)
dup mailbox-data deque rot set-mailbox-data ; mailbox-data dlist-pop-front ;
: mailbox-get? ( pred mailbox -- obj )
#! Get the first item in the mailbox which satisfies the predicate.
#! 'pred' will be called with each item on the stack. When pred returns
#! true that item will be returned. If nothing in the mailbox
#! satisfies the predicate then the thread will block until something does.
(mailbox-block-unless-pred)
mailbox-data dlist-pop? ;
#! Processes run on nodes identified by a hostname and port. #! Processes run on nodes identified by a hostname and port.
TUPLE: node hostname port ; TUPLE: node hostname port ;
@ -100,12 +149,17 @@ TUPLE: process node links pid mailbox ;
#! that process terminates. #! that process terminates.
localnode swap unit gensym unparse make-mailbox <process> ; localnode swap unit gensym unparse make-mailbox <process> ;
#! The 'self' variable returns the currently executing process. #! The 'self-process' variable holds the currently executing process.
SYMBOL: self SYMBOL: self-process
: self ( -- process )
#! Returns the contents of the 'self-process' variables which
#! is the process object for the current process.
self-process get ;
: init-main-process ( -- ) : init-main-process ( -- )
#! Setup the main process. #! Setup the main process.
make-process self set ; make-process self-process set ;
init-main-process init-main-process
@ -113,13 +167,12 @@ init-main-process
#! Calls the quotation with 'self' set #! Calls the quotation with 'self' set
#! to the given process. #! to the given process.
<namespace> [ <namespace> [
self set self-process set
] extend ] extend
swap bind ; swap bind ;
: spawn ( quot -- process ) : spawn ( quot -- process )
#! Start a process which runs the given quotation. #! Start a process which runs the given quotation.
[ [ drop ] catch ] cons
[ in-thread ] make-process [ with-process ] over slip ; [ in-thread ] make-process [ with-process ] over slip ;
TUPLE: linked-exception error ; TUPLE: linked-exception error ;
@ -133,32 +186,43 @@ TUPLE: linked-exception error ;
#! Return a message from the current processes mailbox. #! Return a message from the current processes mailbox.
#! If the box is empty, suspend the process until something #! If the box is empty, suspend the process until something
#! is placed in the box. #! is placed in the box.
self get process-mailbox mailbox-get dup linked-exception? [ self process-mailbox mailbox-get dup linked-exception? [
linked-exception-error throw linked-exception-error throw
] when ; ] when ;
: receive-if ( pred -- message )
#! Return the first message frmo the current processes mailbox
#! that satisfies the predicate. To satisfy the predicate, 'pred'
#! is called with the item on the stack and the predicate should leave
#! a boolean indicating whether it was satisfied or not. The predicate
#! must have stack effect ( X -- bool ). If nothing in the mailbox
#! satisfies the predicate then the process will block until something does.
self process-mailbox mailbox-get? dup linked-exception? [
linked-exception-error throw
] when ;
: rethrow-linked ( error -- ) : rethrow-linked ( error -- )
#! Rethrow the error to the linked process #! Rethrow the error to the linked process
self get process-links [ over <linked-exception> swap send ] each drop ; self process-links [ over <linked-exception> swap send ] each drop ;
: spawn-link ( quot -- process ) : spawn-link ( quot -- process )
#! Same as spawn but if the quotation throws an error that #! Same as spawn but if the quotation throws an error that
#! is uncaught, that error gets propogated to the process #! is uncaught, that error gets propogated to the process
#! performing the spawn-link. #! performing the spawn-link.
[ [ [ rethrow-linked ] when* ] catch ] cons [ [ [ rethrow-linked ] when* ] catch ] cons
[ in-thread ] self get make-linked-process [ with-process ] over slip ; [ in-thread ] self make-linked-process [ with-process ] over slip ;
#! A common operation is to send a message to a process containing #! A common operation is to send a message to a process containing
#! the sending process so the receiver can send a reply back. A 'tag' #! the sending process so the receiver can send a reply back. A 'tag'
#! is also sent so that the sender can match the reply with the #! is also sent so that the sender can match the reply with the
#! original request. The 'message' tuple ecapsulates this. #! original request. The 'tagged-message' tuple ecapsulates this.
TUPLE: message data from tag ; TUPLE: tagged-message data from tag ;
: >message< ( message -- data from tag ) : >tagged-message< ( tagged-message -- data from tag )
#! Explode a message tuple. #! Explode a message tuple.
dup message-data swap dup tagged-message-data swap
dup message-from swap dup tagged-message-from swap
message-tag ; tagged-message-tag ;
: (recv) ( msg form -- ) : (recv) ( msg form -- )
#! Process a form with the following format: #! Process a form with the following format:
@ -191,23 +255,34 @@ TUPLE: message data from tag ;
#! may be run against the message. #! may be run against the message.
receive swap [ dupd (recv) ] each drop ; receive swap [ dupd (recv) ] each drop ;
: send-message ( data process -- reply ) : tag-message ( message -- tagged-message )
#! Given a message, wrap it with a tagged message.
self gensym <tagged-message> ;
: tag-match? ( message tag -- bool )
#! Return true if the message is a tagged message and
#! its tag matches the given tag.
swap dup tagged-message? [
tagged-message-tag =
] [
2drop f
] ifte ;
: send-synchronous ( message process -- reply )
#! Sends a message to the process using the 'message' #! Sends a message to the process using the 'message'
#! protocol and waits for a reply to that message. The reply #! protocol and waits for a reply to that message. The reply
#! is matched up with the request by generating a message tag #! is matched up with the request by generating a message tag
#! which should be sent back with the reply. #! which should be sent back with the reply.
swap self get gensym dup >r <message> >r tag-message [ tagged-message-tag ] keep r> send
swap send unit [ car tag-match? ] cons receive-if tagged-message-data ;
r> receive
dup message? [ : reply ( tagged-message message -- )
dup message-tag rot = [ #! Replies to the tagged-message which should have been a result of a
message-data #! 'send-synchronous' call. It will send 'message' back to the process
] [ #! that originally sent the tagged message, and will have the same tag
2drop f #! as that in 'tagged-message'.
] ifte swap >tagged-message< rot drop ( message from tag )
] [ swap >r >r self r> <tagged-message> r> send ;
2drop f
] ifte ;
: forever ( quot -- ) : forever ( quot -- )
#! Loops forever executing the quotation. #! Loops forever executing the quotation.
@ -215,32 +290,32 @@ TUPLE: message data from tag ;
SYMBOL: quit-cc SYMBOL: quit-cc
: (spawn-server) ( quot -- )
#! Receive a message, and run 'quot' on it. If 'quot'
#! returns true, start again, otherwise exit loop.
#! The quotation should have stack effect ( message -- bool ).
"Waiting for message in server: " write self process-pid print
receive over call [ (spawn-server) ] when ;
: spawn-server ( quot -- process ) : spawn-server ( quot -- process )
#! Spawn a server that runs the quotation in #! Spawn a server that receives messages, calling the
#! a loop. A continuation in the variable 'quit-cc' is available #! quotation on the message. If the quotation returns false
#! that when called will exit the loop. #! the spawned process exits. If it returns true, the process
#! starts from the beginning again. The quotation should have
#! stack effect ( message -- bool ).
[ [
[ (spawn-server)
quit-cc set "Exiting process: " write self process-pid print
forever
] callcc0
"Exiting process: " write self get process-pid print
] cons spawn ; ] cons spawn ;
: spawn-linked-server ( quot -- process ) : spawn-linked-server ( quot -- process )
#! Spawn a linked server that runs forever. #! Similar to 'spawn-server' but the parent process will be linked
#! to the child.
[ [
[ (spawn-server)
quit-cc set "Exiting process: " write self process-pid print
forever
] callcc0
"Exiting process: " write self get process-pid print
] cons spawn-link ; ] cons spawn-link ;
: exit-server ( -- )
#! Calls the quit continuation to exit a server.
quit-cc get call ;
: send-reply ( message pred quot -- ) : send-reply ( message pred quot -- )
#! The intent of this word is to provde an easy way to #! The intent of this word is to provde an easy way to
#! check the data contained in a message, process it, and #! check the data contained in a message, process it, and
@ -254,28 +329,26 @@ SYMBOL: quit-cc
#! The result of that call will be sent back to the #! The result of that call will be sent back to the
#! messages original caller with the same tag as the #! messages original caller with the same tag as the
#! original message. #! original message.
>r >r >message< rot ( from tag data r: quot pred ) >r >r >tagged-message< rot ( from tag data r: quot pred )
dup r> call [ ( from tag data r: quot ) dup r> call [ ( from tag data r: quot )
r> call ( from tag result ) r> call ( from tag result )
self get ( from tag result self ) self ( from tag result self )
rot ( from self tag result ) rot ( from self tag result )
<message> swap send <tagged-message> swap send
] [ ] [
r> drop 3drop r> drop 3drop
] ifte ; ] ifte ;
SYMBOL: exit
: maybe-send-reply ( message pred quot -- ) : maybe-send-reply ( message pred quot -- )
#! Same as !result but if false is returned from #! Same as !result but if false is returned from
#! quot then nothing is sent back to the caller. #! quot then nothing is sent back to the caller.
>r >r >message< rot ( from tag data r: quot pred ) >r >r >tagged-message< rot ( from tag data r: quot pred )
dup r> call [ ( from tag data r: quot ) dup r> call [ ( from tag data r: quot )
r> call ( from tag result ) r> call ( from tag result )
[ [
self get ( from tag result self ) self ( from tag result self )
rot ( from self tag result ) rot ( from self tag result )
<message> swap send <tagged-message> swap send
] [ ] [
2drop 2drop
] ifte* ] ifte*
@ -291,12 +364,12 @@ SYMBOL: exit
#! and jumping back into it from a spawn and keeping the 'self' #! and jumping back into it from a spawn and keeping the 'self'
#! variable correct. It's a workaround until I can find out how to #! variable correct. It's a workaround until I can find out how to
#! stop 'self' from being clobbered back to its old value. #! stop 'self' from being clobbered back to its old value.
[ ] callcc1 dup process? [ self set f ] when ; [ ] callcc1 dup process? [ self-process set f ] when ;
: call-server-cc ( server-cc -- ) : call-server-cc ( server-cc -- )
#! Calls the server continuation passing the current 'self' #! Calls the server continuation passing the current 'self'
#! so the server continuation gets its new self updated. #! so the server continuation gets its new self updated.
self get swap call ; self swap call ;
: future ( quot -- future ) : future ( quot -- future )
#! Spawn a process to call the quotation and immediately return #! Spawn a process to call the quotation and immediately return
@ -304,13 +377,44 @@ SYMBOL: exit
#! ?future. If the quotation has completed the result will be returned. #! ?future. If the quotation has completed the result will be returned.
#! If not, the process will block until the quotation completes. #! If not, the process will block until the quotation completes.
#! 'quot' must have stack effect ( -- X ). #! 'quot' must have stack effect ( -- X ).
[ call self get send ] cons spawn ; [ call self send ] cons spawn ;
: ?future ( future -- result ) : ?future ( future -- result )
#! Block the process until the future has completed and then place the #! Block the process until the future has completed and then place the
#! result on the stack. Return the result immediately if the future has completed. #! result on the stack. Return the result immediately if the future has completed.
process-mailbox mailbox-get ; process-mailbox mailbox-get ;
TUPLE: promise fulfilled? value processes ;
C: promise ( -- <promise> )
[ 0 <vector> swap set-promise-processes ] keep ;
: fulfill ( value promise -- )
#! Set the future of the promise to the given value. Threads
#! blocking on the promise will then be released.
dup promise-fulfilled? [
[ set-promise-value ] keep
[ t swap set-promise-fulfilled? ] keep
[ promise-processes ] keep 0 <vector> swap set-promise-processes
[ schedule-thread ] each yield
] unless ;
: (maybe-block-promise) ( promise -- promise )
#! Block the process if the promise is unfulfilled. This is different from
#! (mailbox-block-if-empty) in that when a promise is fulfilled, all threads
#! need to be resumed, rather than just one.
dup promise-fulfilled? [
[
swap promise-processes push stop
] callcc0
] unless ;
: ?promise ( promise -- result )
(maybe-block-promise) promise-value ;
! ******************************
! Experimental code below
! ******************************
SYMBOL: lazy-quot SYMBOL: lazy-quot
: lazy ( quot -- lazy ) : lazy ( quot -- lazy )
@ -321,12 +425,12 @@ SYMBOL: lazy-quot
[ [
lazy-quot set lazy-quot set
[ [
[ message? [ [ drop t ] [ get call ] send-reply ] ] [ tagged-message? [ [ drop t ] [ get call ] send-reply ] ]
] recv ] recv
] with-scope ] with-scope
] cons spawn ; ] cons spawn ;
: ?lazy ( lazy -- result ) : ?lazy ( lazy -- result )
#! Given a process spawned using 'lazy', evaluate it and return the result. #! Given a process spawned using 'lazy', evaluate it and return the result.
lazy-quot swap send-message ; lazy-quot swap send-synchronous ;

View File

@ -18,6 +18,15 @@ communicate with each other by asynchronous message sends. Although
processes can share data via Factor's mutable data structures it is processes can share data via Factor's mutable data structures it is
not recommended as the use of shared state concurrency is often a not recommended as the use of shared state concurrency is often a
cause of problems.</p> cause of problems.</p>
<h1>Loading</h1>
<p>The quickest way to get up and running with this library is to
change to the 'concurrency' directory and run Factor. Then execute the
following commands:</p>
<pre class="code">
"load.factor" run-file
USE: concurrency
USE: concurrency-examples
</pre>
<h1>Processes</h1> <h1>Processes</h1>
<p>A process is basically a thread with a message queue. Other <p>A process is basically a thread with a message queue. Other
processes can place items on this queue by sending the process a processes can place items on this queue by sending the process a
@ -30,7 +39,7 @@ hundreds of thousands of simple processes.</p>
value. Factor tuples are ideal for this sort of thing as you can send value. Factor tuples are ideal for this sort of thing as you can send
a tuple to a process and the predicate dispatch mechanism can be used a tuple to a process and the predicate dispatch mechanism can be used
to perform actions depending on what the type of the tuple is.</p> to perform actions depending on what the type of the tuple is.</p>
<p>Processes are usually created using the spawn' word:</p> <p>Processes are usually created using the 'spawn' word:</p>
<pre class="code"> <pre class="code">
IN: concurrency IN: concurrency
spawn ( quot -- process ) spawn ( quot -- process )
@ -45,11 +54,11 @@ IN: concurrency
send ( message process -- ) send ( message process -- )
</pre> </pre>
<p>'send' will return immediately after placing the message in the <p>'send' will return immediately after placing the message in the
target processes message queue. A process can get an message from its target processes message queue. A process can get a message from its
queue using the 'receive' word:</p> queue using the 'receive' word:</p>
<pre class="code"> <pre class="code">
IN: concurrency IN: concurrency
: receive ( -- message ) receive ( -- message )
</pre> </pre>
<p>This will get the most recent message <p>This will get the most recent message
and leave it on the stack. If there are no messages in the queue the and leave it on the stack. If there are no messages in the queue the
@ -62,22 +71,40 @@ blocked it takes no CPU time at all.</p>
<p>This example spawns a process that first blocks, waiting to receive <p>This example spawns a process that first blocks, waiting to receive
a message. When a message is received, the 'receive' call returns a message. When a message is received, the 'receive' call returns
leaving it on the stack. It then prints the message and exits. 'spawn' leaving it on the stack. It then prints the message and exits. 'spawn'
left the process on the stack its available to send the 'Hello left the process on the stack so it's available to send the 'Hello
Process!' message to it. Immediately after the 'send' you should see Process!' message to it. Immediately after the 'send' you should see
'Hello Process!' printer on the console.</p> 'Hello Process!' printed on the console.</p>
<p>It is also possible to selectively retrieve messages from the
message queue. The 'receive-if' word takes a predicate quotation on the stack
and returns the first message in the queue that satisfies the
predicate. If no items satisfy the predicate then the process is
blocked until a message is received that does.
</p>
<pre class="code">
: odd? ( n -- ? )
2 mod 1 = ;
<span class="highlite">1 self send
2 self send
3 self send</span>
<span class="highlite">receive .</span>
=> 1
<span class="highlite">[ odd? ] receive-if .</span>
=> 3
<span class="highlite">receive .</span>
=> 2
</pre>
<h2>Self</h2> <h2>Self</h2>
<p>A process can get access to its own process object using the 'self' <p>A process can get access to its own process object using the 'self'
variable so it can pass word so it can pass it to other processes. This allows the other processes to send
it to other processes. This allows the other processes to send
messages back. A simple example of using this gets the current messages back. A simple example of using this gets the current
processes 'self' and spawns a process which sends a message to it. We processes 'self' and spawns a process which sends a message to it. We
then receive the message from the original process</p> then receive the message from the original process</p>
<pre class="code"> <pre class="code">
self get <span class="highlite">self .s</span>
.s => &lt;&lt; process ... >>
=> << process ... >> <span class="highlite">[ "Hello!" swap send ] cons spawn drop receive .</span>
[ "Hello!" swap send ] cons spawn drop
receive .
=> "Hello" => "Hello"
</pre> </pre>
<h1>Servers</h1> <h1>Servers</h1>
@ -85,115 +112,436 @@ then receive the message from the original process</p>
that are sent to it. These follow a basic pattern of blocking until a that are sent to it. These follow a basic pattern of blocking until a
message is received, processing that message then looping back to message is received, processing that message then looping back to
blocking for a message.</p> blocking for a message.</p>
<p>The 'spawn-server' word does exactly that:</p>
<pre class="code">
IN: concurrency
spawn-server ( quot -- process )
</pre>
<p>A process is spawned in the same manner as 'spawn', but instead of
the process existing then the quotation completes, the quotation is
re-called. A process spawned using this method can break out of the
infinite loop and exit the process using the 'exit-server' call:</p>
<pre class="code">
IN: concurrency
: exit-server ( -- )
</pre>
<p>The following example shows a very simple server that expects a <p>The following example shows a very simple server that expects a
cons cell as its message. The 'car' of the list should be the senders cons cell as its message. The 'car' of the cons should be the senders
process object. If the 'cdr' is 'ping' then the server sends 'pong' process object. If the 'cdr' is 'ping' then the server sends 'pong'
back to the caller. If the 'cdr' is anything else then the server back to the caller. If the 'cdr' is anything else then the server
exits:</p> exits:</p>
<pre class="code"> <pre class="code">
: pong-server1 ( -- process) : (pong-server0) ( -- )
[ receive uncons "ping" = [
receive uncons "ping" = [ "pong" swap send (pong-server0)
"pong" swap send ] [
] [ "Pong server shutting down" swap send
"Pong server shutdown commenced" swap send ] ifte ;
exit-server
] ifte : pong-server0 ( -- process)
] spawn-server ; [ (pong-server0) ] spawn ;
pong-server1 <span class="highlite">pong-server0</span>
self get "ping" cons over send receive . <span class="highlite">self "ping" cons over send receive .</span>
=> "pong" => "pong"
self get "ping" cons over send receive . <span class="highlite">self "ping" cons over send receive .</span>
=> "pong" => "pong"
self get "shutdown" cons over send receive . <span class="highlite">self "shutdown" cons over send receive .</span>
=> "Pong server shutdown commenced" => "Pong server shutting down"
Exiting process: G:12361
</pre> </pre>
<p>The idiom of sending the callers process object along with the <p>Handling the deconstructing of messages and dispatching based on
message is so common that some standard routines are built into the the message can be a bit of a chore. Especially in servers that take a
concurrency library to handle this. A tuple called 'message' is used number of different messages. One approach to factor this code out,
as the standard message sent to processes that wish to acknowledge and reduce the amount of stack juggling required, is to use tuples as
receipt of the messaeg with a reply back to the caller:</p> messages. This allows using the generic dispatch mechanism. The
following example implements the pong server but using tuples as
messages:</p>
<pre class="code"> <pre class="code">
IN: concurrency TUPLE: ping-message from ;
TUPLE: message data from tag ; TUPLE: shutdown-message from ;
</pre>
<p>The 'data' contains the actual message data to be sent to the GENERIC: handle-message
server. 'from' is the process object of the caller. 'tag' is an
automatically generated unique value that the receving server will M: ping-message handle-message ( message -- bool )
send back along with the reply so the caller can match it up with the ping-message-from "pong" swap send t ;
original request. A 'send-reply' word is available that has the
following signature:</p> M: shutdown-message handle-message ( message -- bool )
<pre class="code"> shutdown-message-from "Pong server shutdown commenced" swap send f ;
IN: concurrency
send-reply ( message pred quot -- ) : (pong-server1) ( -- )
</pre> "pong-server1 waiting for message..." print
<p>The 'message' is a message tuple. 'pred' is a quotation with the receive handle-message [ (pong-server1) ] when ;
signature ( data -- boolean ). It will be called with the message-data
portion of the message. If it returns false, all three arguments are : pong-server1 ( -- process )
popped off the stack and nothing is done.</p> [
<p>If the predicate returns true, then the quotation is called with (pong-server1)
the message data on the stack again. This quotation has the signature "pong-server1 exiting..." print
( data -- result ). The result of the quotation will be sent back to ] spawn ;
the callinging process in a message tuple, with the same tag as the </pre>
original message and the message data will be the result.</p> <p>Two tuples are created for a 'ping' and 'shutdown' message. Each
<p>To make it easier to send a message tuple without having to has a 'from' slot which holds the process of the sender. The server
generate a tag, get the 'self' process, etc, the 'send-message' word loop, in '(pong-server1)', calls a generic method called
is available:</p> 'handle-message'. This has signature ( message -- bool ). These
<pre class="code"> methods return a boolean.
IN: concurrency True means continue the server
send-message ( data process -- reply ) loop. False means exit and shut down the server.</p>
</pre> <p>Two methods are added to the generic word. One for 'ping' and the
<p>Given the message data it will construct a message tuple with a other for 'pong'. Here's a sample run:</p>
randomly generated tag and send it to the given process. It will then <pre class="code"> clear
wait for a reply containing that specific tag and take the message <span class="highlite">pong-server1</span>
data from it, leaving it on the stack.</p> => pong-server1 waiting for message...
<p>Using these words our pong server example becomes:</p> <span class="highlite">self &lt;ping-message> over send receive .</span>
<pre class="code"> => "pong"
: pong-server2 ( -- process) pong-server1 waiting for message...
[ <span class="highlite">self &lt;ping-message> over send receive .</span>
receive => "pong"
dup [ "ping" = ] [ drop "pong" ] send-reply pong-server1 waiting for message...
dup [ "shutdown" = ] [ drop "Pong server shutdown commenced" ] send-reply <span class="highlite">self &lt;shutdown-message> over send receive .</span>
message-data "shutdown" = [ exit-server ] when => "Pong server shutdown commenced"
] spawn-server ; pong-server1 exiting...
</pre>
<p>The advantage of this approach is it is easy to extend the server
without shutting it down. Adding a new message is as simple as
defining the tuple and adding a method to 'handle-message' specialised
on that tuple. Here's an example of adding an 'echo' message, without
shutting the server down:</p>
<pre class="code">
<span class="highlite">pong-server1</span>
=> pong-server1 waiting for message...
<span class="highlite">self &lt;ping-message> over send receive .</span>
=> "pong"
TUPLE: echo-message from text ;
M: echo-message handle-message ( message -- bool )
dup echo-message-text swap echo-message-from send t ;
<span class="highlite">self "Hello World" &lt;echo-message> over send receive .</span>
=>"Hello World"
</pre>
<h2>Synchronous Sends</h2>
<p>The 'send' word sends a message asynchronously, and the sending
process continues immediately. The 'pong server' examples shown
previously all sent messages to the server and waited for a reply back
from the server. This pattern of synchronous sending is made easier
with the 'send-synchronous' word:</p>
<pre class="code">
IN: concurrency
send-synchronous ( message process -- reply )
</pre>
<p>This word will send a message to the given process and immediately
block until a reply is received for this particular message send. It
leaves the reply on the stack. Note that it doesn't wait for just any
reply, it waits for a reply specifically to this send.</p>
<p>To do this it wraps the requested message inside a 'tagged-message'
tuple. This tuple is defined as:</p>
<pre class="code">
TUPLE: tagged-message data from tag ;
</pre>
<p>When 'send-synchronous' is called it will created a
'tagged-message', storing the current process in the 'from' slot. This
is what the receiving server will use to send the reply to. It also
generates a random 'tag' which is stored in the 'tag' slot. The
receiving server will include this value in its reply. After the send
the current process will block waiting for a reply that has the exact
same tag. In this way you can be sure that the reply you got was for
the specific message sent.</p>
<p>Here is the 'pong server' recoded to use 'send-synchronous' and the
tagged-message type:</p>
<pre class="code">
GENERIC: handle-message2
PREDICATE: tagged-message ping-message2 ( obj -- ? )
tagged-message-data "ping" = ;
PREDICATE: tagged-message shutdown-message2 ( obj -- ? )
tagged-message-data "shutdown" = ;
M: ping-message2 handle-message2 ( message -- bool )
"pong" reply t ;
M: shutdown-message2 handle-message2 ( message -- bool )
"Pong server shutdown commenced" reply f ;
: (pong-server2) ( -- )
"pong-server2 waiting for message..." print
receive handle-message2 [ (pong-server2) ] when ;
: pong-server2 ( -- process )
[
(pong-server2)
"pong-server2 exiting..." print
] spawn ;
<span class="highlite">pong-server2</span>
=> pong-server2 waiting for message...
<span class="highlite">"ping" over send-synchronous .</span>
=> "pong"
pong-server2 waiting for message...
<span class="highlite">"ping" over send-synchronous .</span>
=> "pong"
pong-server2 waiting for message...
<span class="highlite">"shutdown" over send-synchronous .</span>
=> "Pong server shutdown commenced"
pong-server2 exiting...
</pre>
<p>The main difference in this example is that the 'handle-message2'
methods are dispatched over predicate types. Two predicate types are
set up both based on the 'tagged-message' tuple mentioned earlier. The
first is for 'ping-message2' which is a tagged message where the
message data is the string "ping". The second is also a tagged message
but the message data is the string "shutdown".</p>
<p>The implementation of the methods uses the 'reply' word. 'reply'
takes a received tagged-message and a new message on the stack and replies to
it. This means that it sends a reply back to the calling process using
the same 'tag'
as the original message. It is a convenience word so you don't have to
manually unpack the tagged-message tuple to get at the originating
process and tag. Its signature is:</p>
<pre class="code">
IN: concurrency
reply ( tagged-message message -- )
</pre>
<h2>Generic Server</h2>
<p>You'll probably have noticed that the general pattern of the pong
server examples are the same. In a loop they receive a message,
process it using a generic function, and either exit or go back to the
beginning of the loop. This is abstracted in the 'spawn-server'
word:</p>
<pre class="code">
IN: quotation
spawn-server ( quot -- process )
</pre>
<p>This takes a quotation that has stack effect ( message -- bool ).
'spawn-server' will spawn a server loop that waits for a message. When
it is received the quotation is called on it. If the quotation returns
false then the server process exits, otherwise it loops from the
beginning again. Using this word you can write the previous
'pong-server2' example as:</p>
<pre class="code">
GENERIC: handle-message2
PREDICATE: tagged-message ping-message2 ( obj -- ? ) tagged-message-data "ping" = ;
PREDICATE: tagged-message shutdown-message2 ( obj -- ? ) tagged-message-data "shutdown" = ;
M: ping-message2 handle-message2 ( message -- bool )
"pong" reply t ;
M: shutdown-message2 handle-message2 ( message -- bool )
"Pong server shutdown commenced" reply f ;
: pong-server3 ( -- process )
[ handle-message2 ] spawn-server ;
</pre>
<p>The main change is that you no longer need the helper
(pong-server2) word.</p>
<h2>Exceptions</h2>
<p>A process can handle exceptions using the standard Factor exception
handling mechanism. If an exception is uncaught the process will
terminate. For example:</p>
<pre class="code">
<span class="highlite">[
1 0 /
"This will not print" print
] spawn</span>
=>
Division by zero
:s :r show stacks at time of error.
:get ( var -- value ) inspects the error namestack.
</pre>
<p>Processes can be linked so that a parent process can receive the
exception that caused the child process to terminate. In this way
'supervisor' processes can be created that are notified when child
processes terminate and possibly restart them.</p>
<p>The easiest way to form this link is using the 'spawn-link'
word. This will create a unidirectional link, such that if an
uncaught exception causes the child to terminate, the parent process
can catch it:</p>
<pre class="code">
<span class="highlite">[
[
1 0 /
"This will not print" print
] spawn-link drop
receive
] [
[ "Exception caught." print ] when
] catch</span>
=> "Exception caught."
</pre>
<p>Exceptions are only raised in the parent when the parent does a
'receive' or 'receive-if'. This is because the exception is sent from
the child to the parent as a message.</p>
<p>To demonstrate how a 'supervisor' process could be created we'll
use the following example 'rpc-server'. It processes 'add', 'product'
and 'crash' messages. 'crash' causes a deliberate divide by zero error
to terminate the process:</p>
<pre class="code">
GENERIC: handle-rpc-message
GENERIC: run-rpc-command
TUPLE: rpc-command op args ;
PREDICATE: rpc-command add-command ( msg -- bool )
rpc-command-op "add" = ;
PREDICATE: rpc-command product-command ( msg -- bool )
rpc-command-op "product" = ;
PREDICATE: rpc-command shutdown-command ( msg -- bool )
rpc-command-op "shutdown" = ;
PREDICATE: rpc-command crash-command ( msg -- bool )
rpc-command-op "crash" = ;
M: tagged-message handle-rpc-message ( message -- bool )
dup tagged-message-data run-rpc-command -rot reply not ;
M: add-command run-rpc-command ( command -- shutdown? result )
rpc-command-args sum f ;
M: product-command run-rpc-command ( command -- shutdown? result )
rpc-command-args product f ;
M: shutdown-command run-rpc-command ( command -- shutdown? result )
drop t t ;
M: crash-command run-rpc-command ( command -- shutdown? result )
drop 1 0 / f ;
: fragile-rpc-server ( -- process )
[ handle-rpc-message ] spawn-server ;
: test-add ( process -- )
[
"add" [ 1 2 3 ] &lt;rpc-command> swap send-synchronous .
] cons spawn drop ;
: test-crash ( process -- )
[
"crash" f &lt;rpc-command> swap send-synchronous .
] cons spawn drop ;
</pre>
<p>An example of use:</p>
<pre class="code">
<span class="highlite">fragile-rpc-server</span>
=> Waiting for message in server: G:13037
<span class="highlite">dup test-add</span>
=> 6
Waiting for message in server: G:13037
<span class="highlite">dup test-crash</span>
=> Division by zero
:s :r show stacks at time of error.
:get ( var -- value ) inspects the error namestack.
<span class="highlite">dup test-add</span>
</pre>
<p>After the crash, all other messages are ignored by the server as it
is no longer running. The following is a way to re-use this code by
running a 'supervisor' process that links with the 'worker' rpc-server. When
the worker crashes the supervisor process restarts it. All
messages sent to the supervisor are immediately forwarded to the
worker:</p>
<pre class="code">
: (robust-rpc-server) ( worker -- )
[
#! Forward all messages to worker
receive over send
] [
[
"Worker died, Starting a new worker" print
drop [ handle-rpc-message ] spawn-linked-server
] when
] catch
(robust-rpc-server) ;
: robust-rpc-server ( -- process )
[
[ handle-rpc-message ] spawn-linked-server
(robust-rpc-server)
] spawn ;
</pre>
<p>This time when the 'robust-rpc-server' is run you'll notice that
messages after the crash are still processed:</p>
<pre class="code">
<span class="highlite">robust-rpc-server</span>
=> Waiting for message in server: G:13045
<span class="highlite">dup test-add</span>
=> 6
Waiting for message in server: G:13045
<span class="highlite">dup test-crash</span>
=> Worker died, Starting a new worker
Waiting for message in server: G:13050
<span class="highlite">dup test-add</span>
=> 6
Waiting for message in server: G:13050
</pre>
pong-server2
"ping" over send-message .
=> "pong"
"ping" over send-message .
=> "pong"
"shutdown" over send-message .
=> "Pong server shutdown commenced"
Exiting process: G:12364
</pre>
<p class="note">'send-reply' is not really a good name, and it isn't
that useful an interface. This is currently being worked on.</p>
<h2>Linked processes</h2>
<p>Write about how processes can be linked using 'spawn-link'. An
error thrown in the quotation will cause the process to die, and the
process that called 'spawn-link' to receive the exception when it next
attempts to receive from its message queue.</p>
<h2>Futures</h2> <h2>Futures</h2>
<p>Write about futures. Calling 'future' spawns a process to run a <p>A future is a placeholder for the result of a computation that is
quotation that returns a result. Using '?future' with that process on being calculated in a process. When the process has completed the
the stack will block the calling process until the result is computation the future can be queried to find out the result. If the
returned.</p> computation has not completed when the future is queried them the
process will block until the result is completed.</p>
<p>A future is created using the 'future' word:</p>
<pre class="code">
IN: concurrency
future ( quot -- future )
</pre>
<p>The quotation will be run in a spawned process, and a future object
is immediately returned. This future object can be resolved using the
word '?future':</p>
<pre class="code">
IN: concurrency
?future ( future -- result )
</pre>
<p>Futures are useful for starting calculations that take a long time
to run but aren't needed to later in the process. When the process
needs the value it can use '?future' to get the result or block until
the result is available. For example:</p>
<pre class="code">
[ 30 fib ] future
...do stuff...
?future
</pre>
<h2>Promises</h2>
<p>A promise is similar to a future but it is not produced by
calcuating something in the background. It represents a promise to
provide a value sometime later. A process can request the value of a
promise and will block if the promise is not fulfilled. Later, another
process can fulfill the promise, providing a value. All threads
waiting on the promise will then resume with that value on the
stack.</p>
<p>The words that operate on promises are:</p>
<pre class="code">
IN: concurrency
&lt;promise> ( -- promise )
fulfill ( value promise -- )
?promise ( promise -- result )
</pre>
<p>A simple example of use is:</p>
<pre class="code">
<span class="highlite">&lt;promise>
[ ?promise "Promise fulfilled: " write print ] spawn drop
[ ?promise "Promise fulfilled: " write print ] spawn drop
[ ?promise "Promise fulfilled: " write print ] spawn drop
"hello" swap fulfill</span>
=> Promise fulfilled: hello
Promise fulfilled: hello
Promise fulfilled: hello
</pre>
<p>In this example a promise is created and three processes spawned,
waiting for that promise to be fulfilled. The main process then
fulfills that promise with the value "hello" and all the blocking
processes resume, printing the value.</p>
<h2>GUI</h2>
<p>In the Alice programming system it's possible to display futures
and promises in the inspector and the values will automatically change
then the future is ready, or the promise fulfilled. It's possible to
do similar things with the Factor GUI but there is nothing currently
built-in. A simple example of how this might work is included in the
concurrency-examples vocabulary, with the 'test-promise-ui' word.</p>
<pre class="code">
: test-promise-ui ( -- )
&lt;promise> dup &lt;promised-label> gadget.
[ 12 fib unparse swap fulfill ] cons spawn drop ;
</pre>
<p>This creates a 'promised-label' gadget. This is a gadget, also
implemented in the examples, that has an attached promise. The gadget will display the text 'Unfulfilled
Promise' while the promise is unfulfilled. When it is fulfilled the
gadget will immediately redisplay the value of the promise (which will
need to be a printable value for this example).</p>
<p>The example above displays the gadget using 'gadget.' and then
spawns a thread to compute the 12th fibonacci number and fulfill the
promise with it converted to a string. As soon as the fulfill occurs
the gadget redisplays with the new value.</p>
<p>So running 'test-promise-ui' will displays 'Unfulfilled Promise'
and a short time later change to the new computed value. You will need
to have the Factor GUI listener for this to work:</p>
<pre class="code">
USE: shells
[ ui ] in-thread
</pre>
<p class="footer"> <p class="footer">
News and updates to this software can be obtained from the authors News and updates to this software can be obtained from the authors
weblog: <a href="http://radio.weblogs.com/0102385">Chris Double</a>.</p> weblog: <a href="http://radio.weblogs.com/0102385">Chris Double</a>.</p>

View File

@ -7,7 +7,8 @@ USE: io
USE: parser USE: parser
: a "concurrency.factor" run-file ; : a "../dlists.factor" run-file
"concurrency.factor" run-file ;
: b "concurrency-examples.factor" run-file ; : b "concurrency-examples.factor" run-file ;
: c "concurrency-tests.factor" run-file ; : c "concurrency-tests.factor" run-file ;
a a