diff --git a/extra/concurrency/concurrency.factor b/extra/concurrency/concurrency.factor index e4972c9030..b46439b583 100755 --- a/extra/concurrency/concurrency.factor +++ b/extra/concurrency/concurrency.factor @@ -264,12 +264,7 @@ PRIVATE> #! so the server continuation gets its new self updated. self swap call ; -TUPLE: future value processes ; - -: notify-future ( value future -- ) - tuck set-future-value - dup future-processes [ schedule-thread ] each - f swap set-future-processes ; +TUPLE: future status value processes ; : future ( quot -- future ) #! Spawn a process to call the quotation and immediately return @@ -277,22 +272,28 @@ TUPLE: future value processes ; #! ?future. If the quotation has completed the result will be returned. #! If not, the process will block until the quotation completes. #! 'quot' must have stack effect ( -- X ). - \ future construct-empty [ + [ [ - >r [ t 2array ] compose [ f 2array ] recover r> - notify-future - ] 2curry spawn drop - ] keep ; + t + ] compose + ] spawn drop + [ self send ] compose spawn ; : ?future ( future -- result ) #! Block the process until the future has completed and then #! place the result on the stack. Return the result #! immediately if the future has completed. - dup future-value [ - first2 [ throw ] unless - ] [ - dup [ future-processes push stop ] curry callcc0 ?future - ] ?if ; + process-mailbox mailbox-get ; + +: parallel-map ( seq quot -- newseq ) + #! Spawn a process to apply quot to each element of seq, + #! joining the results into a sequence at the end. + [ curry future ] curry map [ ?future ] map ; + +: parallel-each ( seq quot -- ) + #! Spawn a process to apply quot to each element of seq, + #! and waits for all processes to complete. + [ f ] compose parallel-map drop ; TUPLE: promise fulfilled? value processes ;