<html>
<head>
<title>Factor Concurrency Library</title>
<link rel="stylesheet" type="text/css" href="style.css">
</head>
<body>
<h1>Factor Concurrency Library</h1>
<p class="note">The concurrency library described here is based on the style
of concurrency used in systems like Erlang and Termite. It is
at a very early stage and only supports concurrent
processes within a single Factor image. The interface is very likely to
change, so it should be treated as experimental. Support for
distributed processes is planned.</p>
<h1>Overview</h1>
<p>A concurrency-oriented program is one in which multiple processes
run simultaneously in a single Factor image. The processes
communicate with each other by sending messages asynchronously. Although
processes can share data via Factor's mutable data structures, this is
not recommended: shared-state concurrency is a frequent
cause of problems.</p>
<h1>Loading</h1>
<p>The quickest way to get up and running with this library is to
change to the 'concurrency' directory and run Factor. Then execute the
following commands:</p>
<pre class="code">
"load.factor" run-file
USE: concurrency
USE: concurrency-examples
</pre>
<h1>Processes</h1>
<p>A process is basically a thread with a message queue. Other
processes can place items on this queue by sending the process a
message. A process can check its queue for messages, blocking if none
are pending, and process them as they are queued.</p>
<p>Factor processes are very lightweight. Each process can take as
little as 900 bytes of memory. This library has been tested running
hundreds of thousands of simple processes.</p>
<p>A message sent from one process to another can be any Factor
value. Factor tuples are ideal for this: you can send
a tuple to a process and use the predicate dispatch mechanism
to perform actions depending on the type of the tuple.</p>
<p>Processes are usually created using the 'spawn' word:</p>
<pre class="code">
IN: concurrency
spawn ( quot -- process )
</pre>
<p>This word takes a quotation on the stack and starts a process that
will execute that quotation asynchronously. When the quotation
completes the process will die. 'spawn' leaves on the stack the
process object that was started. This object can be used to send
messages to the process using the 'send' word:</p>
<pre class="code">
IN: concurrency
send ( message process -- )
</pre>
<p>'send' returns immediately after placing the message in the
target process's message queue. A process can get a message from its
queue using the 'receive' word:</p>
<pre class="code">
IN: concurrency
receive ( -- message )
</pre>
<p>This removes the oldest message from the queue
and leaves it on the stack. If there are no messages in the queue the
process will 'block' until a message is available. When a process is
blocked it takes no CPU time at all.</p>
<pre class="code">
[ receive print ] spawn
"Hello Process!" swap send
</pre>
<p>This example spawns a process that first blocks, waiting to receive
a message. When a message arrives, the 'receive' call returns,
leaving the message on the stack. The process then prints the message and exits. 'spawn'
left the process object on the stack, so it is available as the target of the 'Hello
Process!' message. Immediately after the 'send' you should see
'Hello Process!' printed on the console.</p>
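<p>For readers who think in terms of threads and queues, the following Python
sketch mirrors the idea of a process as a thread with a mailbox. It is an
analogy only and not part of the Factor library: the names 'Process', 'spawn',
'send' and 'receive' are illustrative, and the process body is handed its own
process object as an argument because Python has no equivalent of an implicit
current process.</p>

```python
import queue
import threading


class Process:
    """A thread paired with a FIFO mailbox (hypothetical Python analogue)."""

    def __init__(self, body):
        self.mailbox = queue.Queue()
        self.thread = threading.Thread(target=body, args=(self,), daemon=True)

    def send(self, message):
        # Asynchronous: enqueue the message and return immediately.
        self.mailbox.put(message)

    def receive(self):
        # Block until a message is available, then return the oldest one.
        return self.mailbox.get()


def spawn(body):
    """Start body in a new thread and return the process object."""
    process = Process(body)
    process.thread.start()
    return process


printed = []


def greeter(me):
    # Blocks in receive until the main thread sends something.
    printed.append(me.receive())


p = spawn(greeter)
p.send("Hello Process!")
p.thread.join()
print(printed[0])
```

<p>The sketch uses one operating-system thread per process, so it says nothing
about the memory footprint of Factor's much lighter processes.</p>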
<p>It is also possible to selectively retrieve messages from the
message queue. The 'receive-if' word takes a predicate quotation on the stack
and returns the first message in the queue that satisfies the
predicate. If no message satisfies the predicate then the process
blocks until one arrives that does.</p>
<pre class="code">
: odd? ( n -- ? )
  2 mod 1 = ;

<span class="highlite">1 self send
2 self send
3 self send</span>

<span class="highlite">receive .</span>
=> 1
<span class="highlite">[ odd? ] receive-if .</span>
=> 3
<span class="highlite">receive .</span>
=> 2
</pre>
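<p>Selective receive can be pictured as scanning the mailbox from oldest to
newest and removing the first match. The Python sketch below is one possible
way to model that behaviour; the 'Mailbox' class and its method names are
assumptions made for illustration and do not reflect how the Factor library
implements 'receive-if'.</p>

```python
import threading


class Mailbox:
    """FIFO mailbox that supports selective receive (illustrative only)."""

    def __init__(self):
        self._messages = []
        self._changed = threading.Condition()

    def send(self, message):
        with self._changed:
            self._messages.append(message)
            self._changed.notify_all()

    def receive_if(self, predicate):
        with self._changed:
            while True:
                # Scan oldest-first and remove the first matching message.
                for index, message in enumerate(self._messages):
                    if predicate(message):
                        return self._messages.pop(index)
                # Nothing matched: sleep until another message is sent.
                self._changed.wait()

    def receive(self):
        # Plain receive is selective receive that accepts anything.
        return self.receive_if(lambda message: True)


box = Mailbox()
for n in (1, 2, 3):
    box.send(n)

first = box.receive()                       # 1, the oldest message
odd = box.receive_if(lambda n: n % 2 == 1)  # 3, skipping over 2
rest = box.receive()                        # 2, still waiting in the queue
print(first, odd, rest)
```

<p>Messages that do not match stay in the queue in their original order, which
is why the final 'receive' still returns 2.</p>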
<h2>Self</h2>
<p>A process can get access to its own process object using the 'self'
word so it can pass it to other processes. This allows the other processes to send
messages back. The following example gets the current
process's 'self' and spawns a process which sends a message to it. The
original process then receives that message:</p>
<pre class="code">
<span class="highlite">self .s</span>
=> &lt;&lt; process ... &gt;&gt;
<span class="highlite">[ "Hello!" swap send ] cons spawn drop receive .</span>
=> "Hello!"
</pre>
<h1>Servers</h1>
<p>A common idiom is to create 'server' processes that act on the messages
sent to them. These follow a basic pattern of blocking until a
message is received, processing that message, then looping back to
block for the next message.</p>
<p>The following example shows a very simple server that expects a
cons cell as its message. The 'car' of the cons should be the sender's
process object. If the 'cdr' is 'ping' then the server sends 'pong'
back to the caller. If the 'cdr' is anything else then the server
sends a shutdown notice back to the caller and exits:</p>
<pre class="code">
: (pong-server0) ( -- )
  receive uncons "ping" = [
    "pong" swap send (pong-server0)
  ] [
    "Pong server shutting down" swap send
  ] ifte ;

: pong-server0 ( -- process )
  [ (pong-server0) ] spawn ;

<span class="highlite">pong-server0</span>
<span class="highlite">self "ping" cons over send receive .</span>
=> "pong"
<span class="highlite">self "ping" cons over send receive .</span>
=> "pong"
<span class="highlite">self "shutdown" cons over send receive .</span>
=> "Pong server shutting down"
</pre>
<p>Deconstructing messages and dispatching on their contents
can be a bit of a chore, especially in servers that handle a
number of different messages. One way to factor this code out,
and reduce the amount of stack juggling required, is to use tuples as
messages. This allows the generic dispatch mechanism to be used. The
following example implements the pong server using tuples as
messages:</p>
<pre class="code">
TUPLE: ping-message from ;
TUPLE: shutdown-message from ;

GENERIC: handle-message

M: ping-message handle-message ( message -- bool )
  ping-message-from "pong" swap send t ;

M: shutdown-message handle-message ( message -- bool )
  shutdown-message-from "Pong server shutdown commenced" swap send f ;

: (pong-server1) ( -- )
  "pong-server1 waiting for message..." print
  receive handle-message [ (pong-server1) ] when ;

: pong-server1 ( -- process )
  [
    (pong-server1)
    "pong-server1 exiting..." print
  ] spawn ;
</pre>
<p>Two tuples are created, one for the 'ping' message and one for the 'shutdown' message. Each
has a 'from' slot which holds the process of the sender. The server
loop, in '(pong-server1)', calls a generic word called
'handle-message', which has the stack effect ( message -- bool ). Each
method returns a boolean: true means continue the server
loop, false means exit and shut down the server.</p>
<p>Two methods are added to the generic word, one for 'ping' and the
other for 'shutdown'. Here's a sample run:</p>
<pre class="code"> clear
<span class="highlite">pong-server1</span>
=> pong-server1 waiting for message...
<span class="highlite">self &lt;ping-message&gt; over send receive .</span>
=> "pong"
   pong-server1 waiting for message...
<span class="highlite">self &lt;ping-message&gt; over send receive .</span>
=> "pong"
   pong-server1 waiting for message...
<span class="highlite">self &lt;shutdown-message&gt; over send receive .</span>
=> "Pong server shutdown commenced"
   pong-server1 exiting...
</pre>
<p>The advantage of this approach is that it is easy to extend the server
without shutting it down. Adding a new message is as simple as
defining the tuple and adding a method to 'handle-message' specialised
on that tuple. Here's an example of adding an 'echo' message, without
shutting the server down:</p>
<pre class="code">
<span class="highlite">pong-server1</span>
=> pong-server1 waiting for message...
<span class="highlite">self &lt;ping-message&gt; over send receive .</span>
=> "pong"

TUPLE: echo-message from text ;

M: echo-message handle-message ( message -- bool )
  dup echo-message-text swap echo-message-from send t ;

<span class="highlite">self "Hello World" &lt;echo-message&gt; over send receive .</span>
=> "Hello World"

</pre>
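<p>The same idea of dispatching on the type of a message, and of adding a
handler later without touching existing code, can be sketched in Python with
'functools.singledispatch'. This is an analogy, not the Factor mechanism: the
class names are invented for the sketch, a plain queue stands in for the 'from'
slot, and the handlers are called directly rather than from a server loop.</p>

```python
import queue
from dataclasses import dataclass
from functools import singledispatch


@dataclass
class Ping:
    sender: queue.Queue  # reply mailbox, standing in for the 'from' slot


@dataclass
class Shutdown:
    sender: queue.Queue


@singledispatch
def handle_message(message):
    # Fallback for message types that have no registered handler.
    raise TypeError("no handler for " + type(message).__name__)


@handle_message.register
def _(message: Ping):
    message.sender.put("pong")
    return True  # keep serving


@handle_message.register
def _(message: Shutdown):
    message.sender.put("Pong server shutdown commenced")
    return False  # stop serving


# A new message type can be registered later without editing the code above.
@dataclass
class Echo:
    sender: queue.Queue
    text: str


@handle_message.register
def _(message: Echo):
    message.sender.put(message.text)
    return True


replies = queue.Queue()
results = [
    handle_message(Ping(replies)),
    handle_message(Echo(replies, "Hello World")),
    handle_message(Shutdown(replies)),
]
print(results)
```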
<h2>Synchronous Sends</h2>
<p>The 'send' word sends a message asynchronously, and the sending
process continues immediately. The 'pong server' examples shown
previously all sent messages to the server and waited for a reply back
from the server. This pattern of synchronous sending is made easier
with the 'send-synchronous' word:</p>
<pre class="code">
IN: concurrency
send-synchronous ( message process -- reply )
</pre>
<p>This word sends a message to the given process and immediately
blocks until a reply is received for this particular message send. It
leaves the reply on the stack. Note that it doesn't wait for just any
reply; it waits for a reply specifically to this send.</p>
<p>To do this it wraps the requested message inside a 'tagged-message'
tuple. This tuple is defined as:</p>
<pre class="code">
TUPLE: tagged-message data from tag ;
</pre>
<p>When 'send-synchronous' is called it creates a
'tagged-message', storing the current process in the 'from' slot. This
is the process the receiving server will send its reply to. It also
generates a random 'tag' which is stored in the 'tag' slot. The
receiving server will include this value in its reply. After the send
the current process blocks, waiting for a reply that has exactly
the same tag. In this way you can be sure that the reply you got was for
the specific message sent.</p>
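<p>The tagging protocol can be sketched in Python as follows. This is a rough
model under stated assumptions rather than the library's implementation: tags
come from a counter instead of a random generator, mailboxes are plain queues,
replies are (tag, message) pairs, and unrelated messages that arrive while
waiting are simply put back at the end of the queue.</p>

```python
import itertools
import queue
import threading
from dataclasses import dataclass


@dataclass
class TaggedMessage:
    data: object
    sender: queue.Queue  # mailbox of the requesting process ('from' slot)
    tag: int


_tags = itertools.count(1)  # assumption: a counter stands in for a random tag


def reply(request, message):
    # Echo the tag back so the requester can match the reply to its request.
    request.sender.put((request.tag, message))


def send_synchronous(data, server_mailbox, my_mailbox):
    tag = next(_tags)
    server_mailbox.put(TaggedMessage(data, my_mailbox, tag))
    skipped = []
    while True:
        item = my_mailbox.get()  # blocks until something arrives
        if isinstance(item, tuple) and item[0] == tag:
            for other in skipped:  # simplification: requeue at the back
                my_mailbox.put(other)
            return item[1]
        skipped.append(item)


def pong_server(mailbox):
    while True:
        request = mailbox.get()
        if request.data == "ping":
            reply(request, "pong")
        else:
            reply(request, "Pong server shutdown commenced")
            return


server_box, my_box = queue.Queue(), queue.Queue()
server = threading.Thread(target=pong_server, args=(server_box,))
server.start()
my_box.put("unrelated message")  # must not be mistaken for the reply
answer = send_synchronous("ping", server_box, my_box)
farewell = send_synchronous("shutdown", server_box, my_box)
server.join()
print(answer, "/", farewell)
```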
<p>Here is the 'pong server' recoded to use 'send-synchronous' and the
tagged-message type:</p>
<pre class="code">
GENERIC: handle-message2
PREDICATE: tagged-message ping-message2 ( obj -- ? )
  tagged-message-data "ping" = ;
PREDICATE: tagged-message shutdown-message2 ( obj -- ? )
  tagged-message-data "shutdown" = ;

M: ping-message2 handle-message2 ( message -- bool )
  "pong" reply t ;

M: shutdown-message2 handle-message2 ( message -- bool )
  "Pong server shutdown commenced" reply f ;

: (pong-server2) ( -- )
  "pong-server2 waiting for message..." print
  receive handle-message2 [ (pong-server2) ] when ;

: pong-server2 ( -- process )
  [
    (pong-server2)
    "pong-server2 exiting..." print
  ] spawn ;

<span class="highlite">pong-server2</span>
=> pong-server2 waiting for message...
<span class="highlite">"ping" over send-synchronous .</span>
=> "pong"
   pong-server2 waiting for message...
<span class="highlite">"ping" over send-synchronous .</span>
=> "pong"
   pong-server2 waiting for message...
<span class="highlite">"shutdown" over send-synchronous .</span>
=> "Pong server shutdown commenced"
   pong-server2 exiting...
</pre>
<p>The main difference in this example is that the 'handle-message2'
methods are dispatched over predicate types. Two predicate types are
set up, both based on the 'tagged-message' tuple mentioned earlier. The
first is 'ping-message2', which is a tagged message where the
message data is the string "ping". The second, 'shutdown-message2', is also a tagged message
but its message data is the string "shutdown".</p>
<p>The implementation of the methods uses the 'reply' word. 'reply'
takes a received tagged-message and a reply message on the stack, and
sends the reply back to the originating process using
the same 'tag'
as the original message. It is a convenience word so you don't have to
manually unpack the tagged-message tuple to get at the originating
process and tag. Its signature is:</p>
<pre class="code">
IN: concurrency
reply ( tagged-message message -- )
</pre>
<h2>Generic Server</h2>
<p>You'll probably have noticed that the general pattern of the pong
server examples is the same. In a loop they receive a message,
process it using a generic word, and either exit or go back to the
beginning of the loop. This is abstracted in the 'spawn-server'
word:</p>
<pre class="code">
IN: concurrency
spawn-server ( quot -- process )
</pre>
<p>This takes a quotation that has stack effect ( message -- bool ).
'spawn-server' will spawn a server loop that waits for a message. When
one is received the quotation is called on it. If the quotation returns
false then the server process exits, otherwise it loops from the
beginning again. Using this word you can write the previous
'pong-server2' example as:</p>
<pre class="code">
GENERIC: handle-message2
PREDICATE: tagged-message ping-message2 ( obj -- ? ) tagged-message-data "ping" = ;
PREDICATE: tagged-message shutdown-message2 ( obj -- ? ) tagged-message-data "shutdown" = ;

M: ping-message2 handle-message2 ( message -- bool )
  "pong" reply t ;

M: shutdown-message2 handle-message2 ( message -- bool )
  "Pong server shutdown commenced" reply f ;

: pong-server3 ( -- process )
  [ handle-message2 ] spawn-server ;
</pre>
<p>The main change is that you no longer need the helper
(pong-server2) word.</p>
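<p>The loop that 'spawn-server' abstracts is small enough to sketch in a few
lines of Python. The function below is a hypothetical analogue, not the Factor
word itself: it returns the mailbox and the thread as a pair because the
sketch has no process object, and the handler is any callable that returns
false to stop the loop.</p>

```python
import queue
import threading


def spawn_server(handler):
    """Start a thread that feeds each message to handler until it returns False."""
    mailbox = queue.Queue()

    def loop():
        # Receive, handle, and repeat for as long as the handler says so.
        while handler(mailbox.get()):
            pass

    thread = threading.Thread(target=loop)
    thread.start()
    return mailbox, thread


seen = []


def record(message):
    seen.append(message)
    return message != "shutdown"  # False ends the server loop


mailbox, thread = spawn_server(record)
for message in ("ping", "ping", "shutdown"):
    mailbox.put(message)
thread.join()
print(seen)
```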
<h2>Exceptions</h2>
<p>A process can handle exceptions using the standard Factor exception
handling mechanism. If an exception is uncaught the process will
terminate. For example:</p>
<pre class="code">
<span class="highlite">[
  1 0 /
  "This will not print" print
] spawn</span>
=>
Division by zero
:s :r show stacks at time of error.
:get ( var -- value ) inspects the error namestack.
</pre>
<p>Processes can be linked so that a parent process can receive the
exception that caused the child process to terminate. In this way
'supervisor' processes can be created that are notified when child
processes terminate and possibly restart them.</p>
<p>The easiest way to form this link is using the 'spawn-link'
word. This will create a unidirectional link, such that if an
uncaught exception causes the child to terminate, the parent process
can catch it:</p>
<pre class="code">
<span class="highlite">[
  [
    1 0 /
    "This will not print" print
  ] spawn-link drop
  receive
]
catch [ "Exception caught." print ] when
</span>
=> "Exception caught."
</pre>
<p>Exceptions are only raised in the parent when the parent does a
'receive' or 'receive-if'. This is because the exception is sent from
the child to the parent as a message.</p>
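<p>The following Python sketch illustrates why the exception only surfaces at
receive time: the child's failure travels as an ordinary message and is
re-raised when the parent takes it out of its mailbox. All names here
('LinkedExit', 'Mailbox', 'spawn_link') are invented for the sketch, which
models the idea rather than the library's actual implementation.</p>

```python
import queue
import threading


class LinkedExit(Exception):
    """Raised in the parent when a linked child died from an uncaught exception."""


class Mailbox:
    def __init__(self):
        self._queue = queue.Queue()

    def send(self, message):
        self._queue.put(message)

    def receive(self):
        message = self._queue.get()
        if isinstance(message, LinkedExit):
            raise message  # the failure surfaces only here, at receive time
        return message


def spawn_link(body, parent):
    """Run body in a thread; report an uncaught exception to parent."""

    def run():
        try:
            body()
        except Exception as error:
            # Forward the failure to the parent as an ordinary message.
            parent.send(LinkedExit(repr(error)))

    thread = threading.Thread(target=run)
    thread.start()
    return thread


parent = Mailbox()
child = spawn_link(lambda: 1 / 0, parent)
child.join()

try:
    parent.receive()
    outcome = "no exception"
except LinkedExit:
    outcome = "Exception caught."
print(outcome)
```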
<p>To demonstrate how a 'supervisor' process could be created we'll
use the following example 'rpc-server'. It processes 'add', 'product',
'shutdown' and 'crash' messages. 'crash' causes a deliberate divide by zero error
to terminate the process:</p>
<pre class="code">
GENERIC: handle-rpc-message
GENERIC: run-rpc-command

TUPLE: rpc-command op args ;
PREDICATE: rpc-command add-command ( msg -- bool )
  rpc-command-op "add" = ;
PREDICATE: rpc-command product-command ( msg -- bool )
  rpc-command-op "product" = ;
PREDICATE: rpc-command shutdown-command ( msg -- bool )
  rpc-command-op "shutdown" = ;
PREDICATE: rpc-command crash-command ( msg -- bool )
  rpc-command-op "crash" = ;

M: tagged-message handle-rpc-message ( message -- bool )
  dup tagged-message-data run-rpc-command -rot reply not ;

M: add-command run-rpc-command ( command -- result shutdown? )
  rpc-command-args sum f ;

M: product-command run-rpc-command ( command -- result shutdown? )
  rpc-command-args product f ;

M: shutdown-command run-rpc-command ( command -- result shutdown? )
  drop t t ;

M: crash-command run-rpc-command ( command -- result shutdown? )
  drop 1 0 / f ;

: fragile-rpc-server ( -- process )
  [ handle-rpc-message ] spawn-server ;

: test-add ( process -- )
  [
    "add" [ 1 2 3 ] &lt;rpc-command&gt; swap send-synchronous .
  ] cons spawn drop ;

: test-crash ( process -- )
  [
    "crash" f &lt;rpc-command&gt; swap send-synchronous .
  ] cons spawn drop ;
</pre>
<p>An example of use:</p>
<pre class="code">
<span class="highlite">fragile-rpc-server</span>
=> Waiting for message in server: G:13037
<span class="highlite">dup test-add</span>
=> 6
   Waiting for message in server: G:13037
<span class="highlite">dup test-crash</span>
=> Division by zero
   :s :r show stacks at time of error.
   :get ( var -- value ) inspects the error namestack.
<span class="highlite">dup test-add</span>
</pre>
<p>After the crash, all further messages are ignored because the server
is no longer running. The same code can be reused by
running a 'supervisor' process that links with the 'worker' rpc-server. When
the worker crashes the supervisor process restarts it. All
messages sent to the supervisor are immediately forwarded to the
worker:</p>
<pre class="code">
: (robust-rpc-server) ( worker -- )
  [
    #! Forward all messages to worker
    receive over send
  ]
  catch
  [
    "Worker died, Starting a new worker" print
    drop [ handle-rpc-message ] spawn-linked-server
  ] when
  (robust-rpc-server) ;

: robust-rpc-server ( -- process )
  [
    [ handle-rpc-message ] spawn-linked-server
    (robust-rpc-server)
  ] spawn ;
</pre>
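<p>The supervisor pattern can also be sketched in Python. The version below
is a simplified model under its own assumptions, not a translation of the
Factor code: requests are (op, args) pairs, the crash notice is sent to the
supervisor's own request queue, and successive workers share one inbox so that
a request forwarded while the old worker is dying is picked up by its
replacement.</p>

```python
import queue
import threading


def worker(inbox, results, notify):
    """Handle (op, args) requests until shutdown; report a crash to notify."""
    try:
        while True:
            op, args = inbox.get()
            if op == "shutdown":
                return
            if op == "crash":
                1 / 0  # deliberate failure
            results.put(sum(args))
    except ZeroDivisionError as error:
        notify.put(("worker-died", error))


def supervisor(requests, results, restarts):
    inbox = queue.Queue()  # assumption: shared by successive workers

    def start_worker():
        thread = threading.Thread(target=worker, args=(inbox, results, requests))
        thread.start()
        return thread

    current = start_worker()
    while True:
        message = requests.get()
        if message[0] == "worker-died":
            current.join()
            restarts.append(message[1])
            current = start_worker()  # replace the dead worker
            continue
        inbox.put(message)  # forward everything else to the worker
        if message[0] == "shutdown":
            current.join()
            return


requests, results, restarts = queue.Queue(), queue.Queue(), []
boss = threading.Thread(target=supervisor, args=(requests, results, restarts))
boss.start()
for request in (("add", [1, 2, 3]), ("crash", None), ("add", [4, 5])):
    requests.put(request)
answers = [results.get(), results.get()]  # the second answer needs a restart
requests.put(("shutdown", None))
boss.join()
print(answers, len(restarts))
```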
<p>This time when the 'robust-rpc-server' is run you'll notice that
messages after the crash are still processed:</p>
<pre class="code">
<span class="highlite">robust-rpc-server</span>
=> Waiting for message in server: G:13045
<span class="highlite">dup test-add</span>
=> 6
   Waiting for message in server: G:13045
<span class="highlite">dup test-crash</span>
=> Worker died, Starting a new worker
   Waiting for message in server: G:13050
<span class="highlite">dup test-add</span>
=> 6
   Waiting for message in server: G:13050
</pre>

<h2>Futures</h2>
<p>A future is a placeholder for the result of a computation that is
being calculated in a process. When the process has completed the
computation the future can be queried to find out the result. If the
computation has not completed when the future is queried then the
querying process will block until the result is available.</p>
<p>A future is created using the 'future' word:</p>
<pre class="code">
IN: concurrency
future ( quot -- future )
</pre>
<p>The quotation will be run in a spawned process, and a future object
is immediately returned. This future object can be resolved using the
word '?future':</p>
<pre class="code">
IN: concurrency
?future ( future -- result )
</pre>
<p>Futures are useful for starting calculations that take a long time
to run but whose results aren't needed until later in the process. When the process
needs the value it can use '?future' to get the result, blocking until
the result is available. For example:</p>
<pre class="code">
[ 30 fib ] future
...do stuff...
?future
</pre>
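<p>Python's standard library offers a comparable facility, which may help
readers place the concept. In the sketch below 'submit' plays the role of
'future' and 'result' plays the role of '?future'. The 'fib' function is a
stand-in written for this example, not the Factor word used above.</p>

```python
from concurrent.futures import ThreadPoolExecutor


def fib(n):
    """Iterative Fibonacci, with fib(0) == 0 and fib(1) == 1."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


with ThreadPoolExecutor(max_workers=1) as pool:
    pending = pool.submit(fib, 30)  # returns a Future immediately
    other_work = sum(range(10))     # ...do stuff...
    value = pending.result()        # blocks only if fib(30) is still running

print(value)
```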
<h2>Promises</h2>
<p>A promise is similar to a future but it is not produced by
calculating something in the background. It represents a promise to
provide a value sometime later. A process can request the value of a
promise and will block if the promise is not yet fulfilled. Later, another
process can fulfill the promise, providing a value. All processes
waiting on the promise will then resume with that value on the
stack.</p>
<p>The words that operate on promises are:</p>
<pre class="code">
IN: concurrency
&lt;promise&gt; ( -- promise )
fulfill ( value promise -- )
?promise ( promise -- result )
</pre>
<p>A simple example of use is:</p>
<pre class="code">
<span class="highlite">&lt;promise&gt;
[ ?promise "Promise fulfilled: " write print ] spawn drop
[ ?promise "Promise fulfilled: " write print ] spawn drop
[ ?promise "Promise fulfilled: " write print ] spawn drop
"hello" swap fulfill</span>
=> Promise fulfilled: hello
   Promise fulfilled: hello
   Promise fulfilled: hello
</pre>
<p>In this example a promise is created and three processes are spawned,
each waiting for that promise to be fulfilled. The main process then
fulfills the promise with the value "hello" and all the blocked
processes resume, printing the value.</p>
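<p>A promise that wakes every waiter at once can be modelled in Python with a
'threading.Event'. The 'Promise' class below is a minimal sketch written for
this page, assuming the promise is fulfilled exactly once; it is not how the
Factor library implements promises.</p>

```python
import threading


class Promise:
    """A write-once value that any number of threads can wait for."""

    def __init__(self):
        self._ready = threading.Event()
        self._value = None

    def fulfill(self, value):
        self._value = value
        self._ready.set()  # wakes every waiter at once

    def get(self):
        self._ready.wait()  # blocks until fulfill has been called
        return self._value


promise = Promise()
seen = []
seen_lock = threading.Lock()


def waiter():
    value = promise.get()
    with seen_lock:
        seen.append("Promise fulfilled: " + value)


waiters = [threading.Thread(target=waiter) for _ in range(3)]
for thread in waiters:
    thread.start()
promise.fulfill("hello")
for thread in waiters:
    thread.join()
print("\n".join(seen))
```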
<h2>GUI</h2>
<p>In the Alice programming system it's possible to display futures
and promises in the inspector, and the values will automatically change
when the future is ready or the promise fulfilled. It's possible to
do similar things with the Factor GUI but there is nothing currently
built in. A simple example of how this might work is included in the
concurrency-examples vocabulary, with the 'test-promise-ui' word.</p>
<pre class="code">
: test-promise-ui ( -- )
  &lt;promise&gt; dup &lt;promised-label&gt; gadget.
  [ 12 fib unparse swap fulfill ] cons spawn drop ;
</pre>
<p>This creates a 'promised-label' gadget. This is a gadget, also
implemented in the examples, that has an attached promise. The gadget
displays the text 'Unfulfilled
Promise' while the promise is unfulfilled. When it is fulfilled the
gadget immediately redisplays with the value of the promise (which
needs to be a printable value for this example).</p>
<p>The example above displays the gadget using 'gadget.' and then
spawns a process to compute the 12th Fibonacci number and fulfill the
promise with it converted to a string. As soon as the fulfill occurs
the gadget redisplays with the new value.</p>
<p>So running 'test-promise-ui' will display 'Unfulfilled Promise'
and a short time later change to the newly computed value. You will need
to have the Factor GUI listener running for this to work:</p>
<pre class="code">
USE: shells
[ ui ] in-thread
</pre>
<p class="footer">
News and updates to this software can be obtained from the author's
weblog: <a href="http://radio.weblogs.com/0102385">Chris Double</a>.</p>
<p id="copyright">Copyright (c) 2004, Chris Double. All Rights Reserved.</p>
</body> </html>