From 2c6afdfee603d308996a6f826e614c8ef44f7b1a Mon Sep 17 00:00:00 2001 From: Slava Pestov Date: Mon, 18 Feb 2008 05:07:40 -0600 Subject: [PATCH] Working on new threads --- core/alien/compiler/compiler.factor | 6 +- .../remote-control/remote-control.factor | 4 +- core/bootstrap/image/image.factor | 2 +- core/bootstrap/stage1.factor | 1 + core/compiler/compiler.factor | 4 +- core/{ => concurrency}/threads/authors.txt | 0 core/{ => concurrency}/threads/summary.txt | 0 core/concurrency/threads/threads-docs.factor | 137 +++++++ core/concurrency/threads/threads-tests.factor | 16 + core/concurrency/threads/threads.factor | 172 ++++++++ core/continuations/continuations.factor | 7 +- core/generator/generator.factor | 2 +- core/init/init.factor | 0 core/io/backend/backend.factor | 6 +- core/io/files/files-tests.factor | 4 +- core/io/thread/thread.factor | 14 + core/namespaces/namespaces-docs.factor | 7 +- core/threads/threads-docs.factor | 69 ---- core/threads/threads-tests.factor | 12 - core/threads/threads.factor | 71 ---- extra/bootstrap/tools/tools.factor | 3 +- extra/calendar/model/model.factor | 5 +- extra/concurrency/concurrency.factor | 384 ------------------ .../distributed/distributed.factor | 52 +-- extra/concurrency/exchangers/authors.txt | 1 + .../concurrency/exchangers/exchangers.factor | 21 + extra/concurrency/exchangers/exchangers.txt | 1 + extra/concurrency/{ => futures}/authors.txt | 1 + extra/concurrency/futures/futures.factor | 25 ++ extra/concurrency/futures/summary.txt | 1 + extra/concurrency/locks/authors.txt | 1 + extra/concurrency/locks/locks.factor | 87 ++++ extra/concurrency/locks/summary.txt | 1 + extra/concurrency/messaging/authors.txt | 2 + .../messaging-docs.factor} | 0 .../messaging-tests.factor} | 0 extra/concurrency/messaging/messaging.factor | 121 ++++++ extra/concurrency/messaging/summary.txt | 1 + extra/concurrency/promises/authors.txt | 2 + extra/concurrency/promises/promises.factor | 24 ++ extra/concurrency/promises/summary.txt | 1 + extra/concurrency/semaphores/authors.txt | 1 + .../concurrency/semaphores/semaphores.factor | 20 + extra/concurrency/semaphores/summary.txt | 1 + extra/concurrency/summary.txt | 1 - extra/concurrency/tags.txt | 1 - extra/io/launcher/launcher.factor | 6 +- extra/io/monitors/monitors.factor | 6 +- extra/io/sockets/sockets-docs.factor | 2 +- extra/io/timeouts/timeouts.factor | 9 +- extra/io/unix/backend/backend.factor | 8 +- extra/io/unix/launcher/launcher.factor | 2 +- extra/io/unix/unix-tests.factor | 4 +- extra/io/windows/launcher/launcher.factor | 5 +- extra/io/windows/nt/backend/backend.factor | 15 +- extra/io/windows/nt/files/files.factor | 6 +- extra/io/windows/nt/nt.factor | 2 + extra/io/windows/nt/sockets/sockets.factor | 2 +- extra/tools/interpreter/interpreter.factor | 3 +- extra/tools/threads/threads.factor | 22 + extra/ui/tools/interactor/interactor.factor | 26 +- extra/ui/tools/listener/listener.factor | 5 +- extra/ui/tools/walker/walker.factor | 9 +- extra/ui/ui.factor | 8 +- extra/ui/windows/windows.factor | 8 +- extra/vocabs/monitor/monitor.factor | 20 +- extra/webapps/planet/planet.factor | 4 +- vm/run.h | 6 +- 68 files changed, 804 insertions(+), 666 deletions(-) mode change 100644 => 100755 core/alien/remote-control/remote-control.factor rename core/{ => concurrency}/threads/authors.txt (100%) rename core/{ => concurrency}/threads/summary.txt (100%) create mode 100755 core/concurrency/threads/threads-docs.factor create mode 100755 core/concurrency/threads/threads-tests.factor create mode 100755 core/concurrency/threads/threads.factor mode change 100644 => 100755 core/init/init.factor create mode 100755 core/io/thread/thread.factor mode change 100644 => 100755 core/namespaces/namespaces-docs.factor delete mode 100755 core/threads/threads-docs.factor delete mode 100755 core/threads/threads-tests.factor delete mode 100755 core/threads/threads.factor delete mode 100755 extra/concurrency/concurrency.factor create mode 100644 extra/concurrency/exchangers/authors.txt create mode 100755 extra/concurrency/exchangers/exchangers.factor create mode 100644 extra/concurrency/exchangers/exchangers.txt rename extra/concurrency/{ => futures}/authors.txt (50%) create mode 100755 extra/concurrency/futures/futures.factor create mode 100644 extra/concurrency/futures/summary.txt create mode 100644 extra/concurrency/locks/authors.txt create mode 100755 extra/concurrency/locks/locks.factor create mode 100644 extra/concurrency/locks/summary.txt create mode 100644 extra/concurrency/messaging/authors.txt rename extra/concurrency/{concurrency-docs.factor => messaging/messaging-docs.factor} (100%) rename extra/concurrency/{concurrency-tests.factor => messaging/messaging-tests.factor} (100%) create mode 100755 extra/concurrency/messaging/messaging.factor create mode 100644 extra/concurrency/messaging/summary.txt create mode 100644 extra/concurrency/promises/authors.txt create mode 100755 extra/concurrency/promises/promises.factor create mode 100644 extra/concurrency/promises/summary.txt create mode 100644 extra/concurrency/semaphores/authors.txt create mode 100755 extra/concurrency/semaphores/semaphores.factor create mode 100644 extra/concurrency/semaphores/summary.txt delete mode 100644 extra/concurrency/summary.txt delete mode 100644 extra/concurrency/tags.txt create mode 100755 extra/tools/threads/threads.factor diff --git a/core/alien/compiler/compiler.factor b/core/alien/compiler/compiler.factor index 3a41b80c2a..24408e1e20 100755 --- a/core/alien/compiler/compiler.factor +++ b/core/alien/compiler/compiler.factor @@ -1,12 +1,12 @@ -! Copyright (C) 2006, 2007 Slava Pestov. +! Copyright (C) 2006, 2008 Slava Pestov. ! See http://factorcode.org/license.txt for BSD license. USING: arrays generator generator.registers generator.fixup hashtables kernel math namespaces sequences words inference.state inference.backend inference.dataflow system math.parser classes alien.arrays alien.c-types alien.structs alien.syntax cpu.architecture alien inspector quotations assocs -kernel.private threads continuations.private libc combinators -compiler.errors continuations ; +kernel.private concurrency.threads continuations.private libc +combinators compiler.errors continuations ; IN: alien.compiler ! Common protocol for alien-invoke/alien-callback/alien-indirect diff --git a/core/alien/remote-control/remote-control.factor b/core/alien/remote-control/remote-control.factor old mode 100644 new mode 100755 index b7700c0ff1..f3c84119bf --- a/core/alien/remote-control/remote-control.factor +++ b/core/alien/remote-control/remote-control.factor @@ -1,7 +1,7 @@ ! Copyright (C) 2007 Slava Pestov. ! See http://factorcode.org/license.txt for BSD license. -USING: alien alien.c-types parser threads words kernel.private -kernel ; +USING: alien alien.c-types parser concurrency.threads words +kernel.private kernel ; IN: alien.remote-control : eval-callback diff --git a/core/bootstrap/image/image.factor b/core/bootstrap/image/image.factor index 17b56458ce..35dae109cf 100755 --- a/core/bootstrap/image/image.factor +++ b/core/bootstrap/image/image.factor @@ -36,7 +36,7 @@ IN: bootstrap.image : data-base 1024 ; inline -: userenv-size 40 ; inline +: userenv-size 64 ; inline : header-size 10 ; inline diff --git a/core/bootstrap/stage1.factor b/core/bootstrap/stage1.factor index 4f5bf6d69e..7c7a03f575 100755 --- a/core/bootstrap/stage1.factor +++ b/core/bootstrap/stage1.factor @@ -31,6 +31,7 @@ vocabs.loader system ; "libc" require "io.streams.c" require + "io.thread" require "vocabs.loader" require "syntax" require diff --git a/core/compiler/compiler.factor b/core/compiler/compiler.factor index f0caec7ad1..3f06f85d10 100755 --- a/core/compiler/compiler.factor +++ b/core/compiler/compiler.factor @@ -3,8 +3,8 @@ USING: kernel namespaces arrays sequences io inference.backend inference.state generator debugger math.parser prettyprint words compiler.units continuations vocabs assocs alien.compiler dlists -optimizer definitions math compiler.errors threads graphs -generic ; +optimizer definitions math compiler.errors concurrency.threads +graphs generic ; IN: compiler : compiled-usages ( words -- seq ) diff --git a/core/threads/authors.txt b/core/concurrency/threads/authors.txt similarity index 100% rename from core/threads/authors.txt rename to core/concurrency/threads/authors.txt diff --git a/core/threads/summary.txt b/core/concurrency/threads/summary.txt similarity index 100% rename from core/threads/summary.txt rename to core/concurrency/threads/summary.txt diff --git a/core/concurrency/threads/threads-docs.factor b/core/concurrency/threads/threads-docs.factor new file mode 100755 index 0000000000..53acb40794 --- /dev/null +++ b/core/concurrency/threads/threads-docs.factor @@ -0,0 +1,137 @@ +USING: help.markup help.syntax kernel kernel.private io +concurrency.threads.private continuations dlists init +quotations strings assocs heaps ; +IN: concurrency.threads + +ARTICLE: "threads-start/stop" "Starting and stopping threads" +"Spawning new threads:" +{ $subsection spawn } +"Creating and spawning a thread can be factored out into two separate steps:" +{ $subsection } +{ $subsection (spawn) } +"Threads stop either when the quotation given to " { $link spawn } " returns, or when the following word is called:" +{ $subsection stop } +"If the image is saved and started again, all runnable threads are stopped. Vocabularies wishing to have a background thread always running should use " { $link add-init-hook } "." ; + +ARTICLE: "threads-yield" "Yielding and suspending threads" +"Yielding to other threads:" +{ $subsection yield } +{ $subsection sleep } +"Threads can be suspended and woken up at some point in the future when a condition is satisfied:" +{ $subsection suspend } +{ $subsection resume } +{ $subsection resume-with } ; + +ARTICLE: "thread-state" "Thread-local state" +"Threads form a class of objects:" +{ $subsection thread } +"The current thread:" +{ $subsection self } +"Thread-local variables:" +{ $subsection tnamespace } +{ $subsection tget } +{ $subsection tset } +{ $subsection tchange } +"Global hashtable of all threads, keyed by " { $link thread-id } ":" +{ $subsection threads } +"Threads have an identity independent of continuations. If a continuation is refied in one thread and then resumed in another thread, the code running in that continuation will observe a change in the value output by " { $link self } "." ; + +ARTICLE: "thread-impl" "Thread implementation" +"Thread implementation:" +{ $subsection run-queue } +{ $subsection sleep-queue } ; + +ARTICLE: "threads" "Lightweight co-operative threads" +"Factor supports lightweight co-operative threads implemented on top of continuations. A thread will yield while waiting for I/O operations to complete, or when a yield has been explicitly requested." +$nl +"Words for working with threads are in the " { $vocab-link "concurrency.threads" } " vocabulary." +{ $subsection "threads-start/stop" } +{ $subsection "threads-yield" } +{ $subsection "thread-state" } +{ $subsection "thread-impl" } ; + +ABOUT: "threads" + +HELP: thread +{ $class-description "A thread. The slots are as follows:" + { $list + { { $link thread-id } " - a unique identifier assigned to each thread." } + { { $link thread-name } " - the name passed to " { $link spawn } "." } + { { $link thread-quot } " - the initial quotation passed to " { $link spawn } "." } + { { $link thread-continuation } " - if the thread is waiting to run, the saved thread context. If the thread is currently running, will be " { $link f } "." } + { { $link thread-registered? } " - a boolean indicating whether the thread is eligible to run or not. Spawning a thread with " { $link (spawn) } " sets this flag and " { $link stop } " clears it." } + } +} ; + +HELP: self +{ $values { "thread" thread } } +{ $description "Pushes the currently-running thread." } ; + +HELP: +{ $values { "quot" quotation } { "name" string } { "error-handler" quotation } } +{ $description "Low-level thread constructor. The thread runs the quotation when spawned; the name is simply used to identify the thread for debugging purposes. The error handler is called if the thread's quotation throws an unhandled error; it should either print the error or notify another thread." } +{ $notes "In most cases, user code should call " { $link spawn } " instead, however for control over the error handler quotation, threads can be created with " { $link } " then passed to " { $link (spawn) } "." } ; + +HELP: run-queue +{ $values { "queue" dlist } } +{ $var-description "Global variable holding the queue of runnable threads. Calls to " { $link yield } " switch to the thread which has been in the queue for the longest period of time." +$nl +"By convention, threads are queued with " { $link push-front } +" and dequeued with " { $link pop-back } "." } ; + +HELP: resume +{ $values { "thread" thread } } +{ $description "Adds a thread to the end of the run queue. The thread must have previously been suspended by a call to " { $link suspend } "." } ; + +HELP: resume-with +{ $values { "obj" object } { "thread" thread } } +{ $description "Adds a thread to the end of the run queue together with an object to pass to the thread. The thread must have previously been suspended by a call to " { $link suspend } "; the object is returned from the " { $link suspend } " call." } ; + +HELP: sleep-queue +{ $var-description "A " { $link min-heap } " storing the queue of sleeping threads." } ; + +HELP: sleep-time +{ $values { "ms" "a non-negative integer or " { $link f } } } +{ $description "Outputs the time until the next sleeping thread is scheduled to wake up, which could be zero if there are threads in the run queue, or threads which need to wake up right now. If there are no runnable or sleeping threads, outputs " { $link f } "." } ; + +HELP: stop +{ $description "Stops the current thread. The thread may be started again from another thread using " { $link (spawn) } "." } ; + +HELP: yield +{ $description "Adds the current thread to the end of the run queue, and switches to the next runnable thread." } ; + +HELP: sleep +{ $values { "ms" "a non-negative integer" } } +{ $description "Suspends the current thread for " { $snippet "ms" } " milliseconds. It will not get woken up before this time period elapses, but since the multitasker is co-operative, the precise wakeup time is dependent on when other threads yield." } ; + +HELP: suspend +{ $values { "quot" "a quotation with stack effect " { $snippet "( thread -- )" } } { "obj" object } } +{ $description "Suspends the current thread and passes it to the quotation. After the quotation returns, control yields to the next runnable thread and the current thread does not execute again until it is resumed, and so the quotation must arrange for another thread to later resume the suspended thread with a call to " { $link resume } " or " { $link resume-with } "." } ; + +HELP: spawn +{ $values { "quot" quotation } { "name" string } } +{ $description "Spawns a new thread. The thread begins executing the given quotation; the name is for debugging purposes. The new thread begins running immediately and the current thread is added to the end of the run queue." +$nl +"The new thread begins with an empty data stack, an empty catch stack and a name stack containing the global namespace only. This means that the only way to pass data to the new thread is to explicitly construct a quotation containing the data, for example using " { $link curry } " or " { $link compose } "." } +{ $examples + { $code "1 2 [ + . ] 2curry \"Addition thread\" spawn" } +} ; + +HELP: init-threads +{ $description "Called during startup to initialize the threading system. This word should never be called directly." } ; + +HELP: tnamespace +{ $values { "assoc" assoc } } +{ $description "Outputs the current thread's set of thread-local variables." } ; + +HELP: tget +{ $values { "key" object } { "value" object } } +{ $description "Outputs the value of a thread-local variable." } ; + +HELP: tset +{ $values { "value" object } { "key" object } } +{ $description "Sets the value of a thread-local variable." } ; + +HELP: tchange +{ $values { "key" object } { "quot" "a quotation with stack effect " { $snippet "( value -- newvalue )" } } } +{ $description "Applies the quotation to the current value of a thread-local variable, storing the result back to the same variable." } ; diff --git a/core/concurrency/threads/threads-tests.factor b/core/concurrency/threads/threads-tests.factor new file mode 100755 index 0000000000..2bd7e8aa4c --- /dev/null +++ b/core/concurrency/threads/threads-tests.factor @@ -0,0 +1,16 @@ +USING: namespaces io tools.test concurrency.threads kernel ; +IN: temporary + +3 "x" set +namespace [ [ yield 2 "x" set ] bind ] curry "Test" spawn drop +[ 2 ] [ yield "x" get ] unit-test +[ ] [ [ flush ] "flush test" spawn drop flush ] unit-test +[ ] [ [ "Errors, errors" throw ] "error test" spawn drop ] unit-test +yield + +[ ] [ 0.3 sleep ] unit-test +[ "hey" sleep ] must-fail + +[ 3 ] [ + [ 3 swap resume-with ] suspend +] unit-test diff --git a/core/concurrency/threads/threads.factor b/core/concurrency/threads/threads.factor new file mode 100755 index 0000000000..1a11a00b82 --- /dev/null +++ b/core/concurrency/threads/threads.factor @@ -0,0 +1,172 @@ +! Copyright (C) 2004, 2008 Slava Pestov. +! Copyright (C) 2005 Mackenzie Straight. +! See http://factorcode.org/license.txt for BSD license. +IN: concurrency.threads +USING: arrays hashtables heaps kernel kernel.private math +namespaces sequences vectors continuations continuations.private +dlists assocs system combinators debugger prettyprint io init ; + +SYMBOL: initial-thread + +TUPLE: thread +name quot error-handler +id registered? +continuation +mailbox variables ; + +: self ( -- thread ) 40 getenv ; inline + +! Thread-local storage +: tnamespace ( -- assoc ) self thread-variables ; + +: tget ( key -- value ) tnamespace at ; + +: tset ( value key -- ) tnamespace set-at ; + +: tchange ( key quot -- ) tnamespace change-at ; inline + +SYMBOL: threads + +threads global [ H{ } assoc-like ] change-at + +: thread ( id -- thread ) threads get-global at ; + + ( quot name error-handler -- thread ) + \ thread counter H{ } clone { + set-thread-quot + set-thread-name + set-thread-error-handler + set-thread-id + set-thread-variables + } \ thread construct ; + +PRIVATE> + +SYMBOL: run-queue +SYMBOL: sleep-queue + +: resume ( thread -- ) + check-registered run-queue get-global push-front ; + +: resume-with ( obj thread -- ) + check-registered 2array run-queue get-global push-front ; + +r check-registered r> sleep-queue get-global heap-push ; + +: wake-up? ( heap -- ? ) + dup heap-empty? + [ drop f ] [ heap-peek nip millis <= ] if ; + +: wake-up ( -- ) + sleep-queue get-global + [ dup wake-up? ] [ dup heap-pop drop resume ] [ ] while + drop ; + +: next ( -- ) + walker-hook [ + continue + ] [ + wake-up + run-queue get-global pop-back + dup array? [ first2 ] [ f swap ] if dup set-self + dup thread-continuation + f rot set-thread-continuation + continue-with + ] if* ; + +PRIVATE> + +: sleep-time ( -- ms ) + { + { [ run-queue get-global dlist-empty? not ] [ 0 ] } + { [ sleep-queue get-global heap-empty? ] [ f ] } + { [ t ] [ sleep-queue get-global heap-peek nip millis [-] ] } + } cond ; + +: stop ( -- ) + self unregister-thread next ; + +: suspend ( quot -- obj ) + [ + >r self [ set-thread-continuation ] keep r> call next + ] curry callcc1 ; inline + +: yield ( -- ) [ resume ] suspend drop ; + +: sleep ( ms -- ) + >fixnum millis + [ schedule-sleep ] curry suspend drop ; + +: (spawn) ( thread -- ) + [ + resume [ + dup set-self + dup register-thread + init-namespaces + V{ } set-catchstack + { } set-retainstack + >r { } set-datastack r> + thread-quot [ call stop ] call-clear + ] 1 (throw) + ] suspend 2drop ; + +: spawn ( quot name -- thread ) + [ + global [ + "Error in thread " write + dup thread-id pprint + " (" write + dup thread-name pprint ")" print + "spawned to call " write + thread-quot short. + nl + print-error flush + ] bind + ] + [ (spawn) ] keep ; + +: in-thread ( quot -- ) "Thread" spawn drop ; + + run-queue set-global + sleep-queue set-global + H{ } clone threads set-global + initial-thread global + [ drop f "Initial" [ die ] ] cache + f over set-thread-continuation + f over set-thread-registered? + dup register-thread + set-self ; + +[ self dup thread-error-handler call stop ] +thread-error-hook set-global + +PRIVATE> + +[ init-threads ] "concurrency.threads" add-init-hook diff --git a/core/continuations/continuations.factor b/core/continuations/continuations.factor index 81f78f491d..19802da7df 100755 --- a/core/continuations/continuations.factor +++ b/core/continuations/continuations.factor @@ -113,8 +113,13 @@ GENERIC: compute-restarts ( error -- seq ) PRIVATE> +SYMBOL: thread-error-hook + : rethrow ( error -- * ) - catchstack* empty? [ die ] when + catchstack* empty? [ + thread-error-hook get-global + [ 1 (throw) ] [ die ] if* + ] when dup save-error c> continue-with ; : recover ( try recovery -- ) diff --git a/core/generator/generator.factor b/core/generator/generator.factor index 3514947e3d..c62fc9f8a2 100755 --- a/core/generator/generator.factor +++ b/core/generator/generator.factor @@ -5,7 +5,7 @@ effects generator.fixup generator.registers generic hashtables inference inference.backend inference.dataflow io kernel kernel.private layouts math namespaces optimizer optimizer.specializers prettyprint quotations sequences system -threads words vectors ; +concurrency.threads words vectors ; IN: generator SYMBOL: compile-queue diff --git a/core/init/init.factor b/core/init/init.factor old mode 100644 new mode 100755 diff --git a/core/io/backend/backend.factor b/core/io/backend/backend.factor index 9aa1299871..c38b7355b1 100755 --- a/core/io/backend/backend.factor +++ b/core/io/backend/backend.factor @@ -19,8 +19,8 @@ HOOK: normalize-pathname io-backend ( str -- newstr ) M: object normalize-pathname ; -[ init-io embedded? [ init-stdio ] unless ] -"io.backend" add-init-hook - : set-io-backend ( backend -- ) io-backend set-global init-io init-stdio ; + +[ init-io embedded? [ init-stdio ] unless ] +"io.backend" add-init-hook diff --git a/core/io/files/files-tests.factor b/core/io/files/files-tests.factor index d0f9737f19..a111070151 100755 --- a/core/io/files/files-tests.factor +++ b/core/io/files/files-tests.factor @@ -55,11 +55,11 @@ USING: tools.test io.files io threads kernel continuations ; [ f ] [ "test-blah" resource-path exists? ] unit-test -[ ] [ "test-quux.txt" resource-path [ [ yield "Hi" write ] in-thread ] with-file-writer ] unit-test +[ ] [ "test-quux.txt" resource-path [ [ yield "Hi" write ] "Test" spawn drop ] with-file-writer ] unit-test [ ] [ "test-quux.txt" resource-path delete-file ] unit-test -[ ] [ "test-quux.txt" resource-path [ [ yield "Hi" write ] in-thread ] with-file-writer ] unit-test +[ ] [ "test-quux.txt" resource-path [ [ yield "Hi" write ] "Test" spawn drop ] with-file-writer ] unit-test [ ] [ "test-quux.txt" "quux-test.txt" [ resource-path ] 2apply rename-file ] unit-test [ t ] [ "quux-test.txt" resource-path exists? ] unit-test diff --git a/core/io/thread/thread.factor b/core/io/thread/thread.factor new file mode 100755 index 0000000000..ec118dcbf7 --- /dev/null +++ b/core/io/thread/thread.factor @@ -0,0 +1,14 @@ +! Copyright (C) 2008 Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +IN: io.thread +USING: concurrency.threads io.backend namespaces init ; + +: io-thread ( -- ) + sleep-time io-multiplex yield io-thread ; + +: start-io-thread ( -- ) + [ io-thread ] + "I/O wait" spawn + \ io-thread set-global ; + +[ start-io-thread ] "io.thread" add-init-hook diff --git a/core/namespaces/namespaces-docs.factor b/core/namespaces/namespaces-docs.factor old mode 100644 new mode 100755 index f087090f2c..2d4b9a03b2 --- a/core/namespaces/namespaces-docs.factor +++ b/core/namespaces/namespaces-docs.factor @@ -179,8 +179,5 @@ HELP: % { $description "Appends a sequence to the end of the sequence being constructed by " { $link make } "." } ; HELP: init-namespaces -{ $description "Resets the name stack to its initial state, holding a single copy of the global namespace. This word is called during startup and is rarely useful, except in certain situations such as the example below." } -{ $examples - "You can use this word to spawn a new thread which does not inherit the parent thread's dynamic variable bindings:" - { $code "[ init-namestack do-some-work ] in-thread" } -} ; +{ $description "Resets the name stack to its initial state, holding a single copy of the global namespace." } +$low-level-note ; diff --git a/core/threads/threads-docs.factor b/core/threads/threads-docs.factor deleted file mode 100755 index ece90d9a11..0000000000 --- a/core/threads/threads-docs.factor +++ /dev/null @@ -1,69 +0,0 @@ -USING: help.markup help.syntax kernel kernel.private io -threads.private continuations dlists ; -IN: threads - -ARTICLE: "threads" "Threads" -"A limited form of multiprocessing is supported in the form of cooperative threads, which are implemented on top of continuations. A thread will yield while waiting for I/O operations to complete, or when a yield has been explicitly requested." -$nl -"Words for working with threads are in the " { $vocab-link "threads" } " vocabulary." -{ $subsection in-thread } -{ $subsection yield } -{ $subsection sleep } -"Threads stop either when the quotation given to " { $link in-thread } " returns, or when the following word is called:" -{ $subsection stop } -"Continuations can be added to the run queue directly:" -{ $subsection schedule-thread } -{ $subsection schedule-thread-with } -"Thread implementation:" -{ $subsection run-queue } -{ $subsection sleep-queue } ; - -ABOUT: "threads" - -HELP: run-queue -{ $values { "queue" dlist } } -{ $description "Outputs the runnable thread queue. By convention, continuations are queued with " { $link push-front } -" and dequeued with " { $link pop-back } "." } ; - -HELP: schedule-thread -{ $values { "continuation" "a continuation reified by " { $link callcc0 } } } -{ $description "Adds a runnable thread to the end of the run queue." } ; - -HELP: schedule-thread-with -{ $values { "obj" "an object" } { "continuation" "a continuation reified by " { $link callcc1 } } } -{ $description "Adds a runnable thread to the end of the run queue. When the thread runs the object is passed to the continuation using " { $link continue-with } "." } ; - -HELP: sleep-queue -{ $var-description "Sleeping thread queue. This is not actually a queue, but an array of pairs of the shape " { $snippet "{ time continuation }" } "." } ; - -HELP: sleep-time -{ $values { "ms" "a non-negative integer" } } -{ $description "Outputs the time until the next sleeping thread is scheduled to wake up, or a default sleep time if there are no sleeping threads." } ; - -HELP: stop -{ $description "Stops the current thread." } ; - -HELP: yield -{ $description "Adds the current thread to the end of the run queue, and switches to the next runnable thread." } ; - -HELP: sleep -{ $values { "ms" "a non-negative integer" } } -{ $description "Suspends the current thread for " { $snippet "ms" } " milliseconds. It will not get woken up before this time period elapses, but since the multitasker is co-operative, the precise wakeup time is dependent on when other threads yield." } ; - -HELP: in-thread -{ $values { "quot" "a quotation" } } -{ $description "Spawns a new thread. The new thread begins running immediately." -$nl -"The new thread inherits the current data stack and name stack. The call stack initially contains the new quotation only, so when the quotation returns the thread stops. The catch stack contains a default handler which logs errors to the " { $link stdio } " stream." } -{ $examples - { $code "1 2 [ + . ] in-thread" } -} ; - -HELP: idle-thread -{ $description "Runs the idle thread, which services I/O requests and relinquishes control to the operating system until the next Factor thread has to wake up again." -$nl -"If the run queue is empty, the idle thread will sleep until the next sleeping thread is scheduled to wake up, otherwise it yields immediately after checking for any completed I/O requests." } -{ $notes "This word should never be called directly. The idle thread is always running." } ; - -HELP: init-threads -{ $description "Called during startup to initialize the threading system. This word should never be called directly." } ; diff --git a/core/threads/threads-tests.factor b/core/threads/threads-tests.factor deleted file mode 100755 index 379b10ce88..0000000000 --- a/core/threads/threads-tests.factor +++ /dev/null @@ -1,12 +0,0 @@ -USING: namespaces io tools.test threads kernel ; -IN: temporary - -3 "x" set -[ yield 2 "x" set ] in-thread -[ 2 ] [ yield "x" get ] unit-test -[ ] [ [ flush ] in-thread flush ] unit-test -[ ] [ [ "Errors, errors" throw ] in-thread ] unit-test -yield - -[ ] [ 0.3 sleep ] unit-test -[ "hey" sleep ] must-fail diff --git a/core/threads/threads.factor b/core/threads/threads.factor deleted file mode 100755 index c4e159742a..0000000000 --- a/core/threads/threads.factor +++ /dev/null @@ -1,71 +0,0 @@ -! Copyright (C) 2004, 2007 Slava Pestov. -! Copyright (C) 2005 Mackenzie Straight. -! See http://factorcode.org/license.txt for BSD license. -IN: threads -USING: arrays init hashtables heaps io.backend kernel -kernel.private math namespaces sequences vectors io system -continuations debugger dlists ; - - - -: schedule-thread ( continuation -- ) - run-queue push-front ; - -: schedule-thread-with ( obj continuation -- ) - 2array schedule-thread ; - -: stop ( -- ) - walker-hook [ - continue - ] [ - run-queue pop-back dup array? - [ first2 continue-with ] [ continue ] if - ] if* ; - -: yield ( -- ) [ schedule-thread stop ] callcc0 ; - -: sleep ( ms -- ) - >fixnum millis + [ schedule-sleep stop ] curry callcc0 ; - -: in-thread ( quot -- ) - [ - >r schedule-thread r> [ - V{ } set-catchstack - { } set-retainstack - [ [ print-error ] recover stop ] call-clear - ] 1 (throw) - ] curry callcc0 ; - - \ run-queue set-global - sleep-queue set-global - [ idle-thread ] in-thread ; - -[ init-threads ] "threads" add-init-hook -PRIVATE> diff --git a/extra/bootstrap/tools/tools.factor b/extra/bootstrap/tools/tools.factor index 40d77e03be..718f73308c 100755 --- a/extra/bootstrap/tools/tools.factor +++ b/extra/bootstrap/tools/tools.factor @@ -5,10 +5,11 @@ USING: vocabs.loader sequences ; "tools.annotations" "tools.crossref" "tools.deploy" + "tools.disassembler" "tools.memory" "tools.profiler" "tools.test" "tools.time" - "tools.disassembler" + "tools.threads" "editors" } [ require ] each diff --git a/extra/calendar/model/model.factor b/extra/calendar/model/model.factor index 855b0cd815..61ab191b75 100755 --- a/extra/calendar/model/model.factor +++ b/extra/calendar/model/model.factor @@ -1,6 +1,6 @@ ! Copyright (C) 2008 Slava Pestov ! See http://factorcode.org/license.txt for BSD license. -USING: calendar namespaces models threads init ; +USING: calendar namespaces models concurrency.threads init ; IN: calendar.model SYMBOL: time @@ -9,7 +9,8 @@ SYMBOL: time now time get set-model 1000 sleep (time-thread) ; -: time-thread ( -- ) [ (time-thread) ] in-thread ; +: time-thread ( -- ) + [ (time-thread) ] "Time model update" spawn drop ; f time set-global [ time-thread ] "calendar.model" add-init-hook diff --git a/extra/concurrency/concurrency.factor b/extra/concurrency/concurrency.factor deleted file mode 100755 index b0abac8f5b..0000000000 --- a/extra/concurrency/concurrency.factor +++ /dev/null @@ -1,384 +0,0 @@ -! Copyright (C) 2005 Chris Double. All Rights Reserved. -! See http://factorcode.org/license.txt for BSD license. -! -! Concurrency library for Factor based on Erlang/Termite style -! concurrency. -USING: vectors dlists threads sequences continuations - namespaces random math quotations words kernel match - arrays io assocs init shuffle system ; -IN: concurrency - -TUPLE: mailbox threads data ; - -TUPLE: thread timeout continuation continued? ; - -: ( timeout continuation -- obj ) - >r dup [ millis + ] when r> - { - set-thread-timeout - set-thread-continuation - } thread construct ; - -: make-mailbox ( -- mailbox ) - V{ } clone mailbox construct-boa ; - -: mailbox-empty? ( mailbox -- bool ) - mailbox-data dlist-empty? ; - -: mailbox-put ( obj mailbox -- ) - [ mailbox-data push-back ] keep - [ mailbox-threads ] keep - V{ } clone swap set-mailbox-threads - [ thread-continuation schedule-thread ] each yield ; - - swap mailbox-threads push stop ] callcc0 - (mailbox-block-unless-pred) - ] if ; inline - -: (mailbox-block-if-empty) ( mailbox timeout -- mailbox2 ) - over mailbox-empty? [ - [ swap mailbox-threads push stop ] callcc0 - (mailbox-block-if-empty) - ] [ - drop - ] if ; -PRIVATE> -: mailbox-get* ( mailbox timeout -- obj ) - (mailbox-block-if-empty) - mailbox-data pop-front ; - -: mailbox-get ( mailbox -- obj ) - f mailbox-get* ; - -: mailbox-get-all* ( mailbox timeout -- array ) - (mailbox-block-if-empty) - [ dup mailbox-empty? ] - [ dup mailbox-data pop-front ] - [ ] unfold nip ; - -: mailbox-get-all ( mailbox -- array ) - f mailbox-get-all* ; - -: while-mailbox-empty ( mailbox quot -- ) - over mailbox-empty? [ - dup >r swap slip r> while-mailbox-empty - ] [ - 2drop - ] if ; inline - -: mailbox-get?* ( pred mailbox timeout -- obj ) - 2over >r >r (mailbox-block-unless-pred) r> r> - mailbox-data delete-node-if ; inline - -: mailbox-get? ( pred mailbox -- obj ) - f mailbox-get?* ; - -TUPLE: process links pid mailbox ; - -C: process - -GENERIC: send ( message process -- ) - - ; - -: make-linked-process ( process -- process ) - #! Return a process set to run on the local node. That process is - #! linked to the process on the stack. It will receive a message if - #! that process terminates. - 1quotation random-256 make-mailbox ; -PRIVATE> - -: self ( -- process ) - \ self get ; - - - -DEFER: register-process -DEFER: unregister-process - - - -: spawn ( quot -- process ) - [ ((spawn)) ] curry (spawn) ; inline - -TUPLE: linked-exception error ; - -C: linked-exception - -: while-no-messages ( quot -- ) - #! Run the quotation in a loop while no messages are in - #! the processes mailbox. The quot should have stack effect - #! ( -- ). - >r self process-mailbox r> while-mailbox-empty ; inline - -M: process send ( message process -- ) - process-mailbox mailbox-put ; - -: receive ( -- message ) - self process-mailbox mailbox-get dup linked-exception? [ - linked-exception-error rethrow - ] when ; - -: receive-if ( pred -- message ) - self process-mailbox mailbox-get? dup linked-exception? [ - linked-exception-error rethrow - ] when ; inline - -: rethrow-linked ( error -- ) - #! Rethrow the error to the linked process - self process-links [ - over swap send - ] each drop ; - - - -: spawn-link ( quot -- process ) - [ [ rethrow-linked ] recover ] curry - [ ((spawn)) ] curry (spawn-link) ; inline - - - -: recv ( forms -- ) - #! Get a message from the processes mailbox. Compare it against the - #! forms to run a quotation if it matches the given message. 'forms' - #! is a list of quotations in the following format: - #! [ pred match-quot ] - #! 'pred' is a word that has stack effect ( msg -- bool ). It is - #! executed with the message on the stack. It should return a - #! boolean if it is a message this form should process. - #! 'match-quot' is a quotation with stack effect ( msg -- ). It - #! will be called with the message on the top of the stack if - #! the 'pred' word returned true. - #! Each form in the list will be matched against the message, - #! even if a prior match succeeded. This means multiple quotations - #! may be run against the message. - receive swap [ dupd (recv) ] each drop ; - -MATCH-VARS: ?from ?tag ; - -r self random-256 r> 3array ; -PRIVATE> - -: send-synchronous ( message process -- reply ) - #! Sends a message to the process synchronously. The - #! message will be wrapped to include the process of the sender - #! and a unique tag. After being sent the sending process will - #! block for a reply tagged with the same unique tag. - >r tag-message dup r> send second _ 2array [ match ] curry - receive-if second ; - - - -: spawn-server ( quot -- process ) - #! Spawn a server that receives messages, calling the - #! quotation on the message. If the quotation returns false - #! the spawned process exits. If it returns true, the process - #! starts from the beginning again. The quotation should have - #! stack effect ( message -- bool ). - [ - (spawn-server) - "Exiting process: " write self process-pid print - ] curry spawn ; inline - -: spawn-linked-server ( quot -- process ) - #! Similar to 'spawn-server' but the parent process will be linked - #! to the child. - [ - (spawn-server) - "Exiting process: " write self process-pid print - ] curry spawn-link ; inline - -: server-cc ( -- cc|process ) - #! Captures the current continuation and returns the value. - #! If that CC is called with a process on the stack it will - #! set 'self' for the current process to it. Otherwise it will - #! return the value. This allows capturing a continuation in a server, - #! and jumping back into it from a spawn and keeping the 'self' - #! variable correct. It's a workaround until I can find out how to - #! stop 'self' from being clobbered back to its old value. - [ ] callcc1 dup process? [ \ self set-global f ] when ; - -: call-server-cc ( server-cc -- ) - #! Calls the server continuation passing the current 'self' - #! so the server continuation gets its new self updated. - self swap call ; - -TUPLE: future value processes ; - -: notify-future ( value future -- ) - tuck set-future-value - dup future-processes [ schedule-thread ] each - f swap set-future-processes ; - -: future ( quot -- future ) - #! Spawn a process to call the quotation and immediately return. - f V{ } clone \ future construct-boa [ - [ - >r [ t 2array ] compose [ f 2array ] recover r> - notify-future - ] 2curry spawn drop - ] keep ; - -: ?future ( future -- result ) - #! Block the process until the future has completed and then - #! place the result on the stack. Return the result - #! immediately if the future has completed. - dup future-value [ - first2 [ rethrow ] unless - ] [ - dup [ future-processes push stop ] curry callcc0 ?future - ] ?if ; - -: parallel-map ( seq quot -- newseq ) - #! Spawn a process to apply quot to each element of seq, - #! joining the results into a sequence at the end. - [ curry future ] curry map [ ?future ] map ; - -: parallel-each ( seq quot -- ) - #! Spawn a process to apply quot to each element of seq, - #! and waits for all processes to complete. - [ f ] compose parallel-map drop ; - -TUPLE: promise fulfilled? value processes ; - -: ( -- ) - f f V{ } clone promise construct-boa ; - -: fulfill ( value promise -- ) - #! Set the future of the promise to the given value. Threads - #! blocking on the promise will then be released. - dup promise-fulfilled? [ - 2drop - ] [ - [ set-promise-value ] keep - [ t swap set-promise-fulfilled? ] keep - [ promise-processes ] keep - V{ } clone swap set-promise-processes - [ thread-continuation schedule-thread ] each yield - ] if ; - - swap promise-processes push stop ] callcc0 - drop - ] if ; -PRIVATE> - -: ?promise* ( promise timeout -- result ) - (maybe-block-promise) promise-value ; - -: ?promise ( promise -- result ) - f ?promise* ; - -! ****************************** -! Experimental code below -! ****************************** - - -: lazy ( quot -- lazy ) - #! Spawn a process that immediately blocks and return it. - #! When '?lazy' is called on the returned process, call the quotation - #! and return the result. The quotation must have stack effect ( -- X ). - [ - receive { - { { ?from ?tag _ } - [ call ?tag over 2array ?from send (lazy) ] } - } match-cond - ] spawn nip ; - -: ?lazy ( lazy -- result ) - #! Given a process spawned using 'lazy', evaluate it and return the result. - f swap send-synchronous ; - - - -: register-process ( name process -- ) - swap remote-processes set-at ; - -: unregister-process ( name -- ) - remote-processes delete-at ; - -: get-process ( name -- process ) - remote-processes at ; - -[ - H{ } clone \ remote-processes set-global - init-main-process - self [ process-pid ] keep register-process -] "process-registry" add-init-hook diff --git a/extra/concurrency/distributed/distributed.factor b/extra/concurrency/distributed/distributed.factor index 83052b803a..042c33306e 100755 --- a/extra/concurrency/distributed/distributed.factor +++ b/extra/concurrency/distributed/distributed.factor @@ -1,43 +1,33 @@ ! Copyright (C) 2005 Chris Double. All Rights Reserved. ! See http://factorcode.org/license.txt for BSD license. -USING: serialize sequences concurrency io io.server qualified -threads arrays namespaces kernel ; +USING: serialize sequences concurrency.messaging +concurrency.threads io io.server qualified arrays +namespaces kernel ; QUALIFIED: io.sockets IN: concurrency.distributed -TUPLE: node hostname port ; - -C: node - : handle-node-client ( -- ) - deserialize first2 get-process send ; + deserialize first2 thread send ; -: node-server ( port -- ) - internet-server - "concurrency.distributed" - [ handle-node-client ] with-server ; +: (start-node) ( addrspecs addrspec -- ) + [ + local-node set-global + "concurrency.distributed" + [ handle-node-client ] with-server + ] 2curry f spawn drop ; -: send-to-node ( msg pid host port -- ) - io.sockets: io.sockets: [ - 2array serialize - ] with-stream ; +SYMBOL: local-node ( -- addrspec ) -: localnode ( -- node ) - \ localnode get ; +: start-node ( port -- ) + dup internet-server host-name rot (start-node) ; -: start-node ( hostname port -- ) - [ node-server ] in-thread - \ localnode set-global ; +TUPLE: remote-thread pid node ; -TUPLE: remote-process node pid ; +M: remote-thread send ( message thread -- ) + { remote-thread-pid remote-thread-node } get-slots + io.sockets: [ 2array serialize ] with-stream ; -C: remote-process - -M: remote-process send ( message process -- ) - #! Send the message via the inter-node protocol - { remote-process-pid remote-process-node } get-slots - { node-hostname node-port } get-slots - send-to-node ; - -M: process (serialize) ( obj -- ) - localnode swap process-pid (serialize) ; +M: thread (serialize) ( obj -- ) + thread-id local-node get-global + remote-thread construct-boa + (serialize) ; diff --git a/extra/concurrency/exchangers/authors.txt b/extra/concurrency/exchangers/authors.txt new file mode 100644 index 0000000000..1901f27a24 --- /dev/null +++ b/extra/concurrency/exchangers/authors.txt @@ -0,0 +1 @@ +Slava Pestov diff --git a/extra/concurrency/exchangers/exchangers.factor b/extra/concurrency/exchangers/exchangers.factor new file mode 100755 index 0000000000..39f01ae2ca --- /dev/null +++ b/extra/concurrency/exchangers/exchangers.factor @@ -0,0 +1,21 @@ +! Copyright (C) 2008 Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +USING: kernel concurrency.threads ; +IN: concurrency.exchangers + +! Motivated by +! http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Exchanger.html + +TUPLE: exchanger thread ; + +: ( -- exchanger ) + f exchanger construct-boa ; + +: exchange ( obj exchanger -- newobj ) + dup exchanger-thread [ + dup exchanger-thread + f rot set-exchanger-thread + resume-with + ] [ + [ set-exchanger-thread ] curry suspend + ] if ; diff --git a/extra/concurrency/exchangers/exchangers.txt b/extra/concurrency/exchangers/exchangers.txt new file mode 100644 index 0000000000..ea69c91e03 --- /dev/null +++ b/extra/concurrency/exchangers/exchangers.txt @@ -0,0 +1 @@ +Thread rendezvous points diff --git a/extra/concurrency/authors.txt b/extra/concurrency/futures/authors.txt similarity index 50% rename from extra/concurrency/authors.txt rename to extra/concurrency/futures/authors.txt index 44b06f94bc..a8fb961d36 100644 --- a/extra/concurrency/authors.txt +++ b/extra/concurrency/futures/authors.txt @@ -1 +1,2 @@ Chris Double +Slava Pestov diff --git a/extra/concurrency/futures/futures.factor b/extra/concurrency/futures/futures.factor new file mode 100755 index 0000000000..fa8aba27fe --- /dev/null +++ b/extra/concurrency/futures/futures.factor @@ -0,0 +1,25 @@ +! Copyright (C) 2005, 2008 Chris Double, Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +IN: concurrency.futures + +: future ( quot -- future ) + [ + [ + >r + [ t 2array ] compose + [ f 2array ] recover + r> fulfill + ] 2curry "Future" spawn drop + ] keep ; inline + +: ?future-timeout ( future timeout -- value ) + ?promise-timeout first2 [ rethrow ] unless ; + +: ?future ( future -- value ) + f ?future-timeout ; + +: parallel-map ( seq quot -- newseq ) + [ curry future ] curry map [ ?future ] map ; + +: parallel-each ( seq quot -- ) + [ f ] compose parallel-map drop ; diff --git a/extra/concurrency/futures/summary.txt b/extra/concurrency/futures/summary.txt new file mode 100644 index 0000000000..12de3c6f7e --- /dev/null +++ b/extra/concurrency/futures/summary.txt @@ -0,0 +1 @@ +Deferred computations diff --git a/extra/concurrency/locks/authors.txt b/extra/concurrency/locks/authors.txt new file mode 100644 index 0000000000..1901f27a24 --- /dev/null +++ b/extra/concurrency/locks/authors.txt @@ -0,0 +1 @@ +Slava Pestov diff --git a/extra/concurrency/locks/locks.factor b/extra/concurrency/locks/locks.factor new file mode 100755 index 0000000000..182bf0a106 --- /dev/null +++ b/extra/concurrency/locks/locks.factor @@ -0,0 +1,87 @@ +! Copyright (C) 2008 Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +USING: dlists kernel concurrency.threads continuations math ; +IN: concurrency.locks + +! Simple critical sections +TUPLE: lock threads owner ; + +: lock construct-boa ; + +: notify-1 ( dlist -- ) + dup dlist-empty? [ pop-back resume ] [ drop ] if ; + +r >r over r> call over r> curry [ ] cleanup ; inline + +PRIVATE> + +: with-lock ( lock quot -- ) + [ acquire-lock ] [ release-lock ] do-lock ; inline + +: with-reentrant-lock ( lock quot -- ) + over lock-owner self eq? + [ nip call ] [ with-lock ] if ; inline + +! Many-reader/single-writer locks +TUPLE: rw-lock readers writers reader# writer ; + +: ( -- lock ) + 0 f rw-lock construct-boa ; + + or + [ dup wait-for-write-lock ] when + self over set-rw-lock-writer ; + +: release-write-lock ( lock -- ) + f over set-rw-lock-writer + dup rw-lock-readers dlist-empty? + [ notify-writer ] [ rw-lock-readers notify-all ] if ; + +: do-recursive-rw-lock ( lock quot quot' -- ) + >r over rw-lock-writer self eq? [ nip call ] r> if ; inline + +PRIVATE> + +: with-read-lock ( lock quot -- ) + [ + [ acquire-read-lock ] [ release-read-lock ] do-lock + ] do-recursive-rw-lock ; inline + +: with-write-lock ( lock quot -- ) + [ + [ acquire-write-lock ] [ release-write-lock ] do-lock + ] do-recursive-rw-lock ; inline diff --git a/extra/concurrency/locks/summary.txt b/extra/concurrency/locks/summary.txt new file mode 100644 index 0000000000..2ac51cd59b --- /dev/null +++ b/extra/concurrency/locks/summary.txt @@ -0,0 +1 @@ +Traditional locks and many reader/single writer locks diff --git a/extra/concurrency/messaging/authors.txt b/extra/concurrency/messaging/authors.txt new file mode 100644 index 0000000000..a8fb961d36 --- /dev/null +++ b/extra/concurrency/messaging/authors.txt @@ -0,0 +1,2 @@ +Chris Double +Slava Pestov diff --git a/extra/concurrency/concurrency-docs.factor b/extra/concurrency/messaging/messaging-docs.factor similarity index 100% rename from extra/concurrency/concurrency-docs.factor rename to extra/concurrency/messaging/messaging-docs.factor diff --git a/extra/concurrency/concurrency-tests.factor b/extra/concurrency/messaging/messaging-tests.factor similarity index 100% rename from extra/concurrency/concurrency-tests.factor rename to extra/concurrency/messaging/messaging-tests.factor diff --git a/extra/concurrency/messaging/messaging.factor b/extra/concurrency/messaging/messaging.factor new file mode 100755 index 0000000000..bd625ff499 --- /dev/null +++ b/extra/concurrency/messaging/messaging.factor @@ -0,0 +1,121 @@ +! Copyright (C) 2005, 2008 Chris Double, Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +! +! Concurrency library for Factor based on Erlang/Termite style +! concurrency. +IN: concurrency.messaging +USING: dlists concurrency.threads sequences continuations +namespaces random math quotations words kernel arrays assocs +init system ; + +TUPLE: mailbox threads data ; + +: ( -- mailbox ) + mailbox construct-boa ; + +: mailbox-empty? ( mailbox -- bool ) + mailbox-data dlist-empty? ; + +: notify-all ( dlist -- ) + [ second resume ] dlist-slurp yield ; + +: mailbox-put ( obj mailbox -- ) + [ mailbox-data push-front ] keep + mailbox-threads notify-all ; + + + +: mailbox-peek ( mailbox -- obj ) + mailbox-data peek-front ; + +: mailbox-get-timeout ( mailbox timeout -- obj ) + block-if-empty mailbox-data pop-front ; + +: mailbox-get ( mailbox -- obj ) + f mailbox-timeout-get ; + +: mailbox-get-all-timeout ( mailbox timeout -- array ) + (mailbox-block-if-empty) + [ dup mailbox-empty? ] + [ dup mailbox-data pop-back ] + [ ] unfold nip ; + +: mailbox-get-all ( mailbox -- array ) + f mailbox-timeout-get-all ; + +: while-mailbox-empty ( mailbox quot -- ) + over mailbox-empty? [ + dup >r swap slip r> while-mailbox-empty + ] [ + 2drop + ] if ; inline + +: mailbox-timeout-get? ( pred mailbox timeout -- obj ) + [ (mailbox-block-unless-pred) ] 3keep drop + mailbox-data delete-node-if ; inline + +: mailbox-get? ( pred mailbox -- obj ) + f mailbox-timeout-get? ; + +TUPLE: linked error thread ; + +: self linked construct-boa ; + +GENERIC: send ( message thread -- ) + +M: thread send ( message thread -- ) + thread-mailbox mailbox-put ; + +: ?linked dup linked? [ rethrow ] when ; + +: mailbox self thread-mailbox ; + +: receive ( -- message ) + mailbox mailbox-get ?linked ; + +: receive-if ( pred -- message ) + mailbox mailbox-get? ?linked ; inline + +: rethrow-linked ( error supervisor -- ) + >r r> send ; + +: spawn-linked ( quot name -- thread ) + self [ rethrow-linked ] curry [ (spawn) ] keep ; + +TUPLE: synchronous data sender tag ; + +: ( data -- sync ) + self random-256 synchronous construct-boa ; + +TUPLE: reply data tag ; + +: ( data synchronous -- reply ) + synchronous-tag \ reply construct-boa ; + +: send-synchronous ( message thread -- reply ) + >r dup r> send + [ over reply? [ reply-tag = ] [ 2drop f ] if ] curry + receive-if reply-data ; + +: reply-synchronous ( message synchronous -- ) + [ ] keep synchronous-sender reply ; diff --git a/extra/concurrency/messaging/summary.txt b/extra/concurrency/messaging/summary.txt new file mode 100644 index 0000000000..a41b7edb49 --- /dev/null +++ b/extra/concurrency/messaging/summary.txt @@ -0,0 +1 @@ +Erlang/Termite-style message-passing concurrency diff --git a/extra/concurrency/promises/authors.txt b/extra/concurrency/promises/authors.txt new file mode 100644 index 0000000000..a8fb961d36 --- /dev/null +++ b/extra/concurrency/promises/authors.txt @@ -0,0 +1,2 @@ +Chris Double +Slava Pestov diff --git a/extra/concurrency/promises/promises.factor b/extra/concurrency/promises/promises.factor new file mode 100755 index 0000000000..ecaa722b11 --- /dev/null +++ b/extra/concurrency/promises/promises.factor @@ -0,0 +1,24 @@ +! Copyright (C) 2005, 2008 Chris Double, Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +IN: concurrency.promises + +TUPLE: promise mailbox ; + +: ( -- promise ) + promise construct-boa ; + +: promise-fulfilled? ( promise -- ? ) + promise-mailbox mailbox-empty? not ; + +: fulfill ( value promise -- ) + dup promise-fulfilled? [ + "Promise already fulfilled" throw + ] [ + promise-mailbox mailbox-put + ] if ; + +: ?promise-timeout ( promise timeout -- result ) + >r promise-mailbox r> block-if-empty mailbox-peek ; + +: ?promise ( promise -- result ) + f ?promise-timeout ; diff --git a/extra/concurrency/promises/summary.txt b/extra/concurrency/promises/summary.txt new file mode 100644 index 0000000000..96c70cb31a --- /dev/null +++ b/extra/concurrency/promises/summary.txt @@ -0,0 +1 @@ +Thread-safe write-once variables diff --git a/extra/concurrency/semaphores/authors.txt b/extra/concurrency/semaphores/authors.txt new file mode 100644 index 0000000000..1901f27a24 --- /dev/null +++ b/extra/concurrency/semaphores/authors.txt @@ -0,0 +1 @@ +Slava Pestov diff --git a/extra/concurrency/semaphores/semaphores.factor b/extra/concurrency/semaphores/semaphores.factor new file mode 100755 index 0000000000..7bfaf4c1ce --- /dev/null +++ b/extra/concurrency/semaphores/semaphores.factor @@ -0,0 +1,20 @@ +IN: concurrency.semaphores + +TUPLE: semaphore count threads ; + +: ( -- semaphore ) + 0 semaphore construct-boa ; + +: wait-to-acquire ( semaphore -- ) + [ semaphore-threads push-front ] suspend drop ; + +: acquire ( semaphore -- ) + dup semaphore-count zero? [ + wait-to-acquire + ] [ + dup semaphore-count 1- swap set-semaphore-count + ] if ; + +: release ( semaphore -- ) + dup semaphore-count 1+ over set-semaphore-count + semaphore-threads notify-1 ; diff --git a/extra/concurrency/semaphores/summary.txt b/extra/concurrency/semaphores/summary.txt new file mode 100644 index 0000000000..15883d541f --- /dev/null +++ b/extra/concurrency/semaphores/summary.txt @@ -0,0 +1 @@ +Counting semaphores diff --git a/extra/concurrency/summary.txt b/extra/concurrency/summary.txt deleted file mode 100644 index 7f48dd43b4..0000000000 --- a/extra/concurrency/summary.txt +++ /dev/null @@ -1 +0,0 @@ -Erlang-style concurrency diff --git a/extra/concurrency/tags.txt b/extra/concurrency/tags.txt deleted file mode 100644 index f4274299b1..0000000000 --- a/extra/concurrency/tags.txt +++ /dev/null @@ -1 +0,0 @@ -extensions diff --git a/extra/io/launcher/launcher.factor b/extra/io/launcher/launcher.factor index dce893dcaf..be4445f842 100755 --- a/extra/io/launcher/launcher.factor +++ b/extra/io/launcher/launcher.factor @@ -2,7 +2,7 @@ ! See http://factorcode.org/license.txt for BSD license. USING: io io.backend io.timeouts system kernel namespaces strings hashtables sequences assocs combinators vocabs.loader -init threads continuations math ; +init concurrency.threads continuations math ; IN: io.launcher ! Non-blocking process exit notification facility @@ -83,7 +83,7 @@ HOOK: run-process* io-backend ( desc -- handle ) : wait-for-process ( process -- status ) [ dup process-handle - [ dup [ processes get at push stop ] curry callcc0 ] when + [ dup [ processes get at push ] curry suspend drop ] when dup process-killed? [ "Process was killed" throw ] [ process-status ] if ] with-timeout ; @@ -134,5 +134,5 @@ TUPLE: process-stream process ; : notify-exit ( status process -- ) [ set-process-status ] keep - [ processes get delete-at* drop [ schedule-thread ] each ] keep + [ processes get delete-at* drop [ resume ] each ] keep f swap set-process-handle ; diff --git a/extra/io/monitors/monitors.factor b/extra/io/monitors/monitors.factor index eff27614ae..2f54ea59fe 100755 --- a/extra/io/monitors/monitors.factor +++ b/extra/io/monitors/monitors.factor @@ -1,7 +1,7 @@ ! Copyright (C) 2008 Slava Pestov. ! See http://factorcode.org/license.txt for BSD license. USING: io.backend kernel continuations namespaces sequences -assocs hashtables sorting arrays threads ; +assocs hashtables sorting arrays concurrency.threads ; IN: io.monitors add-io-task ] if ; : (wait-to-write) ( port -- ) diff --git a/extra/io/unix/launcher/launcher.factor b/extra/io/unix/launcher/launcher.factor index 5adf0d7453..51773515bf 100755 --- a/extra/io/unix/launcher/launcher.factor +++ b/extra/io/unix/launcher/launcher.factor @@ -124,4 +124,4 @@ M: unix-io process-stream* wait-for-processes [ 250 sleep ] when wait-loop ; : start-wait-thread ( -- ) - [ wait-loop ] in-thread ; + [ wait-loop ] "Process reaper" spawn drop ; diff --git a/extra/io/unix/unix-tests.factor b/extra/io/unix/unix-tests.factor index e1c3108952..4005fb6c09 100755 --- a/extra/io/unix/unix-tests.factor +++ b/extra/io/unix/unix-tests.factor @@ -18,7 +18,7 @@ IN: temporary ] with-stream "unix-domain-socket-test" resource-path delete-file -] in-thread +] "Test" spawn drop yield @@ -69,7 +69,7 @@ yield "unix-domain-datagram-test" resource-path delete-file ] with-scope -] in-thread +] "Test" spawn drop yield diff --git a/extra/io/windows/launcher/launcher.factor b/extra/io/windows/launcher/launcher.factor index cc3278dadc..4da8ed4046 100755 --- a/extra/io/windows/launcher/launcher.factor +++ b/extra/io/windows/launcher/launcher.factor @@ -4,7 +4,8 @@ USING: alien alien.c-types arrays continuations destructors io io.windows io.windows.nt.pipes libc io.nonblocking io.streams.duplex windows.types math windows.kernel32 windows namespaces io.launcher kernel sequences windows.errors assocs -splitting system threads init strings combinators io.backend ; +splitting system concurrency.threads init strings combinators +io.backend ; IN: io.windows.launcher TUPLE: CreateProcess-args @@ -150,6 +151,6 @@ M: windows-io kill-process* ( handle -- ) wait-loop ; : start-wait-thread ( -- ) - [ wait-loop ] in-thread ; + [ wait-loop ] "Process wait" spawn drop ; [ start-wait-thread ] "io.windows.launcher" add-init-hook diff --git a/extra/io/windows/nt/backend/backend.factor b/extra/io/windows/nt/backend/backend.factor index 597bc99be2..09d23e74e4 100755 --- a/extra/io/windows/nt/backend/backend.factor +++ b/extra/io/windows/nt/backend/backend.factor @@ -1,14 +1,15 @@ USING: alien alien.c-types arrays assocs combinators continuations destructors io io.backend io.nonblocking io.windows libc kernel math namespaces sequences -threads tuples.lib windows windows.errors windows.kernel32 -strings splitting io.files qualified ascii combinators.lib ; +concurrency.threads tuples.lib windows windows.errors +windows.kernel32 strings splitting io.files qualified ascii +combinators.lib ; QUALIFIED: windows.winsock IN: io.windows.nt.backend SYMBOL: io-hash -TUPLE: io-callback port continuation ; +TUPLE: io-callback port thread ; C: io-callback @@ -52,8 +53,8 @@ M: windows-nt-io add-completion ( handle -- ) [ swap dup alien? [ "bad overlapped in save-callback" throw ] unless - io-hash get-global set-at stop - ] callcc0 2drop ; + io-hash get-global set-at + ] suspend 3drop ; : wait-for-overlapped ( ms -- overlapped ? ) >r master-completion-port get-global r> ! port ms @@ -77,11 +78,11 @@ M: windows-nt-io add-completion ( handle -- ) ] [ (win32-error-string) swap lookup-callback [ io-callback-port set-port-error ] keep - ] if io-callback-continuation schedule-thread f + ] if io-callback-thread resume f ] if ] [ lookup-callback - io-callback-continuation schedule-thread f + io-callback-thread resume f ] if ; : drain-overlapped ( timeout -- ) diff --git a/extra/io/windows/nt/files/files.factor b/extra/io/windows/nt/files/files.factor index f2be11855b..d33465ae76 100755 --- a/extra/io/windows/nt/files/files.factor +++ b/extra/io/windows/nt/files/files.factor @@ -1,8 +1,8 @@ USING: continuations destructors io.buffers io.files io.backend io.timeouts io.nonblocking io.windows io.windows.nt.backend -kernel libc math threads windows windows.kernel32 alien.c-types -alien.arrays sequences combinators combinators.lib sequences.lib -ascii splitting alien strings assocs ; +kernel libc math concurrency.threads windows windows.kernel32 +alien.c-types alien.arrays sequences combinators combinators.lib +sequences.lib ascii splitting alien strings assocs ; IN: io.windows.nt.files M: windows-nt-io cwd diff --git a/extra/io/windows/nt/nt.factor b/extra/io/windows/nt/nt.factor index da7e83baca..be57a398a2 100755 --- a/extra/io/windows/nt/nt.factor +++ b/extra/io/windows/nt/nt.factor @@ -12,3 +12,5 @@ USE: io.windows.mmap USE: io.backend T{ windows-nt-io } set-io-backend + +"vocabs.monitor" require diff --git a/extra/io/windows/nt/sockets/sockets.factor b/extra/io/windows/nt/sockets/sockets.factor index eef7476dd5..9f82350f54 100755 --- a/extra/io/windows/nt/sockets/sockets.factor +++ b/extra/io/windows/nt/sockets/sockets.factor @@ -2,7 +2,7 @@ USING: alien alien.accessors alien.c-types byte-arrays continuations destructors io.nonblocking io.timeouts io.sockets io.sockets.impl io namespaces io.streams.duplex io.windows io.windows.nt.backend windows.winsock kernel libc math sequences -threads tuples.lib ; +concurrency.threads tuples.lib ; IN: io.windows.nt.sockets : malloc-int ( object -- object ) diff --git a/extra/tools/interpreter/interpreter.factor b/extra/tools/interpreter/interpreter.factor index 02c0af89ac..17a3412e93 100755 --- a/extra/tools/interpreter/interpreter.factor +++ b/extra/tools/interpreter/interpreter.factor @@ -3,7 +3,8 @@ USING: arrays assocs classes combinators sequences.private continuations continuations.private generic hashtables io kernel kernel.private math namespaces namespaces.private prettyprint -quotations sequences splitting strings threads vectors words ; +quotations sequences splitting strings concurrency.threads +vectors words ; IN: tools.interpreter : walk ( quot -- ) \ break add* call ; diff --git a/extra/tools/threads/threads.factor b/extra/tools/threads/threads.factor new file mode 100755 index 0000000000..0690042a3e --- /dev/null +++ b/extra/tools/threads/threads.factor @@ -0,0 +1,22 @@ +! Copyright (C) 2008 Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +IN: tools.threads +USING: concurrency.threads kernel prettyprint prettyprint.config +io io.styles sequences assocs namespaces sorting ; + +: thread. ( thread -- ) + dup thread-id pprint-cell + dup thread-name pprint-cell + thread-continuation "Waiting" "Running" ? [ write ] with-cell ; + +: threads. ( -- ) + standard-table-style [ + [ + { "ID" "Name" "State" } + [ [ write ] with-cell ] each + ] with-row + + threads get-global >alist sort-keys values [ + [ thread. ] with-row + ] each + ] tabular-output ; diff --git a/extra/ui/tools/interactor/interactor.factor b/extra/ui/tools/interactor/interactor.factor index e667b1206b..791b68246b 100755 --- a/extra/ui/tools/interactor/interactor.factor +++ b/extra/ui/tools/interactor/interactor.factor @@ -3,20 +3,20 @@ USING: arrays assocs combinators continuations documents ui.tools.workspace hashtables io io.styles kernel math math.vectors models namespaces parser prettyprint quotations -sequences sequences.lib strings threads listener tuples -ui.commands ui.gadgets ui.gadgets.editors +sequences sequences.lib strings concurrency.threads listener +tuples ui.commands ui.gadgets ui.gadgets.editors ui.gadgets.presentations ui.gadgets.worlds ui.gestures definitions ; IN: ui.tools.interactor TUPLE: interactor history output -continuation quot busy? +thread quot busy? help ; : interactor-use ( interactor -- seq ) use swap - interactor-continuation continuation-name + interactor-thread thread-continuation continuation-name assoc-stack ; : init-caret-help ( interactor -- ) @@ -37,10 +37,6 @@ M: interactor graft* dup dup interactor-help add-connection f swap set-interactor-busy? ; -M: interactor ungraft* - dup dup interactor-help remove-connection - delegate ungraft* ; - : word-at-loc ( loc interactor -- word ) over [ [ gadget-model T{ one-word-elt } elt-string ] keep @@ -70,7 +66,7 @@ M: interactor model-changed : interactor-continue ( obj interactor -- ) t over set-interactor-busy? - interactor-continuation schedule-thread-with ; + interactor-thread resume-with ; : clear-input ( interactor -- ) gadget-model clear-doc ; @@ -88,14 +84,16 @@ M: interactor model-changed : evaluate-input ( interactor -- ) dup interactor-busy? [ - [ - [ control-value ] keep interactor-continue - ] in-thread + dup control-value over interactor-continue ] unless drop ; : interactor-yield ( interactor -- obj ) - f over set-interactor-busy? - [ set-interactor-continuation stop ] curry callcc1 ; + dup gadget-graft-state first [ + f over set-interactor-busy? + [ set-interactor-thread ] curry suspend + ] [ + drop f + ] if ; M: interactor stream-readln [ interactor-yield ] keep interactor-finish ?first ; diff --git a/extra/ui/tools/listener/listener.factor b/extra/ui/tools/listener/listener.factor index 3a3ba5f1af..0f6a45de52 100755 --- a/extra/ui/tools/listener/listener.factor +++ b/extra/ui/tools/listener/listener.factor @@ -6,7 +6,7 @@ kernel models namespaces parser quotations sequences ui.commands ui.gadgets ui.gadgets.editors ui.gadgets.labelled ui.gadgets.panes ui.gadgets.buttons ui.gadgets.scrollers ui.gadgets.tracks ui.gestures ui.operations vocabs words -prettyprint listener debugger threads ; +prettyprint listener debugger concurrency.threads ; IN: ui.tools.listener TUPLE: listener-gadget input output stack ; @@ -134,8 +134,7 @@ M: stack-display tool-scroller ] with-stream* ; : restart-listener ( listener -- ) - [ >r clear r> init-namespaces listener-thread ] in-thread - drop ; + [ listener-thread ] curry "Listener" spawn drop ; : init-listener ( listener -- ) f swap set-listener-gadget-stack ; diff --git a/extra/ui/tools/walker/walker.factor b/extra/ui/tools/walker/walker.factor index a23345d214..e80d87d591 100755 --- a/extra/ui/tools/walker/walker.factor +++ b/extra/ui/tools/walker/walker.factor @@ -2,10 +2,11 @@ ! See http://factorcode.org/license.txt for BSD license. USING: arrays assocs ui.tools.listener ui.tools.traceback ui.tools.workspace inspector kernel models namespaces -prettyprint quotations sequences threads tools.interpreter -ui.commands ui.gadgets ui.gadgets.labelled ui.gadgets.tracks -ui.gestures ui.gadgets.buttons ui.gadgets.panes -prettyprint.config prettyprint.backend continuations ; +prettyprint quotations sequences concurrency.threads +tools.interpreter ui.commands ui.gadgets ui.gadgets.labelled +ui.gadgets.tracks ui.gestures ui.gadgets.buttons +ui.gadgets.panes prettyprint.config prettyprint.backend +continuations ; IN: ui.tools.walker TUPLE: walker model interpreter history ; diff --git a/extra/ui/ui.factor b/extra/ui/ui.factor index c214eee8d5..c38ce2b44a 100755 --- a/extra/ui/ui.factor +++ b/extra/ui/ui.factor @@ -1,10 +1,10 @@ ! Copyright (C) 2006, 2007 Slava Pestov. ! See http://factorcode.org/license.txt for BSD license. USING: arrays assocs io kernel math models namespaces -prettyprint dlists sequences threads sequences words timers -debugger ui.gadgets ui.gadgets.worlds ui.gadgets.tracks -ui.gestures ui.backend ui.render continuations init -combinators hashtables ; +prettyprint dlists sequences concurrency.threads sequences words +timers debugger ui.gadgets ui.gadgets.worlds ui.gadgets.tracks +ui.gestures ui.backend ui.render continuations init combinators +hashtables ; IN: ui ! Assoc mapping aliens to gadgets diff --git a/extra/ui/windows/windows.factor b/extra/ui/windows/windows.factor index c831a959d0..4f5b9bd6a8 100755 --- a/extra/ui/windows/windows.factor +++ b/extra/ui/windows/windows.factor @@ -4,10 +4,10 @@ USING: alien alien.c-types arrays assocs ui ui.gadgets ui.backend ui.clipboards ui.gadgets.worlds ui.gestures io kernel math math.vectors namespaces prettyprint sequences strings vectors words windows.kernel32 windows.gdi32 windows.user32 -windows.opengl32 windows.messages windows.types -windows.nt windows threads timers libc combinators continuations -command-line shuffle opengl ui.render unicode.case ascii -math.bitfields ; +windows.opengl32 windows.messages windows.types windows.nt +windows concurrency.threads timers libc combinators +continuations command-line shuffle opengl ui.render unicode.case +ascii math.bitfields ; IN: ui.windows TUPLE: windows-ui-backend ; diff --git a/extra/vocabs/monitor/monitor.factor b/extra/vocabs/monitor/monitor.factor index e5b9a8c3a1..f22002ee6a 100755 --- a/extra/vocabs/monitor/monitor.factor +++ b/extra/vocabs/monitor/monitor.factor @@ -1,18 +1,18 @@ -USING: threads io.files io.monitors init kernel tools.browser -continuations ; +USING: concurrency.threads io.files io.monitors init kernel +tools.browser ; IN: vocabs.monitor ! Use file system change monitoring to flush the tags/authors ! cache -: update-thread ( monitor -- ) - dup next-change 2drop reset-cache update-thread ; +: (monitor-thread) ( monitor -- ) + dup next-change 2drop reset-cache (monitor-thread) ; -: start-update-thread +: monitor-thread ( -- ) + "" resource-path t (monitor-thread) ; + +: start-monitor-thread #! Silently ignore errors during monitor creation since #! monitors are not supported on all platforms. - [ - [ "" resource-path t ] [ drop f ] recover - [ update-thread ] when* - ] in-thread ; + [ monitor-thread ] "Vocabulary monitor" spawn drop ; -[ start-update-thread ] "tools.browser" add-init-hook +[ start-monitor-thread ] "vocabs.monitor" add-init-hook diff --git a/extra/webapps/planet/planet.factor b/extra/webapps/planet/planet.factor index 3e008d049d..456855c1fa 100755 --- a/extra/webapps/planet/planet.factor +++ b/extra/webapps/planet/planet.factor @@ -100,7 +100,7 @@ SYMBOL: last-update : update-thread ( -- ) millis last-update set-global - [ update-cached-postings ] in-thread + [ update-cached-postings ] "RSS feed update slave" spawn drop 10 60 * 1000 * sleep update-thread ; @@ -109,7 +109,7 @@ SYMBOL: last-update "webapps.planet" [ update-thread ] with-logging - ] in-thread ; + ] "RSS feed update master" spawn drop ; "planet" "planet-factor" "extra/webapps/planet" web-app diff --git a/vm/run.h b/vm/run.h index 86cf1c0e1f..1fcb4bedb4 100755 --- a/vm/run.h +++ b/vm/run.h @@ -1,4 +1,4 @@ -#define USER_ENV 40 +#define USER_ENV 64 typedef enum { NAMESTACK_ENV, /* used by library only */ @@ -54,7 +54,9 @@ typedef enum { STDERR_ENV = 38, /* stderr FILE* handle */ - STAGE2_ENV = 39 /* have we bootstrapped? */ + STAGE2_ENV = 39, /* have we bootstrapped? */ + + CURRENT_THREAD_ENV = 40 } F_ENVTYPE; #define FIRST_SAVE_ENV BOOT_ENV