Updating libraries

db4
Slava Pestov 2008-02-18 09:08:59 -06:00
parent 9edb5875e3
commit cd8ab4ba8d
17 changed files with 155 additions and 98 deletions

View File

@ -1,5 +1,5 @@
USING: io.sockets io.server io kernel math threads
debugger tools.time prettyprint ;
debugger tools.time prettyprint concurrency.combinators ;
IN: benchmark.sockets
: simple-server ( -- )

View File

@ -1,6 +1,6 @@
! Copyright (C) 2008 Slava Pestov
! See http://factorcode.org/license.txt for BSD license.
USING: calendar namespaces models threads init ;
USING: calendar namespaces models threads kernel init ;
IN: calendar.model
SYMBOL: time

View File

@ -19,7 +19,7 @@ GENERIC: from ( channel -- value )
[ channel-senders push stop ] curry callcc0 ;
: (to) ( value receivers -- )
delete-random schedule-thread-with yield ;
delete-random resume-with yield ;
: notify ( continuation channel -- channel )
[ channel-receivers push ] keep ;

View File

@ -4,7 +4,7 @@
! Remote Channels
USING: kernel init namespaces assocs arrays random
sequences channels match concurrency.messaging
concurrency.distributed ;
concurrency.distributed threads ;
IN: channels.remote
<PRIVATE
@ -24,27 +24,27 @@ PRIVATE>
<PRIVATE
MATCH-VARS: ?id ?value ;
MATCH-VARS: ?from ?tag ?id ?value ;
SYMBOL: no-channel
: channel-process ( -- )
receive
{
{ { ?from ?tag { to ?id ?value } }
[ ?value ?id get-channel [ to f ] [ no-channel ] if* ?tag swap 2array ?from send ] }
{ { ?from ?tag { from ?id } }
[ ?id get-channel [ from ] [ no-channel ] if* ?tag swap 2array ?from send ] }
} match-cond
channel-process ;
receive [
{
{ { to ?id ?value }
[ ?value ?id get-channel [ to f ] [ no-channel ] if* ] }
{ { from ?id }
[ ?id get-channel [ from ] [ no-channel ] if* ] }
} match-cond
] keep reply-synchronous ;
PRIVATE>
: start-channel-node ( -- )
"remote-channels" get-process [
"remote-channels"
[ channel-process ] "Remote channels" spawn
register-process
"remote-channels"
[ channel-process ] "Remote channels" spawn-server
register-process
] unless ;
TUPLE: remote-channel node id ;
@ -52,12 +52,12 @@ TUPLE: remote-channel node id ;
C: <remote-channel> remote-channel
M: remote-channel to ( value remote-channel -- )
dup >r [ \ to , remote-channel-id , , ] { } make r>
[ [ \ to , remote-channel-id , , ] { } make ] keep
remote-channel-node "remote-channels" <remote-process>
send-synchronous no-channel = [ no-channel throw ] when ;
M: remote-channel from ( remote-channel -- value )
dup >r [ \ from , remote-channel-id , ] { } make r>
[ [ \ from , remote-channel-id , ] { } make ] keep
remote-channel-node "remote-channels" <remote-process>
send-synchronous dup no-channel = [ no-channel throw ] when* ;

View File

@ -2,8 +2,8 @@
! See http://factorcode.org/license.txt for BSD license.
!
! Wrap a sniffer in a channel
USING: kernel channels concurrency io io.backend
io.sniffer io.sniffer.backend system vocabs.loader ;
USING: kernel channels io io.backend io.sniffer
io.sniffer.backend system vocabs.loader ;
: (sniff-channel) ( stream channel -- )
4096 pick stream-read-partial over to (sniff-channel) ;

View File

@ -0,0 +1,13 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: concurrency.futures concurrency.count-downs sequences
kernel ;
IN: concurrency.combinators
: parallel-map ( seq quot -- newseq )
[ curry future ] curry map dup [ ?future ] change-each ;
inline
: parallel-each ( seq quot -- )
"Parallel each" pick length <count-down>
[ [ spawn-stage ] 2curry each ] keep await ; inline

View File

@ -0,0 +1,13 @@
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: dlists threads kernel arrays sequences ;
IN: concurrency.conditions
: notify-1 ( dlist -- )
dup dlist-empty? [ pop-back resume ] [ drop ] if ;
: notify-all ( dlist -- )
[ second resume ] dlist-slurp yield ;
: wait ( queue timeout -- queue timeout )
[ 2array swap push-front ] suspend 3drop ; inline

View File

@ -0,0 +1,32 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: dlists kernel math concurrency.promises
concurrency.messaging ;
IN: concurrency.count-downs
! http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/CountDownLatch.html
TUPLE: count-down n promise ;
: <count-down> ( n -- count-down )
<dlist> count-down construct-boa ;
: count-down ( count-down -- )
dup count-down-n dup zero? [
"Count down already done" throw
] [
1- dup pick set-count-down-n
zero? [
t swap count-down-promise fulfill
] [ drop ] if
] if ;
: await-timeout ( count-down timeout -- )
>r count-down-promise r> ?promise-timeout drop ;
: spawn-stage ( quot name count-down -- )
count-down-promise
promise-mailbox spawn-linked-to drop ;
: await ( count-down -- )
f await-timeout ;

View File

@ -1,15 +1,7 @@
USING: help.markup help.syntax concurrency.messaging ;
IN: concurrency.distributed
HELP: <remote-process>
{ $values { "node" "a node object" }
{ "pid" "a process id" }
{ "remote-process" "the constructed remote-process object" }
HELP: local-node
{ $values { "addrspec" "an address specifier" }
}
{ $description "Constructs a proxy to a process running on another node. It can be used to send messages to the process it is acting as a proxy for." }
{ $see-also spawn send } ;
HELP: localnode
{ $values { "node" "a node object" }
}
{ $description "Return the node the process is currently running on." } ;
{ $description "Return the node the current thread is running on." } ;

View File

@ -6,8 +6,10 @@ namespaces kernel ;
QUALIFIED: io.sockets
IN: concurrency.distributed
SYMBOL: local-node ( -- addrspec )
: handle-node-client ( -- )
deserialize first2 thread send ;
deserialize first2 get-process send ;
: (start-node) ( addrspecs addrspec -- )
[
@ -16,18 +18,19 @@ IN: concurrency.distributed
[ handle-node-client ] with-server
] 2curry f spawn drop ;
SYMBOL: local-node ( -- addrspec )
: start-node ( port -- )
dup internet-server host-name rot <inet> (start-node) ;
dup internet-server io.sockets:host-name
rot io.sockets:<inet> (start-node) ;
TUPLE: remote-thread pid node ;
TUPLE: remote-process id node ;
M: remote-thread send ( message thread -- )
{ remote-thread-pid remote-thread-node } get-slots
C: <remote-process> remote-process
M: remote-process send ( message thread -- )
{ remote-process-id remote-process-node } get-slots
io.sockets:<client> [ 2array serialize ] with-stream ;
M: thread (serialize) ( obj -- )
thread-id local-node get-global
remote-thread construct-boa
<remote-process>
(serialize) ;

View File

@ -1,25 +1,17 @@
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: concurrency.promises concurrency.messaging kernel arrays
continuations ;
IN: concurrency.futures
: future ( quot -- future )
<promise> [
[
>r
[ t 2array ] compose
[ <linked> f 2array ] recover
r> fulfill
] 2curry "Future" spawn drop
[ [ >r call r> fulfill ] 2curry "Future" ] keep
promise-mailbox spawn-linked-to drop
] keep ; inline
: ?future-timeout ( future timeout -- value )
?promise-timeout first2 [ rethrow ] unless ;
?promise-timeout ;
: ?future ( future -- value )
f ?future-timeout ;
: parallel-map ( seq quot -- newseq )
[ curry future ] curry map [ ?future ] map ;
: parallel-each ( seq quot -- )
[ f ] compose parallel-map drop ;
?promise ;

View File

@ -1,6 +1,7 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: dlists kernel threads continuations math ;
USING: dlists kernel threads continuations math
concurrency.conditions ;
IN: concurrency.locks
! Simple critical sections
@ -8,31 +9,26 @@ TUPLE: lock threads owner ;
: <lock> <dlist> lock construct-boa ;
: notify-1 ( dlist -- )
dup dlist-empty? [ pop-back resume ] [ drop ] if ;
<PRIVATE
: wait-for-lock ( lock -- )
[ swap lock-threads push-front ] suspend drop ;
: acquire-lock ( lock -- )
dup lock-owner [ wait-for-lock ] when
: acquire-lock ( lock timeout -- )
over lock-owner
[ 2dup >r lock-threads r> wait ] when drop
self swap set-lock-owner ;
: release-lock ( lock -- )
f over set-lock-owner
lock-threads notify-1 ;
: do-lock ( lock quot acquire release -- )
>r >r over r> call over r> curry [ ] cleanup ; inline
: do-lock ( lock timeout quot acquire release -- )
>r >r pick r> call over r> curry [ ] cleanup ; inline
PRIVATE>
: with-lock ( lock quot -- )
: with-lock ( lock timeout quot -- )
[ acquire-lock ] [ release-lock ] do-lock ; inline
: with-reentrant-lock ( lock quot -- )
: with-reentrant-lock ( lock timeout quot -- )
over lock-owner self eq?
[ nip call ] [ with-lock ] if ; inline
@ -44,44 +40,39 @@ TUPLE: rw-lock readers writers reader# writer ;
<PRIVATE
: wait-for-read-lock ( lock -- )
[ swap lock-readers push-front ] suspend drop ;
: acquire-read-lock ( lock -- )
dup rw-lock-writer [ dup wait-for-read-lock ] when
: acquire-read-lock ( timeout lock -- )
dup rw-lock-writer
[ 2dup >r rw-lock-readers r> wait ] when drop
dup rw-lock-reader# 1+ swap set-rw-lock-reader# ;
: notify-writer ( lock -- )
lock-writers notify-1 ;
rw-lock-writers notify-1 ;
: release-read-lock ( lock -- )
dup rw-lock-reader# 1- dup pick set-rw-lock-reader#
zero? [ notify-writers ] [ drop ] if ;
: wait-for-write-lock ( lock -- )
[ swap lock-writers push-front ] suspend drop ;
zero? [ notify-writer ] [ drop ] if ;
: acquire-write-lock ( lock -- )
dup rw-lock-writer over rw-lock-reader# 0 > or
[ dup wait-for-write-lock ] when
self over set-rw-lock-writer ;
[ 2dup >r rw-lock-writers r> wait ] when drop
self swap set-rw-lock-writer ;
: release-write-lock ( lock -- )
f over set-rw-lock-writer
dup rw-lock-readers dlist-empty?
[ notify-writer ] [ rw-lock-readers notify-all ] if ;
: do-recursive-rw-lock ( lock quot quot' -- )
>r over rw-lock-writer self eq? [ nip call ] r> if ; inline
: do-recursive-rw-lock ( lock timeout quot quot' -- )
>r pick rw-lock-writer self eq? [ 2nip call ] r> if ; inline
PRIVATE>
: with-read-lock ( lock quot -- )
: with-read-lock ( lock timeout quot -- )
[
[ acquire-read-lock ] [ release-read-lock ] do-lock
] do-recursive-rw-lock ; inline
: with-write-lock ( lock quot -- )
: with-write-lock ( lock timeout quot -- )
[
[ acquire-write-lock ] [ release-write-lock ] do-lock
] do-recursive-rw-lock ; inline

View File

@ -6,7 +6,7 @@
IN: concurrency.messaging
USING: dlists threads sequences continuations
namespaces random math quotations words kernel arrays assocs
init system ;
init system concurrency.conditions ;
TUPLE: mailbox threads data ;
@ -16,29 +16,22 @@ TUPLE: mailbox threads data ;
: mailbox-empty? ( mailbox -- bool )
mailbox-data dlist-empty? ;
: notify-all ( dlist -- )
[ second resume ] dlist-slurp yield ;
: mailbox-put ( obj mailbox -- )
[ mailbox-data push-front ] keep
mailbox-threads notify-all ;
<PRIVATE
: mailbox-wait ( mailbox timeout -- mailbox timeout )
[ 2array swap mailbox-threads push-front ] suspend drop ;
inline
: block-unless-pred ( pred mailbox timeout -- )
2over mailbox-data dlist-contains? [
3drop
] [
mailbox-wait block-unless-pred
2dup mailbox-threads wait block-unless-pred
] if ; inline
: block-if-empty ( mailbox timeout -- mailbox )
over mailbox-empty? [
mailbox-wait block-if-empty
2dup mailbox-threads wait block-if-empty
] [
drop
] if ;
@ -104,8 +97,12 @@ M: thread send ( message thread -- )
: rethrow-linked ( error supervisor -- )
>r <linked> r> send ;
: spawn-linked-to ( quot name mailbox -- thread )
[ >r <linked> r> mailbox-put ] curry <thread>
[ (spawn) ] keep ;
: spawn-linked ( quot name -- thread )
self [ rethrow-linked ] curry <thread> [ (spawn) ] keep ;
mailbox spawn-linked-to ;
TUPLE: synchronous data sender tag ;
@ -124,3 +121,21 @@ TUPLE: reply data tag ;
: reply-synchronous ( message synchronous -- )
[ <reply> ] keep synchronous-sender send ;
<PRIVATE
: remote-processes ( -- hash )
\ remote-processes get-global ;
PRIVATE>
: register-process ( name process -- )
swap remote-processes set-at ;
: unregister-process ( name -- )
remote-processes delete-at ;
: get-process ( name -- process )
dup remote-processes at [ ] [ thread ] ?if ;
\ remote-processes global [ H{ } assoc-like ] change-at

View File

@ -1,5 +1,7 @@
! Copyright (C) 2005, 2008 Chris Double, Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: concurrency.messaging concurrency.messaging.private
kernel ;
IN: concurrency.promises
TUPLE: promise mailbox ;
@ -18,7 +20,8 @@ TUPLE: promise mailbox ;
] if ;
: ?promise-timeout ( promise timeout -- result )
>r promise-mailbox r> block-if-empty mailbox-peek ;
>r promise-mailbox r> block-if-empty
mailbox-peek ?linked ;
: ?promise ( promise -- result )
f ?promise-timeout ;

View File

@ -1,3 +1,6 @@
! Copyright (C) 2008 Slava Pestov.
! See http://factorcode.org/license.txt for BSD license.
USING: dlists kernel threads math ;
IN: concurrency.semaphores
TUPLE: semaphore count threads ;

View File

@ -3,7 +3,7 @@
USING: io io.sockets io.files logging continuations kernel
math math.parser namespaces parser sequences strings
prettyprint debugger quotations calendar
threads concurrency.futures ;
threads concurrency.combinators ;
IN: io.server

View File

@ -1,6 +1,6 @@
USING: sequences rss arrays concurrency.futures kernel sorting
html.elements io assocs namespaces math threads
vocabs html furnace http.server.templating calendar math.parser
USING: sequences rss arrays concurrency.combinators kernel
sorting html.elements io assocs namespaces math threads vocabs
html furnace http.server.templating calendar math.parser
splitting continuations debugger system http.server.responders
xml.writer prettyprint logging ;
IN: webapps.planet