! Copyright (C) 2005 Chris Double. All Rights Reserved.
! See http://factorcode.org/license.txt for BSD license.
!
! Concurrency library for Factor based on Erlang/Termite style
! concurrency.
!
! NOTE(review): this file had been collapsed onto a few physical lines
! and every angle-bracketed span had been stripped from it. Line
! structure has been restored so that '!' and '#!' comments end where
! they should. Words marked NOTE(review) below were rebuilt from their
! surviving fragments and call sites; confirm each against a pristine
! copy before relying on it.
USING: vectors dlists threads sequences continuations
       namespaces random math quotations words kernel match
       arrays io assocs init shuffle system ;
IN: concurrency

! A mailbox holds queued messages in 'data' (a dlist) and the
! threads currently blocked on it in 'threads' (a vector).
TUPLE: mailbox threads data ;

! A blocked thread: its continuation plus an optional wake-up time.
TUPLE: thread timeout continuation continued? ;

: <thread> ( timeout continuation -- obj )
    #! Build a thread tuple. A non-f timeout is converted to
    #! 'millis + timeout'; an f timeout is stored as f.
    >r dup [ millis + ] when r>
    {
        set-thread-timeout
        set-thread-continuation
    } thread construct ;

: make-mailbox ( -- mailbox )
    #! A new mailbox with no blocked threads and an empty queue.
    #! 'mailbox' has two slots, so both a vector and a dlist are
    #! supplied to construct-boa.
    V{ } clone <dlist> mailbox construct-boa ;

: mailbox-empty? ( mailbox -- bool )
    mailbox-data dlist-empty? ;

: mailbox-put ( obj mailbox -- )
    #! Queue obj, then reschedule every thread that was blocked on
    #! the mailbox. The blocked-thread vector is replaced with a
    #! fresh one before the old one is iterated.
    [ mailbox-data push-back ] keep
    [ mailbox-threads ] keep
    V{ } clone swap set-mailbox-threads
    [ thread-continuation schedule-thread ] each yield ;

<PRIVATE
! NOTE(review): the head of this word (everything before
! 'swap mailbox-threads push') was missing. It has been rebuilt to
! mirror (mailbox-block-if-empty) and the stack left by mailbox-get?*.
! Confirm 'dlist-contains?' takes ( pred dlist -- ? ) like
! 'delete-node-if' does in this file.
: (mailbox-block-unless-pred) ( pred mailbox timeout -- )
    #! Block the current thread until the mailbox holds a message
    #! for which pred is true.
    2over mailbox-data dlist-contains? [
        3drop
    ] [
        [ <thread> swap mailbox-threads push stop ] callcc0
        (mailbox-block-unless-pred)
    ] if ; inline

: (mailbox-block-if-empty) ( mailbox timeout -- mailbox2 )
    #! Block the current thread while the mailbox is empty; the
    #! check is repeated each time the thread is resumed.
    over mailbox-empty? [
        [ <thread> swap mailbox-threads push stop ] callcc0
        (mailbox-block-if-empty)
    ] [
        drop
    ] if ;
PRIVATE>

: mailbox-get* ( mailbox timeout -- obj )
    #! Remove and return the message at the front of the mailbox,
    #! blocking first if it is empty.
    (mailbox-block-if-empty) mailbox-data pop-front ;

: mailbox-get ( mailbox -- obj )
    f mailbox-get* ;

: mailbox-get-all* ( mailbox timeout -- array )
    #! Block until the mailbox is non-empty, then collect messages
    #! from the front of the queue.
    #! NOTE(review): the predicate is 'mailbox-empty?' (not negated).
    #! Confirm whether 'unfold' in the targeted Factor version stops
    #! when the predicate turns true or when it turns false.
    (mailbox-block-if-empty)
    [ dup mailbox-empty? ]
    [ dup mailbox-data pop-front ]
    [ ] unfold nip ;

: mailbox-get-all ( mailbox -- array )
    f mailbox-get-all* ;

: while-mailbox-empty ( mailbox quot -- )
    #! Call quot repeatedly for as long as the mailbox is empty.
    over mailbox-empty? [
        dup >r swap slip r> while-mailbox-empty
    ] [
        2drop
    ] if ; inline

: mailbox-get?* ( pred mailbox timeout -- obj )
    #! Block until some queued message satisfies pred, then remove
    #! and return it via delete-node-if.
    2over >r >r (mailbox-block-unless-pred) r> r>
    mailbox-data delete-node-if ; inline

: mailbox-get? ( pred mailbox -- obj )
    f mailbox-get?* ;

TUPLE: process links pid mailbox ;

C: <process> process

GENERIC: send ( message process -- )

<PRIVATE
! NOTE(review): 'make-process' was missing entirely. It is rebuilt
! here as the unlinked counterpart of make-linked-process - confirm.
: make-process ( -- process )
    #! Return a process set to run on the local node, linked to
    #! no other process.
    V{ } clone random-256 make-mailbox <process> ;

: make-linked-process ( process -- process )
    #! Return a process set to run on the local node. That process is
    #! linked to the process on the stack. It will receive a message if
    #! that process terminates.
    1quotation random-256 make-mailbox <process> ;
PRIVATE>

: self ( -- process )
    #! The process stored in the 'self' variable.
    \ self get ;

<PRIVATE
! NOTE(review): 'init-main-process' is called from the init hook at the
! end of this file but its definition was missing; 'with-process' is a
! helper for the rebuilt (spawn) words. Both are reconstructions.
: init-main-process ( -- )
    #! Set up the main process as the global 'self'.
    make-process \ self set-global ;

: with-process ( quot process -- )
    #! Call the quotation with 'self' bound to the given process.
    \ self rot with-variable ;
PRIVATE>

DEFER: register-process
DEFER: unregister-process

<PRIVATE
! NOTE(review): ((spawn)), (spawn) and (spawn-link) are used by
! spawn / spawn-link below but their definitions were missing. These
! are reconstructions - confirm against a pristine copy.
: ((spawn)) ( quot -- )
    #! Register the current process under its pid, run quot, and
    #! always unregister afterwards.
    self dup process-pid swap register-process
    [ self process-pid unregister-process ] [ ] cleanup ; inline

: (spawn) ( quot -- process )
    #! Run quot in a new thread whose 'self' is a fresh process.
    [ in-thread ] make-process [ with-process ] keep ;

: (spawn-link) ( quot -- process )
    #! As (spawn), but the new process is linked to the caller.
    [ in-thread ] self make-linked-process [ with-process ] keep ;
PRIVATE>

: spawn ( quot -- process )
    #! Start a new process running quot and return it.
    [ ((spawn)) ] curry (spawn) ; inline

TUPLE: linked-exception error ;

C: <linked-exception> linked-exception

: while-no-messages ( quot -- )
    #! Run the quotation in a loop while no messages are in
    #! the process's mailbox. The quot should have stack effect
    #! ( -- ).
    >r self process-mailbox r> while-mailbox-empty ; inline

M: process send ( message process -- )
    process-mailbox mailbox-put ;

: receive ( -- message )
    #! Take the next message from this process's mailbox. A
    #! linked-exception message is unwrapped and its error thrown.
    self process-mailbox mailbox-get dup linked-exception? [
        linked-exception-error throw
    ] when ;

: receive-if ( pred -- message )
    #! As 'receive', but takes the first message satisfying pred.
    self process-mailbox mailbox-get? dup linked-exception? [
        linked-exception-error throw
    ] when ; inline

: rethrow-linked ( error -- )
    #! Rethrow the error to the linked processes. The error is
    #! wrapped in a linked-exception so that 'receive' in the linked
    #! process recognises it and throws.
    self process-links [
        over <linked-exception> swap send
    ] each drop ;

: spawn-link ( quot -- process )
    #! Start a new process linked to the caller; an error escaping
    #! quot is forwarded to the linked processes.
    [ [ rethrow-linked ] recover ] curry
    [ ((spawn)) ] curry (spawn-link) ; inline

<PRIVATE
! NOTE(review): '(recv)' is used by recv but its definition was
! missing. Rebuilt from the form description in recv's comment
! ('pred' is a word, 'match-quot' is a quotation) - confirm.
: (recv) ( msg form -- )
    #! Run the form's match-quot on msg if the form's pred accepts it.
    [ first execute ] 2keep rot [ second call ] [ 2drop ] if ;
PRIVATE>

: recv ( forms -- )
    #! Get a message from the process's mailbox. Compare it against the
    #! forms to run a quotation if it matches the given message. 'forms'
    #! is a list of quotations in the following format:
    #!   [ pred match-quot ]
    #! 'pred' is a word that has stack effect ( msg -- bool ). It is
    #! executed with the message on the stack. It should return a
    #! boolean if it is a message this form should process.
    #! 'match-quot' is a quotation with stack effect ( msg -- ). It
    #! will be called with the message on the top of the stack if
    #! the 'pred' word returned true.
    #! Each form in the list will be matched against the message,
    #! even if a prior match succeeded. This means multiple quotations
    #! may be run against the message.
    receive swap [ dupd (recv) ] each drop ;

MATCH-VARS: ?from ?tag ;

<PRIVATE
! NOTE(review): the head of 'tag-message' was missing; the name comes
! from its use in send-synchronous and the leading '>r' is required to
! balance the surviving 'r>'.
: tag-message ( message -- tagged-message )
    #! Wrap the message as { sender tag message }.
    >r self random-256 r> 3array ;
PRIVATE>

: send-synchronous ( message process -- reply )
    #! Sends a message to the process synchronously. The
    #! message will be wrapped to include the process of the sender
    #! and a unique tag. After being sent the sending process will
    #! block for a reply tagged with the same unique tag.
    >r tag-message dup r> send
    second _ 2array [ match ] curry receive-if second ;

<PRIVATE
! NOTE(review): '(spawn-server)' is used by spawn-server and
! spawn-linked-server but its definition was missing. Rebuilt from the
! contract documented on spawn-server - confirm.
: (spawn-server) ( quot -- )
    #! Receive a message and call quot on it; loop while quot
    #! returns true.
    receive over call [ (spawn-server) ] [ drop ] if ;
PRIVATE>

: spawn-server ( quot -- process )
    #! Spawn a server that receives messages, calling the
    #! quotation on the message. If the quotation returns false
    #! the spawned process exits. If it returns true, the process
    #! starts from the beginning again. The quotation should have
    #! stack effect ( message -- bool ).
    [
        (spawn-server)
        "Exiting process: " write self process-pid print
    ] curry spawn ; inline

: spawn-linked-server ( quot -- process )
    #! Similar to 'spawn-server' but the parent process will be linked
    #! to the child.
    [
        (spawn-server)
        "Exiting process: " write self process-pid print
    ] curry spawn-link ; inline

: server-cc ( -- cc|process )
    #! Captures the current continuation and returns the value.
    #! If that CC is called with a process on the stack it will
    #! set 'self' for the current process to it. Otherwise it will
    #! return the value. This allows capturing a continuation in a server,
    #! and jumping back into it from a spawn and keeping the 'self'
    #! variable correct. It's a workaround until I can find out how to
    #! stop 'self' from being clobbered back to its old value.
    [ ] callcc1 dup process? [ \ self set-global f ] when ;

: call-server-cc ( server-cc -- )
    #! Calls the server continuation passing the current 'self'
    #! so the server continuation gets its new self updated.
    self swap call ;

! A future stores its result in 'value' once computed; 'processes'
! holds the continuations of callers waiting for it.
TUPLE: future value processes ;

: notify-future ( value future -- )
    #! Store the value, reschedule every waiting continuation and
    #! clear the waiter list.
    tuck set-future-value
    dup future-processes [ schedule-thread ] each
    f swap set-future-processes ;

: future ( quot -- future )
    #! Spawn a process to call the quotation and immediately return.
    #! The stored value is { result t } on success or { error f }
    #! if the quotation threw.
    f V{ } clone \ future construct-boa [
        [
            >r [ t 2array ] compose [ f 2array ] recover r>
            notify-future
        ] 2curry spawn drop
    ] keep ;

: ?future ( future -- result )
    #! Block the process until the future has completed and then
    #! place the result on the stack. Return the result
    #! immediately if the future has completed. An error raised by
    #! the future's quotation is rethrown here.
    dup future-value [
        first2 [ throw ] unless
    ] [
        dup [ future-processes push stop ] curry callcc0 ?future
    ] ?if ;

: parallel-map ( seq quot -- newseq )
    #! Spawn a process to apply quot to each element of seq,
    #! joining the results into a sequence at the end.
    [ curry future ] curry map [ ?future ] map ;

: parallel-each ( seq quot -- )
    #! Spawn a process to apply quot to each element of seq,
    #! and wait for all processes to complete.
    [ f ] compose parallel-map drop ;

TUPLE: promise fulfilled? value processes ;

: <promise> ( -- promise )
    #! A new, unfulfilled promise with no waiting threads.
    f f V{ } clone promise construct-boa ;

: fulfill ( value promise -- )
    #! Set the future of the promise to the given value. Threads
    #! blocking on the promise will then be released. Fulfilling an
    #! already fulfilled promise is a no-op.
    dup promise-fulfilled? [
        2drop
    ] [
        [ set-promise-value ] keep
        [ t swap set-promise-fulfilled? ] keep
        [ promise-processes ] keep
        V{ } clone swap set-promise-processes
        [ thread-continuation schedule-thread ] each yield
    ] if ;

<PRIVATE
! NOTE(review): the head of this word was missing; it has been rebuilt
! from the surviving tail ('swap promise-processes push stop ] callcc0
! drop ] if ;') and from its use in ?promise*.
: (maybe-block-promise) ( promise timeout -- promise )
    #! Block the current thread unless the promise is fulfilled.
    over promise-fulfilled? [
        drop
    ] [
        [ <thread> swap promise-processes push stop ] callcc0
        drop
    ] if ;
PRIVATE>

: ?promise* ( promise timeout -- result )
    #! Wait for the promise to be fulfilled, then return its value.
    (maybe-block-promise) promise-value ;

: ?promise ( promise -- result )
    f ?promise* ;

! ******************************
! Experimental code below
! ******************************
<PRIVATE
! NOTE(review): '(lazy)' is used by 'lazy' but its definition was
! missing. Rebuilt as the loop that keeps answering with the already
! computed value - confirm.
: (lazy) ( v -- )
    receive {
        { { ?from ?tag _ } [ ?tag over 2array ?from send (lazy) ] }
    } match-cond ;
PRIVATE>

: lazy ( quot -- lazy )
    #! Spawn a process that immediately blocks and return it.
    #! When '?lazy' is called on the returned process, call the quotation
    #! and return the result. The quotation must have stack effect ( -- X ).
    [
        receive {
            { { ?from ?tag _ } [ call ?tag over 2array ?from send (lazy) ] }
        } match-cond
    ] spawn nip ;

: ?lazy ( lazy -- result )
    #! Given a process spawned using 'lazy', evaluate it and return the result.
    f swap send-synchronous ;

<PRIVATE
! NOTE(review): 'remote-processes' is called as a word below and is set
! as a global in the init hook, but its definition was missing.
! Rebuilt as a reader of that global - confirm.
: remote-processes ( -- hash )
    \ remote-processes get-global ;
PRIVATE>

: register-process ( name process -- )
    #! Record the process in the registry under the given name.
    swap remote-processes set-at ;

: unregister-process ( name -- )
    remote-processes delete-at ;

: get-process ( name -- process )
    remote-processes at ;

! At startup: create the registry, create the main process, and
! register the main process under its own pid.
[
    H{ } clone \ remote-processes set-global
    init-main-process
    self [ process-pid ] keep register-process
] "process-registry" add-init-hook