From 437428714e39c98ed9e004a788bf16cab81096b3 Mon Sep 17 00:00:00 2001 From: Doug Coleman <doug.coleman@gmail.com> Date: Wed, 31 Oct 2007 16:42:26 -0500 Subject: [PATCH] Add timestamps to concurrency but do not actually time threads out yet Optimize ID generation for concurrency --- extra/concurrency/concurrency.factor | 83 ++++++++++++++++++---------- 1 file changed, 54 insertions(+), 29 deletions(-) diff --git a/extra/concurrency/concurrency.factor b/extra/concurrency/concurrency.factor index 94bda9e720..b59f758ad8 100644 --- a/extra/concurrency/concurrency.factor +++ b/extra/concurrency/concurrency.factor @@ -5,11 +5,20 @@ ! concurrency. USING: vectors dlists threads sequences continuations namespaces random math quotations words kernel match - arrays io assocs init ; + arrays io assocs init shuffle system ; IN: concurrency TUPLE: mailbox threads data ; +TUPLE: thread timeout continuation continued? ; + +: <thread> ( timeout continuation -- obj ) + >r dup [ millis + ] when r> + { + set-thread-timeout + set-thread-continuation + } thread construct ; + : make-mailbox ( -- mailbox ) V{ } clone <dlist> mailbox construct-boa ; @@ -18,34 +27,44 @@ TUPLE: mailbox threads data ; : mailbox-put ( obj mailbox -- ) [ mailbox-data dlist-push-end ] keep - [ mailbox-threads ] keep 0 <vector> swap set-mailbox-threads - [ schedule-thread ] each yield ; + [ mailbox-threads ] keep + V{ } clone swap set-mailbox-threads + [ thread-continuation schedule-thread ] each yield ; <PRIVATE -: (mailbox-block-unless-pred) ( pred mailbox -- ) - 2dup mailbox-data dlist-contains? [ - 2drop +: (mailbox-block-unless-pred) ( pred mailbox timeout -- ) + 2over mailbox-data dlist-contains? [ + 3drop ] [ - [ swap mailbox-threads push stop ] callcc0 + [ <thread> swap mailbox-threads push stop ] callcc0 (mailbox-block-unless-pred) ] if ; inline -: (mailbox-block-if-empty) ( mailbox -- mailbox2 ) - dup mailbox-empty? [ - [ swap mailbox-threads push stop ] callcc0 +: (mailbox-block-if-empty) ( mailbox timeout -- mailbox2 ) + over mailbox-empty? [ + [ <thread> swap mailbox-threads push stop ] callcc0 + "(mailbox-block-if-empty)" print flush (mailbox-block-if-empty) - ] when ; + ] [ + drop + ] if ; PRIVATE> -: mailbox-get ( mailbox -- obj ) +: mailbox-get* ( mailbox timeout -- obj ) (mailbox-block-if-empty) mailbox-data dlist-pop-front ; -: mailbox-get-all ( mailbox -- array ) +: mailbox-get ( mailbox -- obj ) + f mailbox-get* ; + +: mailbox-get-all* ( mailbox timeout -- array ) (mailbox-block-if-empty) [ dup mailbox-empty? ] [ dup mailbox-data dlist-pop-front ] { } unfold ; +: mailbox-get-all ( mailbox -- array ) + f mailbox-get-all* ; + : while-mailbox-empty ( mailbox quot -- ) over mailbox-empty? [ dup >r swap slip r> while-mailbox-empty @@ -53,10 +72,12 @@ PRIVATE> 2drop ] if ; inline +: mailbox-get?* ( pred mailbox timeout -- obj ) + 2over >r >r (mailbox-block-unless-pred) r> r> + mailbox-data dlist-remove ; inline + : mailbox-get? ( pred mailbox -- obj ) - 2dup (mailbox-block-unless-pred) - mailbox-data dlist-remove ; - inline + f mailbox-get?* ; TUPLE: process links pid mailbox ; @@ -64,9 +85,7 @@ C: <process> process GENERIC: send ( message process -- ) -: random-64 ( -- id ) - #! Generate a random id to use for pids - "ID" 64 [ drop 10 random CHAR: 0 + ] map append ; +: random-pid ( -- id ) 8 big-random ; <PRIVATE : make-process ( -- process ) @@ -74,13 +93,13 @@ GENERIC: send ( message process -- ) #! similar to a thread but can send and receive messages to and #! from other processes. It may also be linked to other processes so #! that it receives a message if that process terminates. - [ ] random-64 make-mailbox <process> ; + [ ] random-pid make-mailbox <process> ; : make-linked-process ( process -- process ) #! Return a process set to run on the local node. That process is #! linked to the process on the stack. It will receive a message if #! that process terminates. - 1quotation random-64 make-mailbox <process> ; + 1quotation random-pid make-mailbox <process> ; PRIVATE> : self ( -- process ) @@ -187,7 +206,7 @@ MATCH-VARS: ?from ?tag ; <PRIVATE : tag-message ( message -- tagged-message ) #! Given a message, wrap it with the sending process and a unique tag. - >r self random-64 r> 3array ; + >r self random-pid r> 3array ; PRIVATE> : send-synchronous ( message process -- reply ) @@ -286,23 +305,29 @@ TUPLE: promise fulfilled? value processes ; [ set-promise-value ] keep [ t swap set-promise-fulfilled? ] keep [ promise-processes ] keep - 0 <vector> swap set-promise-processes - [ schedule-thread ] each yield + V{ } clone swap set-promise-processes + [ thread-continuation schedule-thread ] each yield ] if ; <PRIVATE - : (maybe-block-promise) ( promise -- promise ) + : (maybe-block-promise) ( promise timeout -- promise ) #! Block the process if the promise is unfulfilled. This is different from #! (mailbox-block-if-empty) in that when a promise is fulfilled, all threads #! need to be resumed, rather than just one. - dup promise-fulfilled? [ - [ swap promise-processes push stop ] callcc0 - ] unless ; + over promise-fulfilled? [ + drop + ] [ + [ <thread> swap promise-processes push stop ] callcc0 + drop + ] if ; PRIVATE> -: ?promise ( promise -- result ) +: ?promise* ( promise timeout -- result ) (maybe-block-promise) promise-value ; +: ?promise ( promise -- result ) + f ?promise* ; + ! ****************************** ! Experimental code below ! ******************************