From 5666cd78b9970e1b553065d7e076699e5f65c31d Mon Sep 17 00:00:00 2001
From: Slava Pestov <slava@slava-pestovs-macbook-pro.local>
Date: Tue, 6 May 2008 21:23:07 -0500
Subject: [PATCH] Linked error fixes, add 2parallel-each and 2parallel-map
 combinators

---
 core/debugger/debugger.factor                 |  5 ++--
 .../combinators/combinators-docs.factor       | 12 +++++++++
 .../combinators/combinators-tests.factor      | 25 ++++++++++++++++++-
 .../combinators/combinators.factor            | 25 ++++++++++++++-----
 .../count-downs/count-downs.factor            | 20 ++++++---------
 extra/concurrency/mailboxes/mailboxes.factor  |  5 +++-
 6 files changed, 68 insertions(+), 24 deletions(-)

diff --git a/core/debugger/debugger.factor b/core/debugger/debugger.factor
index e5dd02c25e..ee3352b719 100755
--- a/core/debugger/debugger.factor
+++ b/core/debugger/debugger.factor
@@ -269,8 +269,7 @@ M: double-free summary
 M: realloc-error summary
     drop "Memory reallocation failed" ;
 
-: error-in-thread. ( -- )
-    error-thread get-global
+: error-in-thread. ( thread -- )
     "Error in thread " write
     [
         dup thread-id #
@@ -284,7 +283,7 @@ M: thread error-in-thread ( error thread -- )
         die drop
     ] [
         global [
-            error-in-thread. print-error flush
+            error-thread get-global error-in-thread. print-error flush
         ] bind
     ] if ;
 
diff --git a/extra/concurrency/combinators/combinators-docs.factor b/extra/concurrency/combinators/combinators-docs.factor
index bbf8fb0f5f..a23301c1e2 100755
--- a/extra/concurrency/combinators/combinators-docs.factor
+++ b/extra/concurrency/combinators/combinators-docs.factor
@@ -6,11 +6,21 @@ HELP: parallel-map
 { $description "Spawns a new thread for applying " { $snippet "quot" } " to every element of " { $snippet "seq" } ", collecting the results at the end." }
 { $errors "Throws an error if one of the iterations throws an error." } ;
 
+HELP: 2parallel-map
+{ $values { "seq1" sequence } { "seq2" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- newelt )" } } { "newseq" sequence } }
+{ $description "Spawns a new thread for applying " { $snippet "quot" } " to pairwise elements of " { $snippet "seq1" } " and " { $snippet "seq2" } ", collecting the results at the end." }
+{ $errors "Throws an error if one of the iterations throws an error." } ;
+
 HELP: parallel-each
 { $values { "seq" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- )" } } }
 { $description "Spawns a new thread for applying " { $snippet "quot" } " to every element of " { $snippet "seq" } ", blocking until all quotations complete." }
 { $errors "Throws an error if one of the iterations throws an error." } ;
 
+HELP: 2parallel-each
+{ $values { "seq1" sequence } { "seq2" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- )" } } }
+{ $description "Spawns a new thread for applying " { $snippet "quot" } " to pairwise elements of " { $snippet "seq1" } " and " { $snippet "seq2" } ", blocking until all quotations complete." }
+{ $errors "Throws an error if one of the iterations throws an error." } ;
+
 HELP: parallel-filter
 { $values { "seq" sequence } { "quot" "a quotation with stack effect " { $snippet "( elt -- ? )" } } { "newseq" sequence } }
 { $description "Spawns a new thread for applying " { $snippet "quot" } " to every element of " { $snippet "seq" } ", collecting the elements for which the quotation yielded a true value." }
@@ -19,7 +29,9 @@ HELP: parallel-filter
 ARTICLE: "concurrency.combinators" "Concurrent combinators"
 "The " { $vocab-link "concurrency.combinators" } " vocabulary provides concurrent variants of " { $link each } ", " { $link map } " and " { $link filter } ":"
 { $subsection parallel-each }
+{ $subsection 2parallel-each }
 { $subsection parallel-map }
+{ $subsection 2parallel-map }
 { $subsection parallel-filter } ;
 
 ABOUT: "concurrency.combinators"
diff --git a/extra/concurrency/combinators/combinators-tests.factor b/extra/concurrency/combinators/combinators-tests.factor
index 3381cba5e8..562111242d 100755
--- a/extra/concurrency/combinators/combinators-tests.factor
+++ b/extra/concurrency/combinators/combinators-tests.factor
@@ -1,9 +1,11 @@
 IN: concurrency.combinators.tests
 USING: concurrency.combinators tools.test random kernel math 
-concurrency.mailboxes threads sequences accessors ;
+concurrency.mailboxes threads sequences accessors arrays ;
 
 [ [ drop ] parallel-each ] must-infer
+{ 2 0 } [ [ 2drop ] 2parallel-each ] must-infer-as
 [ [ ] parallel-map ] must-infer
+{ 2 1 } [ [ 2array ] 2parallel-map ] must-infer-as
 [ [ ] parallel-filter ] must-infer
 
 [ { 1 4 9 } ] [ { 1 2 3 } [ sq ] parallel-map ] unit-test
@@ -22,3 +24,24 @@ concurrency.mailboxes threads sequences accessors ;
     10 over [ push ] curry parallel-each
     length
 ] unit-test
+
+[ { 10 20 30 } ] [
+    { 1 4 3 } { 10 5 10 } [ * ] 2parallel-map
+] unit-test
+
+[ { -9 -1 -7 } ] [
+    { 1 4 3 } { 10 5 10 } [ - ] 2parallel-map
+] unit-test
+
+[
+    { 1 4 3 } { 1 0 1 } [ / drop ] 2parallel-each
+] must-fail
+
+[ 20 ]
+[
+    V{ } clone
+    10 10 pick [ [ push ] [ push ] bi ] curry 2parallel-each
+    length
+] unit-test
+
+[ { f } [ "OOPS" throw ] parallel-each ] must-fail
diff --git a/extra/concurrency/combinators/combinators.factor b/extra/concurrency/combinators/combinators.factor
index 3c4101e381..eab0ed4cb4 100755
--- a/extra/concurrency/combinators/combinators.factor
+++ b/extra/concurrency/combinators/combinators.factor
@@ -4,14 +4,27 @@ USING: concurrency.futures concurrency.count-downs sequences
 kernel ;
 IN: concurrency.combinators
 
-: parallel-map ( seq quot -- newseq )
-    [ curry future ] curry map dup [ ?future ] change-each ;
-    inline
+: (parallel-each) ( n quot -- )
+    >r <count-down> r> keep await ; inline
 
 : parallel-each ( seq quot -- )
-    over length <count-down>
-    [ [ >r curry r> spawn-stage ] 2curry each ] keep await ;
-    inline
+    over length [
+        [ >r curry r> spawn-stage ] 2curry each
+    ] (parallel-each) ; inline
+
+: 2parallel-each ( seq1 seq2 quot -- )
+    2over min-length [
+        [ >r 2curry r> spawn-stage ] 2curry 2each
+    ] (parallel-each) ; inline
 
 : parallel-filter ( seq quot -- newseq )
     over >r pusher >r each r> r> like ; inline
+
+: future-values dup [ ?future ] change-each ; inline
+
+: parallel-map ( seq quot -- newseq )
+    [ curry future ] curry map future-values ;
+    inline
+
+: 2parallel-map ( seq1 seq2 quot -- newseq )
+    [ 2curry future ] curry 2map future-values ;
diff --git a/extra/concurrency/count-downs/count-downs.factor b/extra/concurrency/count-downs/count-downs.factor
index 6a75f7206c..93cef250a1 100755
--- a/extra/concurrency/count-downs/count-downs.factor
+++ b/extra/concurrency/count-downs/count-downs.factor
@@ -1,7 +1,7 @@
 ! Copyright (C) 2008 Slava Pestov.
 ! See http://factorcode.org/license.txt for BSD license.
 USING: dlists kernel math concurrency.promises
-concurrency.mailboxes ;
+concurrency.mailboxes debugger accessors ;
 IN: concurrency.count-downs
 
 ! http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/CountDownLatch.html
@@ -9,9 +9,7 @@ IN: concurrency.count-downs
 TUPLE: count-down n promise ;
 
 : count-down-check ( count-down -- )
-    dup count-down-n zero? [
-        t swap count-down-promise fulfill
-    ] [ drop ] if ;
+    dup n>> zero? [ t swap promise>> fulfill ] [ drop ] if ;
 
 : <count-down> ( n -- count-down )
     dup 0 < [ "Invalid count for count down" throw ] when
@@ -19,15 +17,12 @@ TUPLE: count-down n promise ;
     dup count-down-check ;
 
 : count-down ( count-down -- )
-    dup count-down-n dup zero? [
-        "Count down already done" throw
-    ] [
-        1- over set-count-down-n
-        count-down-check
-    ] if ;
+    dup n>> dup zero?
+    [ "Count down already done" throw ]
+    [ 1- >>n count-down-check ] if ;
 
 : await-timeout ( count-down timeout -- )
-    >r count-down-promise r> ?promise-timeout drop ;
+    >r promise>> r> ?promise-timeout ?linked t assert= ;
 
 : await ( count-down -- )
     f await-timeout ;
@@ -35,5 +30,4 @@ TUPLE: count-down n promise ;
 : spawn-stage ( quot count-down -- )
     [ [ count-down ] curry compose ] keep
     "Count down stage"
-    swap count-down-promise
-    promise-mailbox spawn-linked-to drop ;
+    swap promise>> mailbox>> spawn-linked-to drop ;
diff --git a/extra/concurrency/mailboxes/mailboxes.factor b/extra/concurrency/mailboxes/mailboxes.factor
index ac03197708..aa4dc2df3d 100755
--- a/extra/concurrency/mailboxes/mailboxes.factor
+++ b/extra/concurrency/mailboxes/mailboxes.factor
@@ -3,7 +3,7 @@
 IN: concurrency.mailboxes
 USING: dlists threads sequences continuations
 namespaces random math quotations words kernel arrays assocs
-init system concurrency.conditions accessors ;
+init system concurrency.conditions accessors debugger ;
 
 TUPLE: mailbox threads data closed ;
 
@@ -83,6 +83,9 @@ M: mailbox dispose
 
 TUPLE: linked-error error thread ;
 
+M: linked-error error.
+    [ thread>> error-in-thread. ] [ error>> error. ] bi ;
+
 C: <linked-error> linked-error
 
 : ?linked dup linked-error? [ rethrow ] when ;