New alarm system

db4
Slava Pestov 2008-02-21 19:12:37 -06:00
parent 748c2b4b33
commit 4cb14acff4
6 changed files with 112 additions and 91 deletions

View File

@ -18,7 +18,7 @@ $nl
$nl
"Queries:"
{ $subsection heap-empty? }
{ $subsection heap-length }
{ $subsection heap-size }
{ $subsection heap-peek }
"Insertion:"
{ $subsection heap-push }
@ -40,43 +40,43 @@ HELP: <max-heap>
{ $description "Create a new " { $link max-heap } "." } ;
HELP: heap-push
{ $values { "key" "a comparable object" } { "value" object } { "heap" heap } }
{ $values { "key" "a comparable object" } { "value" object } { "heap" "a heap" } }
{ $description "Push a pair onto a heap. The key must be comparable with all other keys by the " { $link <=> } " generic word." }
{ $side-effects "heap" } ;
HELP: heap-push*
{ $values { "key" "a comparable object" } { "value" object } { "heap" heap } { "entry" entry } }
{ $values { "key" "a comparable object" } { "value" object } { "heap" "a heap" } { "entry" entry } }
{ $description "Push a pair onto a heap, and output an entry which may later be passed to " { $link heap-delete } "." }
{ $side-effects "heap" } ;
HELP: heap-push-all
{ $values { "assoc" assoc } { "heap" heap } }
{ $values { "assoc" assoc } { "heap" "a heap" } }
{ $description "Push every key/value pair of an assoc onto a heap." }
{ $side-effects "heap" } ;
HELP: heap-peek
{ $values { "heap" heap } { "key" object } { "value" object } }
{ $values { "heap" "a heap" } { "key" object } { "value" object } }
{ $description "Output the first element in the heap, leaving it in the heap." } ;
HELP: heap-pop*
{ $values { "heap" heap } }
{ $values { "heap" "a heap" } }
{ $description "Remove the first element from the heap." }
{ $side-effects "heap" } ;
HELP: heap-pop
{ $values { "heap" heap } { "key" object } { "value" object } }
{ $values { "heap" "a heap" } { "key" object } { "value" object } }
{ $description "Output and remove the first element in the heap." }
{ $side-effects "heap" } ;
HELP: heap-empty?
{ $values { "heap" heap } { "?" "a boolean" } }
{ $description "Tests if a " { $link heap } " has no nodes." } ;
{ $values { "heap" "a heap" } { "?" "a boolean" } }
{ $description "Tests if a heap has no nodes." } ;
HELP: heap-size
{ $values { "heap" heap } { "n" integer } }
{ $values { "heap" "a heap" } { "n" integer } }
{ $description "Returns the number of key/value pairs in the heap." } ;
HELP: heap-delete
{ $values { "heap" heap } { "key" object } { "value" object } }
{ $values { "heap" "a heap" } { "key" object } { "value" object } }
{ $description "Output and remove the first element in the heap." }
{ $side-effects "heap" } ;

View File

@ -11,7 +11,7 @@ GENERIC: heap-push* ( value key heap -- entry )
GENERIC: heap-peek ( heap -- value key )
GENERIC: heap-pop* ( heap -- )
GENERIC: heap-pop ( heap -- value key )
GENERIC: heap-delete ( entry -- )
GENERIC: heap-delete ( entry heap -- )
GENERIC: heap-empty? ( heap -- ? )
GENERIC: heap-size ( heap -- n )

View File

@ -17,7 +17,11 @@ ARTICLE: "threads-start/stop" "Starting and stopping threads"
ARTICLE: "threads-yield" "Yielding and suspending threads"
"Yielding to other threads:"
{ $subsection yield }
"Sleeping for a period of time:"
{ $subsection sleep }
"Interruptible sleep:"
{ $subsection nap }
{ $subsection interrupt }
"Threads can be suspended and woken up at some point in the future when a condition is satisfied:"
{ $subsection suspend }
{ $subsection resume }
@ -104,7 +108,16 @@ HELP: yield
HELP: sleep
{ $values { "ms" "a non-negative integer" } }
{ $description "Suspends the current thread for " { $snippet "ms" } " milliseconds. It will not get woken up before this time period elapses, but since the multitasker is co-operative, the precise wakeup time is dependent on when other threads yield." } ;
{ $description "Suspends the current thread for " { $snippet "ms" } " milliseconds." }
{ $errors "Throws an error if another thread interrupted the sleep with " { $link interrupt } "." } ;
HELP: nap
{ $values { "ms/f" "a non-negative integer or " { $link f } } { "?" "a boolean indicating whether the thread was interrupted" } }
{ $description "Suspends the current thread until another thread interrupts it with " { $link interrupt } ". If the input parameter is not " { $link f } ", then the thread will also wake up if the timeout expires before an interrupt is received." } ;
HELP: interrupt
{ $values { "thread" thread } }
{ $description "Interrupts a sleeping thread." } ;
HELP: suspend
{ $values { "quot" "a quotation with stack effect " { $snippet "( thread -- )" } } { "obj" object } }

View File

@ -13,7 +13,7 @@ TUPLE: thread
name quot error-handler exit-handler
id
continuation state
mailbox variables ;
mailbox variables sleep-entry ;
: self ( -- thread ) 40 getenv ; inline
@ -86,19 +86,25 @@ PRIVATE>
<PRIVATE
: schedule-sleep ( thread ms -- )
>r check-registered r> sleep-queue heap-push ;
>r check-registered dup r> sleep-queue heap-push*
swap set-thread-sleep-entry ;
: wake-up? ( heap -- ? )
: expire-sleep? ( heap -- ? )
dup heap-empty?
[ drop f ] [ heap-peek nip millis <= ] if ;
: wake-up ( -- )
: expire-sleep ( thread -- )
f over set-thread-sleep-entry resume ;
: expire-sleep-loop ( -- )
sleep-queue
[ dup wake-up? ] [ dup heap-pop drop resume ] [ ] while
[ dup expire-sleep? ]
[ dup heap-pop drop expire-sleep ]
[ ] while
drop ;
: next ( -- )
wake-up
expire-sleep-loop
run-queue pop-back
dup array? [ first2 ] [ f swap ] if dup set-self
f over set-thread-state
@ -127,14 +133,23 @@ PRIVATE>
: yield ( -- ) [ resume ] "yield" suspend drop ;
: nap ( ms/f -- ? )
[
>fixnum millis + [ schedule-sleep ] curry "sleep"
] [
[ drop ] "interrupt"
] if* suspend ;
: sleep ( ms -- )
>fixnum millis +
[ schedule-sleep ] curry
"sleep" suspend drop ;
nap [ "Sleep interrupted" throw ] when ;
: interrupt ( thread -- )
dup thread-sleep-entry [ sleep-queue heap-delete ] when*
t swap resume-with ;
: (spawn) ( thread -- )
[
resume [
resume-now [
dup set-self
dup register-thread
init-namespaces

View File

@ -1,87 +1,80 @@
! Copyright (C) 2007 Doug Coleman.
! Copyright (C) 2005, 2008 Slava Pestov, Doug Coleman.
! See http://factorcode.org/license.txt for BSD license.
USING: arrays calendar combinators concurrency.messaging
threads generic init kernel math namespaces sequences ;
USING: arrays calendar combinators generic init kernel math
namespaces sequences heaps boxes threads debugger quotations ;
IN: alarms
TUPLE: alarm time quot ;
TUPLE: alarm time interval quot entry ;
C: <alarm> alarm
: check-alarm
pick timestamp? [ "Not a timestamp" throw ] unless
over dup dt? swap not or [ "Not a dt" throw ] unless
dup callable? [ "Not a quotation" throw ] unless ; inline
<PRIVATE
: <alarm> ( time delay quot -- alarm )
check-alarm <box> alarm construct-boa ;
! for now a V{ }, eventually a min-heap to store alarms
! Global min-heap
SYMBOL: alarms
SYMBOL: alarm-receiver
SYMBOL: alarm-looper
SYMBOL: alarm-thread
: add-alarm ( alarm -- )
alarms get-global push ;
: notify-alarm-thread ( -- )
alarm-thread get-global interrupt ;
: remove-alarm ( alarm -- )
alarms get-global delete ;
: add-alarm ( time delay quot -- alarm )
<alarm> [
dup dup alarm-time alarms get-global heap-push*
swap alarm-entry >box
notify-alarm-thread
] keep ;
: handle-alarm ( alarm -- )
dup delegate {
{ "register" [ add-alarm ] }
{ "unregister" [ remove-alarm ] }
} case ;
: cancel-alarm ( alarm -- )
alarm-entry box> alarms get-global heap-delete ;
: expired-alarms ( -- seq )
now alarms get-global
[ alarm-time <=> 0 > ] with subset ;
: alarm-expired? ( alarm now -- ? )
>r alarm-time r> <=> 0 <= ;
: unexpired-alarms ( -- seq )
now alarms get-global
[ alarm-time <=> 0 <= ] with subset ;
: reschedule-alarm ( alarm -- )
dup alarm-time over alarm-interval +dt
over set-alarm-time
add-alarm drop ;
: call-alarm ( alarm -- )
alarm-quot "Alarm invocation" spawn drop ;
dup alarm-quot try
dup alarm-entry box> drop
dup alarm-interval [ reschedule-alarm ] [ drop ] if ;
: do-alarms ( -- )
expired-alarms [ call-alarm ] each
unexpired-alarms alarms set-global ;
: (trigger-alarms) ( alarms now -- )
over heap-empty? [
2drop
] [
over heap-peek drop over alarm-expired? [
over heap-pop drop call-alarm
(trigger-alarms)
] [
2drop
] if
] if ;
: alarm-receive-loop ( -- )
receive dup alarm? [ handle-alarm ] [ drop ] if
alarm-receive-loop ;
: trigger-alarms ( alarms -- )
now (trigger-alarms) ;
: start-alarm-receiver ( -- )
[
alarm-receive-loop
] "Alarm receiver" spawn alarm-receiver set-global ;
: next-alarm ( alarms -- ms )
dup heap-empty?
[ drop f ] [
heap-peek drop alarm-time now
[ timestamp>unix-time ] 2apply [-] 1000 *
] if ;
: alarm-loop ( -- )
alarms get-global empty? [
do-alarms
] unless 100 sleep alarm-loop ;
: alarm-thread-loop ( -- )
alarms get-global
dup next-alarm nap drop
dup trigger-alarms
alarm-thread-loop ;
: start-alarm-looper ( -- )
[
alarm-loop
] "Alarm looper" spawn alarm-looper set-global ;
: init-alarms ( -- )
<min-heap> alarms set-global
[ alarm-thread-loop ] "Alarms" spawn
alarm-thread set-global ;
: send-alarm ( str alarm -- )
over set-delegate
alarm-receiver get-global send ;
: start-alarm-daemon ( -- )
alarms get-global [ V{ } clone alarms set-global ] unless
start-alarm-looper
start-alarm-receiver ;
[ start-alarm-daemon ] "alarms" add-init-hook
PRIVATE>
: register-alarm ( alarm -- )
"register" send-alarm ;
: unregister-alarm ( alarm -- )
"unregister" send-alarm ;
: change-alarm ( alarm-old alarm-new -- )
"register" send-alarm
"unregister" send-alarm ;
! Example:
! 5 seconds from-now [ "hi" print flush ] <alarm> register-alarm
[ init-alarms ] "alarms" add-init-hook

View File

@ -223,7 +223,7 @@ M: timestamp <=> ( ts1 ts2 -- n )
[ >time< >r >r 3600 * r> 60 * r> + + ] 2apply - + ;
: unix-1970 ( -- timestamp )
1970 1 1 0 0 0 0 <timestamp> ;
1970 1 1 0 0 0 0 <timestamp> ; foldable
: unix-time>timestamp ( n -- timestamp )
>r unix-1970 r> seconds +dt ;