I just put the class below together for use in a request/response flow. A memory channel seemed overkill for waiting on a single result to be dispatched from a coroutine that reads messages from the connection. I wanted to be able to do this:
async def connect(self):
[make the connection]
hello_slot = Slot()
self.conn.message_handlers[MessageType.SERVERHELLO] = hello_slot.set
hello = Message()
hello.type = MessageType.CLIENTHELLO
hello.client_hello.clientID = self.config["machine_id"]
hello.client_hello.auth = self.config["auth_secret"]
logger.debug("Sending client hello message to server")
await self.conn.send_message(hello)
# Might raise a TooSlowError here (timeout)
server_hello = await self.hello_slot.get(self.config["hello_timeout"])
del self.conn.message_handlers[MessageType.SERVERHELLO]
[verify server hello]
So I made this little wrapper:
import trio
class Slot:
def __init__(self, value=None):
self.value = value
self.event = trio.Event()
async def set(self, value):
self.value = value
self.event.set()
async def get(self, timeout=None):
if self.event.is_set():
return self.value
if timeout is None:
await self.event.wait()
return self.value
async with trio.fail_after(timeout):
await self.event.wait()
return self.value
def get_threadsafe(self, timeout=None):
return trio.from_thread.run_sync(self.get, timeout)