Trio.sleep for some months

Hello, I’m quite new to trio and concurrent programming in general, so sorry if my question seem stupid.

I’m doing a notification system and find trio very useful to process user’s requests.
Right now to deliver a notification delayed in some time, I just use a trio.sleep(delay) in a loop, and after the timer is done, send the notification.

The question is: if the sleep time will be months, will the solution suffer? How will trio behave if connections are not frequent, but there are thousands of long tasks to do?

It trio isn’t the fit for my problem, what can you advise?

Thanks!

Hello! :wave: Welcome!

Well, you’ll have to make sure your computer has a reliable electrical supply, to keep running for a few months. Trio can’t help you with that part :slight_smile:

Otherwise, I think this should be fine! Each sleeping task costs maybe 5-10 kilobytes of memory. There aren’t any limits on how long you can sleep for. Trio uses nicely scalable data structures for handling timeouts and things… basically everything should just work. Here’s a little stress test you can try – it starts 100,000 tasks, that each sleep for 30 seconds, and then they all exit and the program exits:

import trio

async def main():
    async with trio.open_nursery() as nursery:
        for _ in range(100000):
            nursery.start_soon(trio.sleep, 30)

trio.run(main)

For me it uses a total of ~15 seconds of CPU time for starting and stopping all the tasks (so I guess Trio can start/stop ~7,000 tasks/second), but in between the process goes entirely to sleep, and the peak memory use is ~500 megabytes. If it were a real program with 100,000 tasks then memory usage would of course be higher, since each task would need to store some information about what it’s doing, besides calling sleep :-). But this gives you an idea of the baseline Trio/Python overhead.

If you have, like millions of tasks, and can’t afford a few kilobytes per task… well, Trio can’t really make the tasks any smaller; most of the space is the Python coroutine object holding the task’s stack frames, local variables, etc. But what you could do instead is to track all of your deadlines in a heapq, and then have one task that sleeps until the next deadline, kicks off the scheduled task, repeat. That way for a task scheduled in the future, you just need the memory to hold the task deadline in the heap (maybe a few hundred bytes?), instead of a whole Python stack. There are some tricky bits to making this work right, so if you do decide you need to go down this road, let us know and we can help you figure it out. And this probably won’t be an issue unless you have a lot of tasks.

One more thing: if you’re talking about scheduling events months in advance, then I’m guessing your events are probably specified in calendar / wall clock time (“UTC midnight on April 1”), not in relative time (“2592493 seconds from now”). These are mostly the same, except if someone has to reset the calendar time. For example, support your computer’s built-in clock is messed up, and it boots up thinking that it’s January 1, 1970. Then your program starts, and it calculates that the next event is in 2019, so that’s like, 49 years in the future, and it does await trio.sleep(49 years). And then, someone notices the computer’s clock is off, and fixes it. Now, internally, Trio always uses relative times, so if you tell it to sleep for 49 years, it will try to sleep for 49 years – it won’t recalculate that when someone fixes the clock. If you do want to adjust your wakeup time when someone adjusts the clock… then that’s possible, but it requires some fiddly platform-specific code, and no-one’s implemented it yet (see issue #173). Again, this is something you can probably get away with ignoring. But if you want to worry about it at some point, we can help you figure out how to implement this too.

2 Likes

If you plan to sleep for months based on user requests, you’ll need to be careful with daylight saving time. Say the current time is midnight on March 1 and your program wants to wake up on April 1st. You would naively use this code:

>>> from datetime import datetime
>>> datetime(2019, 4, 1, 0, 0, 0, 0) - datetime(2019, 3, 1, 0, 0, 0, 0)
datetime.timedelta(31)

But if the timezone is Europe/Paris, you don’t want to sleep 31 days, but 30 days and 23 hours instead!

>>> import pytz
>>> tz = pytz.timezone("Europe/Paris")
>>> tz.localize(datetime(2019, 4, 1, 0, 0, 0, 0)) - tz.localize(datetime(2019, 3, 1, 0, 0, 0, 0))
datetime.timedelta(30, 82800)

To avoid those headaches, it’s better to always work with UTC times if possible, which is why @njs mentioned “UTC midnight on April 1” in his example. :slight_smile:

(Maybe this specific issue is not a problem in your case, but I thought I would mention it because it bit me in the past.)

2 Likes

Thanks a lot for a detailed answer!
I’ve run your example, and got a similar results, except that starting took 5 seconds. I’ll write in a simple way, and then if some problems occur will consider changing approach:)
The heqpq solution seems good, but what if while sleeping for next event in queue a new task was added to it by a client request handler task? Probably will need to check the queue every n seconds.

I will get in touch in case of worries, thank you again so much!

Yeah, for me too – the ~15 seconds I mentioned was time to start + time to clean up at the end when all the tasks wake up and exit.

Yeah, that’s the “tricky bit” I mentioned :-). Waking up every n seconds to check the queue would work. It’s also possible to avoid it, and implementing that is an interesting little puzzle :-).

Here’s one way I thought of – when a new event is scheduled, we send it to the scheduler task through a memory channel, and the scheduler task alternates between running events whose time has come, and waiting for new jobs to be sent to it on the channel – and (this is the tricky part) whenever it waits for a new job to be sent, it sets a timeout based on when the next event should happen. Complete code below; for the core algorithm see the “Main loop” comment:

import trio
import attr
import heapq
import math
from functools import partial

# Start up a scheduled task runner:
#   scheduler = await nursery.start(scheduled_task_runner)
#
# Schedule jobs to execute:
#   await scheduler.schedule_at(trio_time, async_fn)
#   await scheduler.schedule_after(seconds, async_fn)
#
# Note: as currently written, the scheduler doesn't take any special measures
#   to handle exceptions. If your job raises an exception, then it will
#   propagate out of the scheduler and cancel running and scheduled jobs. If
#   that's not what you want, then either make sure to catch exceptions inside
#   your submitted jobs, or else modify the scheduler to wrap jobs in some
#   kind of catch-all handler before starting them.


# Little immutable object. Sorts by scheduled time so suitable for putting in
# a heap. Uses slots to save memory.
@attr.s(frozen=True, slots=True)
class Job:
    scheduled_time: float = attr.ib()  # trio time
    async_fn = attr.ib(cmp=False)


# The object used to submit jobs to the scheduler. A thin abstraction wrapper
# around a memory channel.
@attr.s(frozen=True, cmp=False)
class Scheduler:
    _send_channel: trio.abc.SendChannel[Job] = attr.ib()

    async def schedule_at(self, scheduled_time, async_fn, *args):
        """Schedule 'async_fn(*args)' to run when the Trio clock reads
        'scheduled_time'.

        """
        await self._send_channel.send(
            Job(scheduled_time, partial(async_fn, *args))
        )

    async def schedule_after(self, seconds, async_fn, *args):
        """Schedule 'async_fn(*args)' to run after 'seconds' seconds
        have passed.
        """
        scheduled_time = trio.current_time() + seconds
        await self.schedule_at(scheduled_time, async_fn, *args)


async def scheduled_task_runner(*, task_status):
    heap = []
    async with trio.open_nursery() as nursery:
        s, r = trio.open_memory_channel(0)
        scheduler = Scheduler(s)
        task_status.started(scheduler)
        async with s, r:
            # Main loop:
            while True:
                # Process everything in the heap that's already expired
                now = trio.current_time()
                while heap and heap[0].scheduled_time <= now:
                    job = heapq.heappop(heap)
                    nursery.start_soon(job.async_fn)
                # Wait until either a new job is submitted, or else the next
                # deadline expires.
                if heap:
                    next_deadline = heap[0].scheduled_time
                else:
                    next_deadline = math.inf
                with trio.move_on_at(next_deadline):
                    new_job = await r.receive()
                    heapq.heappush(heap, new_job)
                # Loop around to process any expired jobs, and recalculate the
                # next_deadline.


async def example():
    from datetime import datetime

    def print_with_time(msg):
        print("{}: {}".format(datetime.now().isoformat(), msg))

    async def job1():
        print_with_time("Job 1 running!")

    async def job2():
        print_with_time("Job 2 running!")

    async def shutdown(cancel_scope):
        print_with_time("Shutting down")
        cancel_scope.cancel()

    async with trio.open_nursery() as nursery:
        scheduler = await nursery.start(scheduled_task_runner)
        print_with_time("Scheduling job1 to run in 5 seconds")
        await scheduler.schedule_after(5, job1)
        print_with_time("Scheduling job2 to run in 10 seconds")
        await scheduler.schedule_after(10, job2)
        print_with_time("Scheduling shutdown to run in 11 seconds")
        await scheduler.schedule_after(11, shutdown, nursery.cancel_scope)


if __name__ == "__main__":
    trio.run(example)

Sample output:

2019-03-05T19:05:40.079580: Scheduling job1 to run in 5 seconds
2019-03-05T19:05:40.079638: Scheduling job2 to run in 10 seconds
2019-03-05T19:05:40.079670: Scheduling shutdown to run in 11 seconds
2019-03-05T19:05:45.085195: Job 1 running!
2019-03-05T19:05:50.086190: Job 2 running!
2019-03-05T19:05:51.082547: Shutting down

For me, scheduling 1,000,000 jobs like this takes ~400-500 megabytes of memory, so it’s about 10x more memory-efficient than having all those jobs calling sleep individually. I guess it would be even smaller if we skipped calling partial unless it was actually needed…

There’s also another way to do it – when a new job arrives, then instead of using a memory channel to tell the scheduler about it, we could instead add it directly to the queue, and then update the scheduler’s next wakeup time. This is even trickier, though the final code is arguably simpler :slight_smile:

@attr.s
class Scheduler:
    _cancel_scope = attr.ib(factory=trio.CancelScope)
    _heap = attr.ib(factory=list)

    def schedule_at(self, scheduled_time, async_fn, *args):
        job = Job(scheduled_time, partial(async_fn, *args))
        heapq.heappush(self._heap, job)
        # If the new job is scheduled to happen before the scheduler's next
        # wakeup time, then update the wakeup time
        if scheduled_time < self._cancel_scope.deadline:
            self._cancel_scope.deadline = scheduled_time

    def schedule_after(self, seconds, async_fn, *args):
        scheduled_time = trio.current_time() + seconds
        self.schedule_at(scheduled_time, async_fn, *args)


async def scheduled_task_runner(*, task_status):
    scheduler = Scheduler()
    task_status.started(scheduler)
    async with trio.open_nursery() as nursery:
        # Main loop:
        while True:
            # Process everything in the heap that's already expired
            now = trio.current_time()
            while scheduler._heap and scheduler._heap[0].scheduled_time <= now:
                job = heapq.heappop(scheduler._heap)
                nursery.start_soon(job.async_fn)
            # Sleep until the next deadline. If the next deadline changes due
            # to new jobs being scheduled, then schedule_at will update our
            # deadline to make sure we still wake up at the right time.
            if scheduler._heap:
                next_deadline = scheduler._heap[0].scheduled_time
            else:
                next_deadline = math.inf
            scheduler._cancel_scope = trio.CancelScope(deadline=next_deadline)
            with scheduler._cancel_scope:
                await trio.sleep(math.inf)

This is used exactly the same way as the previous example, except that now schedule_at and schedule_after are sync methods instead of async methods.

1 Like

https://github.com/agronholm/apscheduler seems designed for scheduling far-future tasks, and the persistent backend lets you avoid worrying about losing data during process restarts. Last I heard, the plan was to provide anyio compatibility.