diff --git a/extra/concurrency/concurrency.factor b/extra/concurrency/concurrency.factor index 94bda9e720..b59f758ad8 100644 --- a/extra/concurrency/concurrency.factor +++ b/extra/concurrency/concurrency.factor @@ -5,11 +5,20 @@ ! concurrency. USING: vectors dlists threads sequences continuations namespaces random math quotations words kernel match - arrays io assocs init ; + arrays io assocs init shuffle system ; IN: concurrency TUPLE: mailbox threads data ; +TUPLE: thread timeout continuation continued? ; + +: ( timeout continuation -- obj ) + >r dup [ millis + ] when r> + { + set-thread-timeout + set-thread-continuation + } thread construct ; + : make-mailbox ( -- mailbox ) V{ } clone mailbox construct-boa ; @@ -18,34 +27,44 @@ TUPLE: mailbox threads data ; : mailbox-put ( obj mailbox -- ) [ mailbox-data dlist-push-end ] keep - [ mailbox-threads ] keep 0 swap set-mailbox-threads - [ schedule-thread ] each yield ; + [ mailbox-threads ] keep + V{ } clone swap set-mailbox-threads + [ thread-continuation schedule-thread ] each yield ; swap mailbox-threads push stop ] callcc0 (mailbox-block-unless-pred) ] if ; inline -: (mailbox-block-if-empty) ( mailbox -- mailbox2 ) - dup mailbox-empty? [ - [ swap mailbox-threads push stop ] callcc0 +: (mailbox-block-if-empty) ( mailbox timeout -- mailbox2 ) + over mailbox-empty? [ + [ swap mailbox-threads push stop ] callcc0 + "(mailbox-block-if-empty)" print flush (mailbox-block-if-empty) - ] when ; + ] [ + drop + ] if ; PRIVATE> -: mailbox-get ( mailbox -- obj ) +: mailbox-get* ( mailbox timeout -- obj ) (mailbox-block-if-empty) mailbox-data dlist-pop-front ; -: mailbox-get-all ( mailbox -- array ) +: mailbox-get ( mailbox -- obj ) + f mailbox-get* ; + +: mailbox-get-all* ( mailbox timeout -- array ) (mailbox-block-if-empty) [ dup mailbox-empty? ] [ dup mailbox-data dlist-pop-front ] { } unfold ; +: mailbox-get-all ( mailbox -- array ) + f mailbox-get-all* ; + : while-mailbox-empty ( mailbox quot -- ) over mailbox-empty? [ dup >r swap slip r> while-mailbox-empty @@ -53,10 +72,12 @@ PRIVATE> 2drop ] if ; inline +: mailbox-get?* ( pred mailbox timeout -- obj ) + 2over >r >r (mailbox-block-unless-pred) r> r> + mailbox-data dlist-remove ; inline + : mailbox-get? ( pred mailbox -- obj ) - 2dup (mailbox-block-unless-pred) - mailbox-data dlist-remove ; - inline + f mailbox-get?* ; TUPLE: process links pid mailbox ; @@ -64,9 +85,7 @@ C: process GENERIC: send ( message process -- ) -: random-64 ( -- id ) - #! Generate a random id to use for pids - "ID" 64 [ drop 10 random CHAR: 0 + ] map append ; +: random-pid ( -- id ) 8 big-random ; ; + [ ] random-pid make-mailbox ; : make-linked-process ( process -- process ) #! Return a process set to run on the local node. That process is #! linked to the process on the stack. It will receive a message if #! that process terminates. - 1quotation random-64 make-mailbox ; + 1quotation random-pid make-mailbox ; PRIVATE> : self ( -- process ) @@ -187,7 +206,7 @@ MATCH-VARS: ?from ?tag ; r self random-64 r> 3array ; + >r self random-pid r> 3array ; PRIVATE> : send-synchronous ( message process -- reply ) @@ -286,23 +305,29 @@ TUPLE: promise fulfilled? value processes ; [ set-promise-value ] keep [ t swap set-promise-fulfilled? ] keep [ promise-processes ] keep - 0 swap set-promise-processes - [ schedule-thread ] each yield + V{ } clone swap set-promise-processes + [ thread-continuation schedule-thread ] each yield ] if ; swap promise-processes push stop ] callcc0 + drop + ] if ; PRIVATE> -: ?promise ( promise -- result ) +: ?promise* ( promise timeout -- result ) (maybe-block-promise) promise-value ; +: ?promise ( promise -- result ) + f ?promise* ; + ! ****************************** ! Experimental code below ! ******************************