Add parallel-cleave, parallel-spread, parallel-napply combinators

db4
Slava Pestov 2008-11-19 16:01:48 -06:00
parent f3911ea09a
commit cdf7436667
3 changed files with 49 additions and 11 deletions

View File

@ -27,11 +27,17 @@ HELP: parallel-filter
{ $errors "Throws an error if one of the iterations throws an error." } ;
ARTICLE: "concurrency.combinators" "Concurrent combinators"
"The " { $vocab-link "concurrency.combinators" } " vocabulary provides concurrent variants of " { $link each } ", " { $link map } " and " { $link filter } ":"
"The " { $vocab-link "concurrency.combinators" } " vocabulary provides concurrent variants of various combinators."
$nl
"Concurrent sequence combinators:"
{ $subsection parallel-each }
{ $subsection 2parallel-each }
{ $subsection parallel-map }
{ $subsection 2parallel-map }
{ $subsection parallel-filter } ;
{ $subsection parallel-filter }
"Concurrent cleave combinators:"
{ $subsection parallel-cleave }
{ $subsection parallel-spread }
{ $subsection parallel-napply } ;
ABOUT: "concurrency.combinators"

View File

@ -1,6 +1,7 @@
IN: concurrency.combinators.tests
USING: concurrency.combinators tools.test random kernel math
concurrency.mailboxes threads sequences accessors arrays ;
concurrency.mailboxes threads sequences accessors arrays
math.parser ;
[ [ drop ] parallel-each ] must-infer
{ 2 0 } [ [ 2drop ] 2parallel-each ] must-infer-as
@ -45,3 +46,10 @@ concurrency.mailboxes threads sequences accessors arrays ;
] unit-test
[ { f } [ "OOPS" throw ] parallel-each ] must-fail
[ "1a" "4b" "3c" ] [
2
{ [ 1- ] [ sq ] [ 1+ ] } parallel-cleave
[ number>string ] 3 parallel-napply
{ [ "a" append ] [ "b" append ] [ "c" append ] } parallel-spread
] unit-test

View File

@ -1,34 +1,58 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: concurrency.futures concurrency.count-downs sequences
kernel ;
kernel macros fry combinators generalizations ;
IN: concurrency.combinators
<PRIVATE
: (parallel-each) ( n quot -- )
>r <count-down> r> keep await ; inline
[ <count-down> ] dip keep await ; inline
PRIVATE>
: parallel-each ( seq quot -- )
over length [
[ >r curry r> spawn-stage ] 2curry each
'[ _ curry _ spawn-stage ] each
] (parallel-each) ; inline
: 2parallel-each ( seq1 seq2 quot -- )
2over min-length [
[ >r 2curry r> spawn-stage ] 2curry 2each
'[ _ 2curry _ spawn-stage ] 2each
] (parallel-each) ; inline
: parallel-filter ( seq quot -- newseq )
over >r pusher >r each r> r> like ; inline
over [ pusher [ each ] dip ] dip like ; inline
<PRIVATE
: [future] ( quot -- quot' ) '[ _ curry future ] ; inline
: future-values dup [ ?future ] change-each ; inline
PRIVATE>
: parallel-map ( seq quot -- newseq )
[ curry future ] curry map future-values ;
inline
[future] map future-values ; inline
: 2parallel-map ( seq1 seq2 quot -- newseq )
[ 2curry future ] curry 2map future-values ;
'[ _ 2curry future ] 2map future-values ;
<PRIVATE
: (parallel-spread) ( n -- spread-array )
[ ?future ] <repetition> ; inline
: (parallel-cleave) ( quots -- quot-array spread-array )
[ [future] ] map dup length (parallel-spread) ; inline
PRIVATE>
MACRO: parallel-cleave ( quots -- )
(parallel-cleave) '[ _ cleave _ spread ] ;
MACRO: parallel-spread ( quots -- )
(parallel-cleave) '[ _ spread _ spread ] ;
MACRO: parallel-napply ( quot n -- )
[ [future] ] dip dup (parallel-spread) '[ _ _ napply _ spread ] ;