Hi,
I’m using trio in guest mode. The computationally intensive parts of my program are not event-driven; the event loop is mainly used for asynchronous events, which are handled by calling loop.process_work() regularly from my heavy computations.
Sometimes the event loop is used for non-IO events, e.g. when it is nice to express some part of my computation as a coroutine.
Looking at the API docs, it seems that trio would be a good match for this use case, but there seem to be some race conditions that make the behavior non-deterministic. In particular, if I unblock a trio task and then call loop.process_work(), the task sometimes does not advance as expected.
Here is a stripped-down toy example:
```python
from queue import SimpleQueue, Empty
import time
import trio

class Loop:
    def __init__(self):
        self.q = SimpleQueue()

    def run_sync_soon_threadsafe(self, fn):
        self.q.put(fn)

    def process_work(self):
        while True:
            time.sleep(0.000001)
            try:
                f = self.q.get_nowait()
            except Empty:
                runner = trio._core._run.GLOBAL_RUN_CONTEXT.runner
                print('q', runner.runq)
                return
            else:
                f()

loop = Loop()
ls = []

async def work():
    ls.append(1)

ev = trio.Event()
async def worker():
    global ev
    while 1:
        await ev.wait()
        await work()
        ev = trio.Event()

trio.lowlevel.start_guest_run(
    worker,
    run_sync_soon_threadsafe=loop.run_sync_soon_threadsafe,
    done_callback=lambda _: exit(1))

for i in range(500):
    print('iteration', i)
    ls = []
    # unblock worker
    ev.set()
    assert ls == []
    # I expect this to finish the next iteration of worker()'s loop
    loop.process_work()
    assert ls != []
```
The assertion on the last line fails ~2% of the time (~70% of the time if you remove the time.sleep(0.000001)). Output is something like:
```
iteration 0
q deque([])
iteration 1
q deque([])
iteration 2
q deque([])
iteration 3
q deque([])
iteration 4
q deque([])
iteration 5
q deque([])
iteration 6
q deque([])
iteration 7
q deque([])
iteration 8
q deque([])
iteration 9
q deque([])
iteration 10
q deque([<Task '__main__.worker' at 0x7fe159775ee0>])
Traceback (most recent call last):
  File "test_trio.py", line 51, in <module>
    assert ls != []
AssertionError
```
Some tracing suggests that this comes from a race in _run.GuestState: self.run_sync_soon_threadsafe(in_main_thread) causes in_main_thread() to be called, but sometimes too late (after ).
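The ordering that the tracing points to can be reproduced without trio at all. The sketch below is a toy model under stated assumptions, not trio's code: it assumes that unblocking the task wakes a separate I/O thread, and that only that thread later posts the callback (standing in for in_main_thread()) through run_sync_soon_threadsafe. The helper late_delivery_demo and the events wakeup and allow_delivery are hypothetical; allow_delivery exists only to force the "too late" ordering every time instead of ~2% of the time.

```python
import queue
import threading


class Loop:
    """Same shape as the Loop in the example above, minus the trio-specific debug print."""

    def __init__(self):
        self.q = queue.SimpleQueue()

    def run_sync_soon_threadsafe(self, fn):
        self.q.put(fn)

    def process_work(self):
        # Run whatever is queued right now; return as soon as the queue is empty.
        while True:
            try:
                f = self.q.get_nowait()
            except queue.Empty:
                return
            f()


def late_delivery_demo():
    """Toy model (hypothetical): returns (ran_during_process_work, callback_left_in_queue)."""
    loop = Loop()
    wakeup = threading.Event()          # stands in for waking the I/O thread
    allow_delivery = threading.Event()  # forces the I/O thread to post "too late"
    ran = []

    def in_main_thread():               # stands in for trio's in_main_thread()
        ran.append(True)

    def io_thread():
        wakeup.wait()
        allow_delivery.wait()
        loop.run_sync_soon_threadsafe(in_main_thread)

    t = threading.Thread(target=io_thread)
    t.start()
    wakeup.set()           # like ev.set(): the task is runnable and the I/O thread is poked
    loop.process_work()    # the queue is still empty, so this returns immediately
    ran_during = bool(ran)
    allow_delivery.set()   # now the callback arrives, after process_work() has returned
    t.join()
    return ran_during, not loop.q.empty()


print(late_delivery_demo())  # (False, True)
```

In this model the callback is not lost; it simply sits in the queue until the next process_work() call, which matches the symptom of the worker advancing one iteration late.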
I did find a workaround: it seems that this only happens when runq is nonempty, so I can busy-wait for the IO thread to empty it, like:
```python
def process_work(self):
    while True:
        try:
            f = self.q.get_nowait()
        except Empty:
            runner = trio._core._run.GLOBAL_RUN_CONTEXT.runner
            if runner.runq:
                # runq is nonempty: busy-wait until it has been emptied
                time.sleep(0.000001)
                continue
            return
        else:
            f()
```
This workaround seems to work (no observed errors in a few million iterations), but I don’t understand it fully. Is it safe? Is there a better way to do it? Should trio in principle be extended to cover this use case better?
Also, it would be interesting to hear why it is needed in the first place. It seems that guest_tick says hello to the IO thread, and that this sometimes causes events from the guest thread to somehow jump to the IO thread and back again. Why?
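If the busy-wait is kept, one possible refinement (a sketch, not a trio API) is to pass the "work is still pending" check in as a predicate and cap the wait with a deadline, so that process_work() cannot spin forever if the expected callback never arrives. The pending and timeout parameters are hypothetical additions, and the 1-second default is arbitrary. In the setup above, pending would be something like lambda: bool(trio._core._run.GLOBAL_RUN_CONTEXT.runner.runq), which relies on the same private trio internals as the workaround.

```python
import queue
import threading
import time


class Loop:
    def __init__(self):
        self.q = queue.SimpleQueue()

    def run_sync_soon_threadsafe(self, fn):
        self.q.put(fn)

    def process_work(self, pending=lambda: False, timeout=1.0):
        """Run queued callbacks; while pending() is true, poll instead of returning.

        Returns True if it stopped because nothing was pending, False on timeout.
        """
        deadline = time.monotonic() + timeout
        while True:
            try:
                f = self.q.get_nowait()
            except queue.Empty:
                if not pending():
                    return True
                if time.monotonic() >= deadline:
                    return False  # give up instead of spinning forever
                time.sleep(1e-6)  # let the other thread post its callback
                continue
            f()


# Usage with a fake "I/O thread" (a timer) that posts its callback 10 ms late.
loop = Loop()
state = {"pending": True}

def callback():
    state["pending"] = False

threading.Timer(0.01, loop.run_sync_soon_threadsafe, args=(callback,)).start()
print(loop.process_work(pending=lambda: state["pending"]))  # True
```

Returning a flag rather than raising leaves it to the caller to decide whether a timeout indicates a bug or just a slow I/O thread.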