Add timestamps to concurrency but do not actually time threads out yet

Optimize ID generation for concurrency
release
Doug Coleman 2007-10-31 16:42:26 -05:00
parent 3c28446ac3
commit 437428714e
1 changed files with 54 additions and 29 deletions

View File

@ -5,11 +5,20 @@
! concurrency. ! concurrency.
USING: vectors dlists threads sequences continuations USING: vectors dlists threads sequences continuations
namespaces random math quotations words kernel match namespaces random math quotations words kernel match
arrays io assocs init ; arrays io assocs init shuffle system ;
IN: concurrency IN: concurrency
TUPLE: mailbox threads data ; TUPLE: mailbox threads data ;
TUPLE: thread timeout continuation continued? ;
: <thread> ( timeout continuation -- obj )
>r dup [ millis + ] when r>
{
set-thread-timeout
set-thread-continuation
} thread construct ;
: make-mailbox ( -- mailbox ) : make-mailbox ( -- mailbox )
V{ } clone <dlist> mailbox construct-boa ; V{ } clone <dlist> mailbox construct-boa ;
@ -18,34 +27,44 @@ TUPLE: mailbox threads data ;
: mailbox-put ( obj mailbox -- ) : mailbox-put ( obj mailbox -- )
[ mailbox-data dlist-push-end ] keep [ mailbox-data dlist-push-end ] keep
[ mailbox-threads ] keep 0 <vector> swap set-mailbox-threads [ mailbox-threads ] keep
[ schedule-thread ] each yield ; V{ } clone swap set-mailbox-threads
[ thread-continuation schedule-thread ] each yield ;
<PRIVATE <PRIVATE
: (mailbox-block-unless-pred) ( pred mailbox -- ) : (mailbox-block-unless-pred) ( pred mailbox timeout -- )
2dup mailbox-data dlist-contains? [ 2over mailbox-data dlist-contains? [
2drop 3drop
] [ ] [
[ swap mailbox-threads push stop ] callcc0 [ <thread> swap mailbox-threads push stop ] callcc0
(mailbox-block-unless-pred) (mailbox-block-unless-pred)
] if ; inline ] if ; inline
: (mailbox-block-if-empty) ( mailbox -- mailbox2 ) : (mailbox-block-if-empty) ( mailbox timeout -- mailbox2 )
dup mailbox-empty? [ over mailbox-empty? [
[ swap mailbox-threads push stop ] callcc0 [ <thread> swap mailbox-threads push stop ] callcc0
"(mailbox-block-if-empty)" print flush
(mailbox-block-if-empty) (mailbox-block-if-empty)
] when ; ] [
drop
] if ;
PRIVATE> PRIVATE>
: mailbox-get ( mailbox -- obj ) : mailbox-get* ( mailbox timeout -- obj )
(mailbox-block-if-empty) (mailbox-block-if-empty)
mailbox-data dlist-pop-front ; mailbox-data dlist-pop-front ;
: mailbox-get-all ( mailbox -- array ) : mailbox-get ( mailbox -- obj )
f mailbox-get* ;
: mailbox-get-all* ( mailbox timeout -- array )
(mailbox-block-if-empty) (mailbox-block-if-empty)
[ dup mailbox-empty? ] [ dup mailbox-empty? ]
[ dup mailbox-data dlist-pop-front ] [ dup mailbox-data dlist-pop-front ]
{ } unfold ; { } unfold ;
: mailbox-get-all ( mailbox -- array )
f mailbox-get-all* ;
: while-mailbox-empty ( mailbox quot -- ) : while-mailbox-empty ( mailbox quot -- )
over mailbox-empty? [ over mailbox-empty? [
dup >r swap slip r> while-mailbox-empty dup >r swap slip r> while-mailbox-empty
@ -53,10 +72,12 @@ PRIVATE>
2drop 2drop
] if ; inline ] if ; inline
: mailbox-get?* ( pred mailbox timeout -- obj )
2over >r >r (mailbox-block-unless-pred) r> r>
mailbox-data dlist-remove ; inline
: mailbox-get? ( pred mailbox -- obj ) : mailbox-get? ( pred mailbox -- obj )
2dup (mailbox-block-unless-pred) f mailbox-get?* ;
mailbox-data dlist-remove ;
inline
TUPLE: process links pid mailbox ; TUPLE: process links pid mailbox ;
@ -64,9 +85,7 @@ C: <process> process
GENERIC: send ( message process -- ) GENERIC: send ( message process -- )
: random-64 ( -- id ) : random-pid ( -- id ) 8 big-random ;
#! Generate a random id to use for pids
"ID" 64 [ drop 10 random CHAR: 0 + ] map append ;
<PRIVATE <PRIVATE
: make-process ( -- process ) : make-process ( -- process )
@ -74,13 +93,13 @@ GENERIC: send ( message process -- )
#! similar to a thread but can send and receive messages to and #! similar to a thread but can send and receive messages to and
#! from other processes. It may also be linked to other processes so #! from other processes. It may also be linked to other processes so
#! that it receives a message if that process terminates. #! that it receives a message if that process terminates.
[ ] random-64 make-mailbox <process> ; [ ] random-pid make-mailbox <process> ;
: make-linked-process ( process -- process ) : make-linked-process ( process -- process )
#! Return a process set to run on the local node. That process is #! Return a process set to run on the local node. That process is
#! linked to the process on the stack. It will receive a message if #! linked to the process on the stack. It will receive a message if
#! that process terminates. #! that process terminates.
1quotation random-64 make-mailbox <process> ; 1quotation random-pid make-mailbox <process> ;
PRIVATE> PRIVATE>
: self ( -- process ) : self ( -- process )
@ -187,7 +206,7 @@ MATCH-VARS: ?from ?tag ;
<PRIVATE <PRIVATE
: tag-message ( message -- tagged-message ) : tag-message ( message -- tagged-message )
#! Given a message, wrap it with the sending process and a unique tag. #! Given a message, wrap it with the sending process and a unique tag.
>r self random-64 r> 3array ; >r self random-pid r> 3array ;
PRIVATE> PRIVATE>
: send-synchronous ( message process -- reply ) : send-synchronous ( message process -- reply )
@ -286,23 +305,29 @@ TUPLE: promise fulfilled? value processes ;
[ set-promise-value ] keep [ set-promise-value ] keep
[ t swap set-promise-fulfilled? ] keep [ t swap set-promise-fulfilled? ] keep
[ promise-processes ] keep [ promise-processes ] keep
0 <vector> swap set-promise-processes V{ } clone swap set-promise-processes
[ schedule-thread ] each yield [ thread-continuation schedule-thread ] each yield
] if ; ] if ;
<PRIVATE <PRIVATE
: (maybe-block-promise) ( promise -- promise ) : (maybe-block-promise) ( promise timeout -- promise )
#! Block the process if the promise is unfulfilled. This is different from #! Block the process if the promise is unfulfilled. This is different from
#! (mailbox-block-if-empty) in that when a promise is fulfilled, all threads #! (mailbox-block-if-empty) in that when a promise is fulfilled, all threads
#! need to be resumed, rather than just one. #! need to be resumed, rather than just one.
dup promise-fulfilled? [ over promise-fulfilled? [
[ swap promise-processes push stop ] callcc0 drop
] unless ; ] [
[ <thread> swap promise-processes push stop ] callcc0
drop
] if ;
PRIVATE> PRIVATE>
: ?promise ( promise -- result ) : ?promise* ( promise timeout -- result )
(maybe-block-promise) promise-value ; (maybe-block-promise) promise-value ;
: ?promise ( promise -- result )
f ?promise* ;
! ****************************** ! ******************************
! Experimental code below ! Experimental code below
! ****************************** ! ******************************