diff --git a/extra/benchmark/sockets/sockets.factor b/extra/benchmark/sockets/sockets.factor index 40012e1638..36529facaa 100755 --- a/extra/benchmark/sockets/sockets.factor +++ b/extra/benchmark/sockets/sockets.factor @@ -1,5 +1,5 @@ USING: io.sockets io.server io kernel math threads -debugger tools.time prettyprint ; +debugger tools.time prettyprint concurrency.combinators ; IN: benchmark.sockets : simple-server ( -- ) diff --git a/extra/calendar/model/model.factor b/extra/calendar/model/model.factor index be98c7491e..aa295e0f75 100755 --- a/extra/calendar/model/model.factor +++ b/extra/calendar/model/model.factor @@ -1,6 +1,6 @@ ! Copyright (C) 2008 Slava Pestov ! See http://factorcode.org/license.txt for BSD license. -USING: calendar namespaces models threads init ; +USING: calendar namespaces models threads kernel init ; IN: calendar.model SYMBOL: time diff --git a/extra/channels/channels.factor b/extra/channels/channels.factor index 07b5d2f5d5..01f810b8e3 100755 --- a/extra/channels/channels.factor +++ b/extra/channels/channels.factor @@ -19,7 +19,7 @@ GENERIC: from ( channel -- value ) [ channel-senders push stop ] curry callcc0 ; : (to) ( value receivers -- ) - delete-random schedule-thread-with yield ; + delete-random resume-with yield ; : notify ( continuation channel -- channel ) [ channel-receivers push ] keep ; diff --git a/extra/channels/remote/remote.factor b/extra/channels/remote/remote.factor index 40a6df350b..b5a497b1fa 100755 --- a/extra/channels/remote/remote.factor +++ b/extra/channels/remote/remote.factor @@ -4,7 +4,7 @@ ! Remote Channels USING: kernel init namespaces assocs arrays random sequences channels match concurrency.messaging -concurrency.distributed ; +concurrency.distributed threads ; IN: channels.remote : start-channel-node ( -- ) "remote-channels" get-process [ - "remote-channels" - [ channel-process ] "Remote channels" spawn - register-process + "remote-channels" + [ channel-process ] "Remote channels" spawn-server + register-process ] unless ; TUPLE: remote-channel node id ; @@ -52,12 +52,12 @@ TUPLE: remote-channel node id ; C: remote-channel M: remote-channel to ( value remote-channel -- ) - dup >r [ \ to , remote-channel-id , , ] { } make r> + [ [ \ to , remote-channel-id , , ] { } make ] keep remote-channel-node "remote-channels" send-synchronous no-channel = [ no-channel throw ] when ; M: remote-channel from ( remote-channel -- value ) - dup >r [ \ from , remote-channel-id , ] { } make r> + [ [ \ from , remote-channel-id , ] { } make ] keep remote-channel-node "remote-channels" send-synchronous dup no-channel = [ no-channel throw ] when* ; diff --git a/extra/channels/sniffer/sniffer.factor b/extra/channels/sniffer/sniffer.factor index 1502201225..cbf31c71e3 100755 --- a/extra/channels/sniffer/sniffer.factor +++ b/extra/channels/sniffer/sniffer.factor @@ -2,8 +2,8 @@ ! See http://factorcode.org/license.txt for BSD license. ! ! Wrap a sniffer in a channel -USING: kernel channels concurrency io io.backend -io.sniffer io.sniffer.backend system vocabs.loader ; +USING: kernel channels io io.backend io.sniffer +io.sniffer.backend system vocabs.loader ; : (sniff-channel) ( stream channel -- ) 4096 pick stream-read-partial over to (sniff-channel) ; diff --git a/extra/concurrency/combinators/combinators.factor b/extra/concurrency/combinators/combinators.factor new file mode 100755 index 0000000000..5e19baf393 --- /dev/null +++ b/extra/concurrency/combinators/combinators.factor @@ -0,0 +1,13 @@ +! Copyright (C) 2008 Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +USING: concurrency.futures concurrency.count-downs sequences +kernel ; +IN: concurrency.combinators + +: parallel-map ( seq quot -- newseq ) + [ curry future ] curry map dup [ ?future ] change-each ; + inline + +: parallel-each ( seq quot -- ) + "Parallel each" pick length + [ [ spawn-stage ] 2curry each ] keep await ; inline diff --git a/extra/concurrency/conditions/conditions.factor b/extra/concurrency/conditions/conditions.factor new file mode 100755 index 0000000000..f94cfe6cca --- /dev/null +++ b/extra/concurrency/conditions/conditions.factor @@ -0,0 +1,13 @@ +! Copyright (C) 2005, 2008 Chris Double, Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +USING: dlists threads kernel arrays sequences ; +IN: concurrency.conditions + +: notify-1 ( dlist -- ) + dup dlist-empty? [ pop-back resume ] [ drop ] if ; + +: notify-all ( dlist -- ) + [ second resume ] dlist-slurp yield ; + +: wait ( queue timeout -- queue timeout ) + [ 2array swap push-front ] suspend 3drop ; inline diff --git a/extra/concurrency/count-downs/count-downs.factor b/extra/concurrency/count-downs/count-downs.factor new file mode 100755 index 0000000000..122076eeb1 --- /dev/null +++ b/extra/concurrency/count-downs/count-downs.factor @@ -0,0 +1,32 @@ +! Copyright (C) 2008 Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +USING: dlists kernel math concurrency.promises +concurrency.messaging ; +IN: concurrency.count-downs + +! http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/CountDownLatch.html + +TUPLE: count-down n promise ; + +: ( n -- count-down ) + count-down construct-boa ; + +: count-down ( count-down -- ) + dup count-down-n dup zero? [ + "Count down already done" throw + ] [ + 1- dup pick set-count-down-n + zero? [ + t swap count-down-promise fulfill + ] [ drop ] if + ] if ; + +: await-timeout ( count-down timeout -- ) + >r count-down-promise r> ?promise-timeout drop ; + +: spawn-stage ( quot name count-down -- ) + count-down-promise + promise-mailbox spawn-linked-to drop ; + +: await ( count-down -- ) + f await-timeout ; diff --git a/extra/concurrency/distributed/distributed-docs.factor b/extra/concurrency/distributed/distributed-docs.factor index 8f82d1aebf..ad9e392771 100755 --- a/extra/concurrency/distributed/distributed-docs.factor +++ b/extra/concurrency/distributed/distributed-docs.factor @@ -1,15 +1,7 @@ USING: help.markup help.syntax concurrency.messaging ; IN: concurrency.distributed -HELP: -{ $values { "node" "a node object" } - { "pid" "a process id" } - { "remote-process" "the constructed remote-process object" } +HELP: local-node +{ $values { "addrspec" "an address specifier" } } -{ $description "Constructs a proxy to a process running on another node. It can be used to send messages to the process it is acting as a proxy for." } -{ $see-also spawn send } ; - -HELP: localnode -{ $values { "node" "a node object" } -} -{ $description "Return the node the process is currently running on." } ; +{ $description "Return the node the current thread is running on." } ; diff --git a/extra/concurrency/distributed/distributed.factor b/extra/concurrency/distributed/distributed.factor index 30587b35ac..2c54a872f7 100755 --- a/extra/concurrency/distributed/distributed.factor +++ b/extra/concurrency/distributed/distributed.factor @@ -6,8 +6,10 @@ namespaces kernel ; QUALIFIED: io.sockets IN: concurrency.distributed +SYMBOL: local-node ( -- addrspec ) + : handle-node-client ( -- ) - deserialize first2 thread send ; + deserialize first2 get-process send ; : (start-node) ( addrspecs addrspec -- ) [ @@ -16,18 +18,19 @@ IN: concurrency.distributed [ handle-node-client ] with-server ] 2curry f spawn drop ; -SYMBOL: local-node ( -- addrspec ) - : start-node ( port -- ) - dup internet-server host-name rot (start-node) ; + dup internet-server io.sockets:host-name + rot io.sockets: (start-node) ; -TUPLE: remote-thread pid node ; +TUPLE: remote-process id node ; -M: remote-thread send ( message thread -- ) - { remote-thread-pid remote-thread-node } get-slots +C: remote-process + +M: remote-process send ( message thread -- ) + { remote-process-id remote-process-node } get-slots io.sockets: [ 2array serialize ] with-stream ; M: thread (serialize) ( obj -- ) thread-id local-node get-global - remote-thread construct-boa + (serialize) ; diff --git a/extra/concurrency/futures/futures.factor b/extra/concurrency/futures/futures.factor index fa8aba27fe..0a05d2d78e 100755 --- a/extra/concurrency/futures/futures.factor +++ b/extra/concurrency/futures/futures.factor @@ -1,25 +1,17 @@ ! Copyright (C) 2005, 2008 Chris Double, Slava Pestov. ! See http://factorcode.org/license.txt for BSD license. +USING: concurrency.promises concurrency.messaging kernel arrays +continuations ; IN: concurrency.futures : future ( quot -- future ) [ - [ - >r - [ t 2array ] compose - [ f 2array ] recover - r> fulfill - ] 2curry "Future" spawn drop + [ [ >r call r> fulfill ] 2curry "Future" ] keep + promise-mailbox spawn-linked-to drop ] keep ; inline : ?future-timeout ( future timeout -- value ) - ?promise-timeout first2 [ rethrow ] unless ; + ?promise-timeout ; : ?future ( future -- value ) - f ?future-timeout ; - -: parallel-map ( seq quot -- newseq ) - [ curry future ] curry map [ ?future ] map ; - -: parallel-each ( seq quot -- ) - [ f ] compose parallel-map drop ; + ?promise ; diff --git a/extra/concurrency/locks/locks.factor b/extra/concurrency/locks/locks.factor index 3a792768a7..50a62e3f6f 100755 --- a/extra/concurrency/locks/locks.factor +++ b/extra/concurrency/locks/locks.factor @@ -1,6 +1,7 @@ ! Copyright (C) 2008 Slava Pestov. ! See http://factorcode.org/license.txt for BSD license. -USING: dlists kernel threads continuations math ; +USING: dlists kernel threads continuations math +concurrency.conditions ; IN: concurrency.locks ! Simple critical sections @@ -8,31 +9,26 @@ TUPLE: lock threads owner ; : lock construct-boa ; -: notify-1 ( dlist -- ) - dup dlist-empty? [ pop-back resume ] [ drop ] if ; - r lock-threads r> wait ] when drop self swap set-lock-owner ; : release-lock ( lock -- ) f over set-lock-owner lock-threads notify-1 ; -: do-lock ( lock quot acquire release -- ) - >r >r over r> call over r> curry [ ] cleanup ; inline +: do-lock ( lock timeout quot acquire release -- ) + >r >r pick r> call over r> curry [ ] cleanup ; inline PRIVATE> -: with-lock ( lock quot -- ) +: with-lock ( lock timeout quot -- ) [ acquire-lock ] [ release-lock ] do-lock ; inline -: with-reentrant-lock ( lock quot -- ) +: with-reentrant-lock ( lock timeout quot -- ) over lock-owner self eq? [ nip call ] [ with-lock ] if ; inline @@ -44,44 +40,39 @@ TUPLE: rw-lock readers writers reader# writer ; r rw-lock-readers r> wait ] when drop dup rw-lock-reader# 1+ swap set-rw-lock-reader# ; : notify-writer ( lock -- ) - lock-writers notify-1 ; + rw-lock-writers notify-1 ; : release-read-lock ( lock -- ) dup rw-lock-reader# 1- dup pick set-rw-lock-reader# - zero? [ notify-writers ] [ drop ] if ; - -: wait-for-write-lock ( lock -- ) - [ swap lock-writers push-front ] suspend drop ; + zero? [ notify-writer ] [ drop ] if ; : acquire-write-lock ( lock -- ) dup rw-lock-writer over rw-lock-reader# 0 > or - [ dup wait-for-write-lock ] when - self over set-rw-lock-writer ; + [ 2dup >r rw-lock-writers r> wait ] when drop + self swap set-rw-lock-writer ; : release-write-lock ( lock -- ) f over set-rw-lock-writer dup rw-lock-readers dlist-empty? [ notify-writer ] [ rw-lock-readers notify-all ] if ; -: do-recursive-rw-lock ( lock quot quot' -- ) - >r over rw-lock-writer self eq? [ nip call ] r> if ; inline +: do-recursive-rw-lock ( lock timeout quot quot' -- ) + >r pick rw-lock-writer self eq? [ 2nip call ] r> if ; inline PRIVATE> -: with-read-lock ( lock quot -- ) +: with-read-lock ( lock timeout quot -- ) [ [ acquire-read-lock ] [ release-read-lock ] do-lock ] do-recursive-rw-lock ; inline -: with-write-lock ( lock quot -- ) +: with-write-lock ( lock timeout quot -- ) [ [ acquire-write-lock ] [ release-write-lock ] do-lock ] do-recursive-rw-lock ; inline diff --git a/extra/concurrency/messaging/messaging.factor b/extra/concurrency/messaging/messaging.factor index 220be64364..e7a860495f 100755 --- a/extra/concurrency/messaging/messaging.factor +++ b/extra/concurrency/messaging/messaging.factor @@ -6,7 +6,7 @@ IN: concurrency.messaging USING: dlists threads sequences continuations namespaces random math quotations words kernel arrays assocs -init system ; +init system concurrency.conditions ; TUPLE: mailbox threads data ; @@ -16,29 +16,22 @@ TUPLE: mailbox threads data ; : mailbox-empty? ( mailbox -- bool ) mailbox-data dlist-empty? ; -: notify-all ( dlist -- ) - [ second resume ] dlist-slurp yield ; - : mailbox-put ( obj mailbox -- ) [ mailbox-data push-front ] keep mailbox-threads notify-all ; r r> send ; +: spawn-linked-to ( quot name mailbox -- thread ) + [ >r r> mailbox-put ] curry + [ (spawn) ] keep ; + : spawn-linked ( quot name -- thread ) - self [ rethrow-linked ] curry [ (spawn) ] keep ; + mailbox spawn-linked-to ; TUPLE: synchronous data sender tag ; @@ -124,3 +121,21 @@ TUPLE: reply data tag ; : reply-synchronous ( message synchronous -- ) [ ] keep synchronous-sender send ; + + + +: register-process ( name process -- ) + swap remote-processes set-at ; + +: unregister-process ( name -- ) + remote-processes delete-at ; + +: get-process ( name -- process ) + dup remote-processes at [ ] [ thread ] ?if ; + +\ remote-processes global [ H{ } assoc-like ] change-at diff --git a/extra/concurrency/promises/promises.factor b/extra/concurrency/promises/promises.factor index ecaa722b11..6610a8c7ed 100755 --- a/extra/concurrency/promises/promises.factor +++ b/extra/concurrency/promises/promises.factor @@ -1,5 +1,7 @@ ! Copyright (C) 2005, 2008 Chris Double, Slava Pestov. ! See http://factorcode.org/license.txt for BSD license. +USING: concurrency.messaging concurrency.messaging.private +kernel ; IN: concurrency.promises TUPLE: promise mailbox ; @@ -18,7 +20,8 @@ TUPLE: promise mailbox ; ] if ; : ?promise-timeout ( promise timeout -- result ) - >r promise-mailbox r> block-if-empty mailbox-peek ; + >r promise-mailbox r> block-if-empty + mailbox-peek ?linked ; : ?promise ( promise -- result ) f ?promise-timeout ; diff --git a/extra/concurrency/semaphores/semaphores.factor b/extra/concurrency/semaphores/semaphores.factor index 7bfaf4c1ce..4afa02307a 100755 --- a/extra/concurrency/semaphores/semaphores.factor +++ b/extra/concurrency/semaphores/semaphores.factor @@ -1,3 +1,6 @@ +! Copyright (C) 2008 Slava Pestov. +! See http://factorcode.org/license.txt for BSD license. +USING: dlists kernel threads math ; IN: concurrency.semaphores TUPLE: semaphore count threads ; diff --git a/extra/io/server/server.factor b/extra/io/server/server.factor index f988b1eeab..160af21661 100755 --- a/extra/io/server/server.factor +++ b/extra/io/server/server.factor @@ -3,7 +3,7 @@ USING: io io.sockets io.files logging continuations kernel math math.parser namespaces parser sequences strings prettyprint debugger quotations calendar -threads concurrency.futures ; +threads concurrency.combinators ; IN: io.server diff --git a/extra/webapps/planet/planet.factor b/extra/webapps/planet/planet.factor index e4820c0d59..062f6dbce2 100755 --- a/extra/webapps/planet/planet.factor +++ b/extra/webapps/planet/planet.factor @@ -1,6 +1,6 @@ -USING: sequences rss arrays concurrency.futures kernel sorting -html.elements io assocs namespaces math threads -vocabs html furnace http.server.templating calendar math.parser +USING: sequences rss arrays concurrency.combinators kernel +sorting html.elements io assocs namespaces math threads vocabs +html furnace http.server.templating calendar math.parser splitting continuations debugger system http.server.responders xml.writer prettyprint logging ; IN: webapps.planet