Linked error fixes, add 2parallel-each and 2parallel-map combinators

db4
Slava Pestov 2008-05-06 21:23:07 -05:00
parent 53832ccd2f
commit 5666cd78b9
6 changed files with 68 additions and 24 deletions

View File

@ -269,8 +269,7 @@ M: double-free summary
M: realloc-error summary
drop "Memory reallocation failed" ;
: error-in-thread. ( -- )
error-thread get-global
: error-in-thread. ( thread -- )
"Error in thread " write
[
dup thread-id #
@ -284,7 +283,7 @@ M: thread error-in-thread ( error thread -- )
die drop
] [
global [
error-in-thread. print-error flush
error-thread get-global error-in-thread. print-error flush
] bind
] if ;

View File

@ -6,11 +6,21 @@ HELP: parallel-map
{ $description "Spawns a new thread for applying " { $snippet "quot" } " to every element of " { $snippet "seq" } ", collecting the results at the end." }
{ $errors "Throws an error if one of the iterations throws an error." } ;
HELP: 2parallel-map
{ $values { "seq1" sequence } { "seq2" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- newelt )" } } { "newseq" sequence } }
{ $description "Spawns a new thread for applying " { $snippet "quot" } " to pairwise elements of " { $snippet "seq1" } " and " { $snippet "seq2" } ", collecting the results at the end." }
{ $errors "Throws an error if one of the iterations throws an error." } ;
HELP: parallel-each
{ $values { "seq" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- )" } } }
{ $description "Spawns a new thread for applying " { $snippet "quot" } " to every element of " { $snippet "seq" } ", blocking until all quotations complete." }
{ $errors "Throws an error if one of the iterations throws an error." } ;
HELP: 2parallel-each
{ $values { "seq1" sequence } { "seq2" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- )" } } }
{ $description "Spawns a new thread for applying " { $snippet "quot" } " to pairwise elements of " { $snippet "seq1" } " and " { $snippet "seq2" } ", blocking until all quotations complete." }
{ $errors "Throws an error if one of the iterations throws an error." } ;
HELP: parallel-filter
{ $values { "seq" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- ? )" } } { "newseq" sequence } }
{ $description "Spawns a new thread for applying " { $snippet "quot" } " to every element of " { $snippet "seq" } ", collecting the elements for which the quotation yielded a true value." }
@ -19,7 +29,9 @@ HELP: parallel-filter
ARTICLE: "concurrency.combinators" "Concurrent combinators"
"The " { $vocab-link "concurrency.combinators" } " vocabulary provides concurrent variants of " { $link each } ", " { $link map } " and " { $link filter } ":"
{ $subsection parallel-each }
{ $subsection 2parallel-each }
{ $subsection parallel-map }
{ $subsection 2parallel-map }
{ $subsection parallel-filter } ;
ABOUT: "concurrency.combinators"

View File

@ -1,9 +1,11 @@
IN: concurrency.combinators.tests
USING: concurrency.combinators tools.test random kernel math
concurrency.mailboxes threads sequences accessors ;
concurrency.mailboxes threads sequences accessors arrays ;
[ [ drop ] parallel-each ] must-infer
{ 2 0 } [ [ 2drop ] 2parallel-each ] must-infer-as
[ [ ] parallel-map ] must-infer
{ 2 1 } [ [ 2array ] 2parallel-map ] must-infer-as
[ [ ] parallel-filter ] must-infer
[ { 1 4 9 } ] [ { 1 2 3 } [ sq ] parallel-map ] unit-test
@ -22,3 +24,24 @@ concurrency.mailboxes threads sequences accessors ;
10 over [ push ] curry parallel-each
length
] unit-test
[ { 10 20 30 } ] [
{ 1 4 3 } { 10 5 10 } [ * ] 2parallel-map
] unit-test
[ { -9 -1 -7 } ] [
{ 1 4 3 } { 10 5 10 } [ - ] 2parallel-map
] unit-test
[
{ 1 4 3 } { 1 0 1 } [ / drop ] 2parallel-each
] must-fail
[ 20 ]
[
V{ } clone
10 10 pick [ [ push ] [ push ] bi ] curry 2parallel-each
length
] unit-test
[ { f } [ "OOPS" throw ] parallel-each ] must-fail

View File

@ -4,14 +4,27 @@ USING: concurrency.futures concurrency.count-downs sequences
kernel ;
IN: concurrency.combinators
: parallel-map ( seq quot -- newseq )
[ curry future ] curry map dup [ ?future ] change-each ;
inline
: (parallel-each) ( n quot -- )
>r <count-down> r> keep await ; inline
: parallel-each ( seq quot -- )
over length <count-down>
[ [ >r curry r> spawn-stage ] 2curry each ] keep await ;
inline
over length [
[ >r curry r> spawn-stage ] 2curry each
] (parallel-each) ; inline
: 2parallel-each ( seq1 seq2 quot -- )
2over min-length [
[ >r 2curry r> spawn-stage ] 2curry 2each
] (parallel-each) ; inline
: parallel-filter ( seq quot -- newseq )
over >r pusher >r each r> r> like ; inline
: future-values dup [ ?future ] change-each ; inline
: parallel-map ( seq quot -- newseq )
[ curry future ] curry map future-values ;
inline
: 2parallel-map ( seq1 seq2 quot -- newseq )
[ 2curry future ] curry 2map future-values ;

View File

@ -1,7 +1,7 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: dlists kernel math concurrency.promises
concurrency.mailboxes ;
concurrency.mailboxes debugger accessors ;
IN: concurrency.count-downs
! http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/CountDownLatch.html
@ -9,9 +9,7 @@ IN: concurrency.count-downs
TUPLE: count-down n promise ;
: count-down-check ( count-down -- )
dup count-down-n zero? [
t swap count-down-promise fulfill
] [ drop ] if ;
dup n>> zero? [ t swap promise>> fulfill ] [ drop ] if ;
: <count-down> ( n -- count-down )
dup 0 < [ "Invalid count for count down" throw ] when
@ -19,15 +17,12 @@ TUPLE: count-down n promise ;
dup count-down-check ;
: count-down ( count-down -- )
dup count-down-n dup zero? [
"Count down already done" throw
] [
1- over set-count-down-n
count-down-check
] if ;
dup n>> dup zero?
[ "Count down already done" throw ]
[ 1- >>n count-down-check ] if ;
: await-timeout ( count-down timeout -- )
>r count-down-promise r> ?promise-timeout drop ;
>r promise>> r> ?promise-timeout ?linked t assert= ;
: await ( count-down -- )
f await-timeout ;
@ -35,5 +30,4 @@ TUPLE: count-down n promise ;
: spawn-stage ( quot count-down -- )
[ [ count-down ] curry compose ] keep
"Count down stage"
swap count-down-promise
promise-mailbox spawn-linked-to drop ;
swap promise>> mailbox>> spawn-linked-to drop ;

View File

@ -3,7 +3,7 @@
IN: concurrency.mailboxes
USING: dlists threads sequences continuations
namespaces random math quotations words kernel arrays assocs
init system concurrency.conditions accessors ;
init system concurrency.conditions accessors debugger ;
TUPLE: mailbox threads data closed ;
@ -83,6 +83,9 @@ M: mailbox dispose
TUPLE: linked-error error thread ;
M: linked-error error.
[ thread>> error-in-thread. ] [ error>> error. ] bi ;
C: <linked-error> linked-error
: ?linked dup linked-error? [ rethrow ] when ;