Problems when synchronously advancing a host event loop

Hi,

I’m using trio in guest mode. The computationally intensive parts of my program are not event driven; the event loop is mainly used for asynchronous events, which are handled by calling loop.process_work() regularly from my heavy computations.

Sometimes the event loop is used for non-IO events, e.g. when it is nice to express some part of my computation as a coroutine.

Looking at the API docs, it seems that trio would be a good match for this use case, but there seem to be some race conditions that make behavior non-deterministic. In particular, if I unblock a trio task and call loop.process_work(), it sometimes happens that the task does not advance as expected.

Here is a stripped-down toy example:

from queue import SimpleQueue, Empty
import time
import trio

class Loop:
    def __init__(self):
        self.q = SimpleQueue()

    def run_sync_soon_threadsafe(self, fn):
        self.q.put(fn)

    def process_work(self):
        # Run queued callbacks until the queue is empty.
        while True:
            time.sleep(0.000001)
            try:
                f = self.q.get_nowait()
            except Empty:
                # Peek at trio's internals to show the run queue on exit.
                runner = trio._core._run.GLOBAL_RUN_CONTEXT.runner
                print('q', runner.runq)
                return
            else:
                f()

loop = Loop()

ls = []

async def work():
    ls.append(1)

ev = trio.Event()

async def worker():
    global ev
    while True:
        await ev.wait()    # block until the host sets the event
        await work()
        ev = trio.Event()  # re-arm with a fresh event

trio.lowlevel.start_guest_run(
    worker,
    run_sync_soon_threadsafe=loop.run_sync_soon_threadsafe,
    done_callback=lambda _: exit(1))

for i in range(500):
    print('iteration', i)
    ls = []
    # unblock worker
    ev.set()
    assert ls == []
    # I expect this to finish the next iteration of worker()'s loop
    loop.process_work()
    assert ls != []

The assertion on the last line fails ~2% of the time (~70% of the time if you remove the time.sleep(0.000001)). Output is something like:

iteration 0
q deque([])
iteration 1
q deque([])
iteration 2
q deque([])
iteration 3
q deque([])
iteration 4
q deque([])
iteration 5
q deque([])
iteration 6
q deque([])
iteration 7
q deque([])
iteration 8
q deque([])
iteration 9
q deque([])
iteration 10
q deque([<Task '__main__.worker' at 0x7fe159775ee0>])
Traceback (most recent call last):
  File "test_trio.py", line 51, in <module>
    assert ls != []
AssertionError

Some tracing suggests that this comes from a race in _run.GuestState: self.run_sync_soon_threadsafe(in_main_thread) causes in_main_thread() to be called, but sometimes too late (after process_work() has already drained the queue and returned).

I did find a workaround: it seems that this only happens when runq is nonempty, so I can busy-wait until it has been drained, like:

    def process_work(self):
        while True:
            try:
                f = self.q.get_nowait()
            except Empty:
                runner = trio._core._run.GLOBAL_RUN_CONTEXT.runner
                if runner.runq:
                    # A task is scheduled, but the guest tick that will
                    # run it hasn't come back from the IO thread yet.
                    time.sleep(0.000001)
                    continue
                return
            else:
                f()

This workaround seems to work (no observed errors in a few million iterations), but I don’t understand it fully. Is it safe? Is there a better way to do it? Should trio in principle be extended to cover this use case better?

Also, it would be interesting to hear why this is needed in the first place. It seems that guest_tick hands off to the IO thread, and that this sometimes causes events from the guest thread to somehow take a round trip through the IO thread and back. Why?

One problematic thing about my workaround is that it requires that the host event loop knows about trio; I have not yet managed to add the trio guest glue as an independent module on top of an unmodified host loop.

Interesting problem!

To hopefully make more sense of what’s happening, here’s a walkthrough of what guest mode is doing under the hood:

Basically, whenever Trio has no work to do (all its tasks are blocked), it has to go to sleep while waiting for IO or a host-triggered wakeup. In your case there is no IO to do, so all it really has to do is sleep until the host loop wakes it up. But for simplicity, we don’t bother distinguishing those two cases: Trio always issues the regular IO wait call in a worker thread, and if there’s no IO, the OS “wait for IO” primitive just ends up implementing the sleep.

So when it’s in that “wait for IO in a thread” state and the host triggers a wakeup, Trio first unwinds the IO thread before continuing – again, this is to simplify things, so that host-triggered wakeups and IO-triggered wakeups take the same path, and Trio overall remains a sequential program. (If Trio started running tasks again while the IO thread was still running, we could end up with two IO threads running at once, etc. – lots of potential headaches to think through.)

So as long as Trio is blocking in a real OS system call in a real OS thread, there’s always going to be some unavoidable nondeterminism about exactly how long it takes for Trio to react after a wakeup.
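
To make that concrete, here’s a toy model of the handoff (hypothetical names only, nothing like Trio’s real internals) showing why a wakeup always takes a round trip through a worker thread before the host queue sees the next callback:

import threading
from queue import SimpleQueue

host_queue = SimpleQueue()  # stands in for the host loop's callback queue

def run_sync_soon_threadsafe(fn):
    host_queue.put(fn)

def guest_tick():
    # Step 1: run any ready tasks, in the host (main) thread.
    print("tick: ran ready tasks")

    # Step 2: hand the blocking "wait for IO" off to a worker thread.
    def io_wait_in_thread():
        threading.Event().wait(0.01)  # stand-in for epoll/kqueue/IOCP
        # Step 3: bounce back to the host thread for the next tick.
        run_sync_soon_threadsafe(guest_tick)

    threading.Thread(target=io_wait_in_thread).start()

guest_tick()
for _ in range(3):  # the "host loop"
    # The next tick only arrives after the thread round trip, and exactly
    # how long that takes is up to the OS scheduler.
    host_queue.get()()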

I think the reason your workaround works is because when you wake up a Trio task from your host loop, by calling Event.set or whatever, then the task gets scheduled by placing it on the runq, and it doesn’t get removed again until Trio has come back from the IO thread and run its scheduler. So it should be pretty reliable? It’s pretty awkward and likely to break in the future though, since it’s relying so heavily on complicated internal details.

I think what I’d recommend to try next is using the Instrument API. This gives you a fully-supported way to keep track of what the Trio event loop is doing. In particular, I guess the main operation you want is “wait for Trio to quiesce”? I.e., keep pumping Trio until it’s fully processed all previous events? (Which might take multiple Trio loop iterations, in case the Trio task that you wake up then wakes up another task, etc.)

To implement that, you could hook the before_io_wait(timeout) instrumentation event. This is slightly misnamed – it really gets called every time Trio checks for IO, whether or not it’s going to wait for it. Since you aren’t really doing IO or using timeouts, the timeout parameter should either be 0 (if Trio has more work to do, and is just doing a quick check to make sure no IO has come in already), or else math.inf. And if it’s math.inf, that means that Trio has fully quiesced and won’t be doing anything until some external event wakes it up. So something like:

import math

class MyInstrument(trio.abc.Instrument):
    def before_io_wait(self, timeout):
        if timeout == math.inf:
            # No runnable tasks and no pending deadlines: fully quiesced.
            notify_host_that_trio_has_quiesced()
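
To register it, you could pass it to start_guest_run, which accepts the same keyword arguments as trio.run, including instruments:

trio.lowlevel.start_guest_run(
    worker,
    run_sync_soon_threadsafe=loop.run_sync_soon_threadsafe,
    done_callback=lambda _: exit(1),
    instruments=[MyInstrument()],
)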

Would that work? I can think of some other possibilities, like having a worker task that loops on await trio.testing.wait_all_tasks_blocked() and sends a similar quiesce signal, or adding a whole new “no IO” mode to Trio’s backend. But the Instrument approach seems simplest if it works.
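
For reference, that worker-task alternative could look roughly like this (a sketch; notify_host_that_trio_has_quiesced is the same hypothetical host hook as above):

import trio
import trio.testing

async def quiesce_watcher():
    while True:
        # Returns once all other tasks are blocked, i.e. Trio has finished
        # processing everything it was woken up for.
        await trio.testing.wait_all_tasks_blocked()
        notify_host_that_trio_has_quiesced()  # hypothetical host hook
        # Sleep a little so this loop doesn't spin while everything
        # stays blocked.
        await trio.sleep(0.01)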

Oh hmm, are you using the autojump clock? That might not play nicely with the Instrument trick, because then they’re both trying to detect when Trio has quiesced and might end up fighting with each other. If your larger system already has a notion of simulated time then I guess you could inject that somehow using trio.MockClock directly and manually stepping it from the host.
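
A rough, untested sketch of that, reusing the Loop and worker from the repro (start_guest_run accepts the same clock argument as trio.run, and MockClock’s default rate=0 means time only moves when you jump it):

import trio

clock = trio.MockClock()  # virtual time, frozen until we move it

trio.lowlevel.start_guest_run(
    worker,
    clock=clock,
    run_sync_soon_threadsafe=loop.run_sync_soon_threadsafe,
    done_callback=lambda _: exit(1),
)

# Somewhere inside a Trio task, capture a token so the host can wake Trio:
#     token = trio.lowlevel.current_trio_token()

# Then, from the host, whenever simulated time advances by dt seconds:
clock.jump(dt)                     # move Trio's virtual clock forward
token.run_sync_soon(lambda: None)  # wake Trio so it re-checks deadlines
loop.process_work()                # run any newly-expired timeouts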

(Note for future browsers: there’s also some discussion in chat here: python-trio/general - Gitter)

The Instrument trick does indeed seem to solve my problems: with the following implementation of Loop.process_work:

    def process_work(self):
        while True:
            try:
                f = self.q.get_nowait()
            except Empty:
                return
            f()

I can do something like this:

from dataclasses import dataclass

@dataclass(eq=False)
class IOThreadState(trio.abc.Instrument):
    quiesced: bool = True

    def before_io_wait(self, wait):
        if wait > 0:
            # Trio is about to block with no runnable tasks left.
            self.quiesced = True

    def task_scheduled(self, *args):
        self.quiesced = False

trio.lowlevel.start_guest_run(
    worker,
    run_sync_soon_threadsafe=loop.run_sync_soon_threadsafe,
    done_callback=lambda _: exit(1))

io_state = IOThreadState()
trio.lowlevel.add_instrument(io_state)

def process_work():
    while True:
        loop.process_work()
        if io_state.quiesced:
            # Drain any callbacks queued since the last pass, then return.
            loop.process_work()
            return
        time.sleep(0.000001)

This top-level process_work can be called to run all coroutines until they have quiesced.
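
For example, the driver loop from the original repro becomes:

for i in range(500):
    ls = []
    ev.set()          # unblock worker
    process_work()    # pump Trio until io_state.quiesced
    assert ls != []   # worker() has completed another iteration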

I could not use a blocking q.get() in my real code, because the only Python-visible operations are process_work and run_sync_soon_threadsafe. But I’m OK with busy-waiting.

In my real use case, I still need to modify the original event loop somehow, in order to correctly handle code that calls loop.process_work instead of the new top-level process_work. I think this is (inelegantly) solvable.