Yeah, for me too – the ~15 seconds I mentioned was the time to start up plus the time to clean up at the end, when all the tasks wake up and exit.
Yeah, that’s the “tricky bit” I mentioned :-). Waking up every n seconds to check the queue would work. It’s also possible to avoid that polling, and implementing it is an interesting little puzzle :-).
Here’s one way I thought of – when a new event is scheduled, we send it to the scheduler task through a memory channel. The scheduler task alternates between running events whose time has come and waiting for new jobs to arrive on the channel – and (this is the tricky part) whenever it waits for a new job, it sets a timeout based on when the next event should happen. Complete code below; for the core algorithm, see the “Main loop” comment:
import trio
import attr
import heapq
import math
from functools import partial
# Start up a scheduled task runner:
# scheduler = await nursery.start(scheduled_task_runner)
#
# Schedule jobs to execute:
# await scheduler.schedule_at(trio_time, async_fn)
# await scheduler.schedule_after(seconds, async_fn)
#
# Note: as currently written, the scheduler doesn't take any special measures
# to handle exceptions. If your job raises an exception, then it will
# propagate out of the scheduler and cancel running and scheduled jobs. If
# that's not what you want, then either make sure to catch exceptions inside
# your submitted jobs, or else modify the scheduler to wrap jobs in some
# kind of catch-all handler before starting them.
@attr.s(slots=True, frozen=True)
class Job:
    """One unit of scheduled work: start ``async_fn`` once ``scheduled_time`` passes.

    Instances are immutable and slotted to keep per-job memory small. Only
    ``scheduled_time`` takes part in comparisons, so jobs order by when they
    are due and can be kept directly in a ``heapq`` heap.
    """

    # Due time, measured on the Trio clock (same scale as trio.current_time()).
    scheduled_time = attr.ib(type=float)
    # Async callable the runner starts with no arguments once the job is due.
    # Excluded from comparisons so ordering never looks at the callable.
    async_fn = attr.ib(cmp=False)
@attr.s(frozen=True, cmp=False)
class Scheduler:
    """Handle used to submit jobs to a running ``scheduled_task_runner``.

    A thin wrapper around the send side of a memory channel: every submitted
    job is sent over the channel to the runner task, which owns the heap of
    pending jobs.
    """

    _send_channel: trio.abc.SendChannel[Job] = attr.ib()

    async def schedule_at(self, scheduled_time, async_fn, *args):
        """Schedule 'async_fn(*args)' to run when the Trio clock reads
        'scheduled_time'.

        Waits until the send on the underlying channel completes.
        """
        # Only wrap in functools.partial when there are arguments to bind;
        # otherwise every pending job would carry an extra partial object.
        job_fn = partial(async_fn, *args) if args else async_fn
        await self._send_channel.send(Job(scheduled_time, job_fn))

    async def schedule_after(self, seconds, async_fn, *args):
        """Schedule 'async_fn(*args)' to run after 'seconds' seconds
        have passed.
        """
        scheduled_time = trio.current_time() + seconds
        await self.schedule_at(scheduled_time, async_fn, *args)
async def scheduled_task_runner(*, task_status=trio.TASK_STATUS_IGNORED):
    """Run the scheduler loop and report a :class:`Scheduler` via *task_status*.

    Intended usage: ``scheduler = await nursery.start(scheduled_task_runner)``.
    The loop never exits on its own; it runs until cancelled. Jobs are
    started as children of an internal nursery, so an exception raised by a
    job propagates out of this function and cancels the other jobs.
    """
    heap = []
    async with trio.open_nursery() as nursery:
        s, r = trio.open_memory_channel(0)
        scheduler = Scheduler(s)
        task_status.started(scheduler)
        async with s, r:
            # Main loop:
            while True:
                # Start every job in the heap whose time has already come.
                now = trio.current_time()
                while heap and heap[0].scheduled_time <= now:
                    job = heapq.heappop(heap)
                    nursery.start_soon(job.async_fn)
                # Wait until either a new job is submitted or the earliest
                # pending deadline passes, whichever happens first.
                next_deadline = heap[0].scheduled_time if heap else math.inf
                with trio.move_on_at(next_deadline):
                    new_job = await r.receive()
                    heapq.heappush(heap, new_job)
                # Loop around to start any expired jobs and recompute the
                # next deadline.
async def example():
    """Demo: schedule two jobs plus a shutdown, printing timestamped progress."""
    from datetime import datetime

    def print_with_time(msg):
        print(f"{datetime.now().isoformat()}: {msg}")

    async def job1():
        print_with_time("Job 1 running!")

    async def job2():
        print_with_time("Job 2 running!")

    async def shutdown(cancel_scope):
        print_with_time("Shutting down")
        cancel_scope.cancel()

    async with trio.open_nursery() as nursery:
        scheduler = await nursery.start(scheduled_task_runner)
        # Each entry: (announcement, delay in seconds, job, extra job args).
        plan = (
            ("Scheduling job1 to run in 5 seconds", 5, job1, ()),
            ("Scheduling job2 to run in 10 seconds", 10, job2, ()),
            (
                "Scheduling shutdown to run in 11 seconds",
                11,
                shutdown,
                (nursery.cancel_scope,),
            ),
        )
        for announcement, delay, job_fn, job_args in plan:
            print_with_time(announcement)
            await scheduler.schedule_after(delay, job_fn, *job_args)


if __name__ == "__main__":
    trio.run(example)
Sample output:
2019-03-05T19:05:40.079580: Scheduling job1 to run in 5 seconds
2019-03-05T19:05:40.079638: Scheduling job2 to run in 10 seconds
2019-03-05T19:05:40.079670: Scheduling shutdown to run in 11 seconds
2019-03-05T19:05:45.085195: Job 1 running!
2019-03-05T19:05:50.086190: Job 2 running!
2019-03-05T19:05:51.082547: Shutting down
For me, scheduling 1,000,000 jobs like this takes ~400-500 megabytes of memory, so it’s about 10x more memory-efficient than having each of those jobs call `trio.sleep` individually. I guess it would be even smaller if we skipped calling `partial` unless it was actually needed (i.e. only when there are extra arguments to bind)…
There’s also another way to do it – when a new job arrives, instead of using a memory channel to tell the scheduler about it, we could add it directly to the queue and then update the scheduler’s next wakeup time. This is even trickier, though the final code is arguably simpler:
@attr.s
class Scheduler:
    """Handle for submitting jobs to ``scheduled_task_runner`` synchronously.

    Jobs are pushed straight onto a shared heap. The runner sleeps inside
    ``_cancel_scope``; moving that scope's deadline earlier is how a newly
    submitted job makes the runner wake up in time.
    """

    # Scope the runner is currently sleeping in (the runner installs a fresh
    # one on each pass through its main loop).
    _cancel_scope = attr.ib(factory=trio.CancelScope)
    # Pending jobs, kept as a heapq heap ordered by scheduled_time.
    _heap = attr.ib(factory=list)

    def schedule_at(self, scheduled_time, async_fn, *args):
        """Schedule 'async_fn(*args)' to run when the Trio clock reads
        'scheduled_time'.
        """
        # Only wrap in functools.partial when there are arguments to bind;
        # otherwise every pending job would carry an extra partial object.
        job_fn = partial(async_fn, *args) if args else async_fn
        heapq.heappush(self._heap, Job(scheduled_time, job_fn))
        # If the new job is due before the scheduler's next wakeup time,
        # pull the wakeup earlier so the runner doesn't oversleep.
        if scheduled_time < self._cancel_scope.deadline:
            self._cancel_scope.deadline = scheduled_time

    def schedule_after(self, seconds, async_fn, *args):
        """Schedule 'async_fn(*args)' to run after 'seconds' seconds
        have passed.
        """
        scheduled_time = trio.current_time() + seconds
        self.schedule_at(scheduled_time, async_fn, *args)
async def scheduled_task_runner(*, task_status=trio.TASK_STATUS_IGNORED):
    """Run the scheduler loop and report a :class:`Scheduler` via *task_status*.

    Intended usage: ``scheduler = await nursery.start(scheduled_task_runner)``.
    The loop never exits on its own; it runs until cancelled. Jobs are
    started as children of an internal nursery, so an exception raised by a
    job propagates out of this function and cancels the other jobs.
    """
    scheduler = Scheduler()
    task_status.started(scheduler)
    async with trio.open_nursery() as nursery:
        # Main loop:
        while True:
            # Start every job in the heap whose time has already come.
            now = trio.current_time()
            while scheduler._heap and scheduler._heap[0].scheduled_time <= now:
                job = heapq.heappop(scheduler._heap)
                nursery.start_soon(job.async_fn)
            # Sleep until the next deadline. If new jobs move the next
            # deadline earlier, schedule_at updates this scope's deadline so
            # we still wake up at the right time.
            if scheduler._heap:
                next_deadline = scheduler._heap[0].scheduled_time
            else:
                next_deadline = math.inf
            # There is no checkpoint between reading the heap above and
            # entering the new scope below, so no job can slip in unnoticed.
            scheduler._cancel_scope = trio.CancelScope(deadline=next_deadline)
            with scheduler._cancel_scope:
                await trio.sleep(math.inf)
This is used exactly the same way as the previous example, except that `schedule_at` and `schedule_after` are now sync methods instead of async methods, so you call them without `await`.