Arbitrary number of "asynchronous context managers"

Hello,

I am having trouble getting async with to work with an arbitrary (and varying) number of asynchronous context managers. contextlib once provided nested for this, but apparently it could not be made to work properly, so it was dropped. This led me to Trio… I thought Trio might be able to manage the context managers if I added them to a nursery. However, this does not seem to be the case. My original code is:

async with self._clean_targets['test']['gui']['cube'].serve(self.__stop) as cube1, \
           self._clean_targets['try_multifield_1']['gui']['cube'].serve( self.__stop ) as cube2, \
           websockets.serve( self._pipe['control'].process_messages, self._pipe['control'].address[0], self._pipe['control'].address[1] ) as ctrl, \
           websockets.serve( self._clean_targets['test']['converge']['pipe'].process_messages, self._clean_targets['test']['converge']['pipe'].address[0], self._clean_targets['test']['converge']['pipe'].address[1] ) as conv1, \
           websockets.serve( self._clean_targets['try_multifield_1']['converge']['pipe'].process_messages, self._clean_targets['try_multifield_1']['converge']['pipe'].address[0], self._clean_targets['try_multifield_1']['converge']['pipe'].address[1] ) as conv2:
    self.__result_future = asyncio.Future( )
    yield ( self.__result_future, { 'cube1': cube1, 'cube2': cube2, 'conv1': conv1, 'conv2': conv2, 'ctrl': ctrl } )

'try_multifield_1' and 'test' are just a single hard-coded use case; the actual number of instances would be determined at runtime.

I believe websockets.serve(...) returns an asynchronous context manager. The process_messages functions are async functions that respond to messages sent from the external process (a browser with a Bokeh display). I hoped that I would be able to use Trio like this:

async with trio.open_nursery( ) as ns:
    ns.start_soon( self._clean_targets['test']['gui']['cube'].serve(self.__stop) )
    ns.start_soon( self._clean_targets['try_multifield_1']['gui']['cube'].serve( self.__stop ) )
    ns.start_soon( websockets.serve( self._pipe['control'].process_messages, self._pipe['control'].address[0], self._pipe['control'].address[1] ) )
    ns.start_soon( websockets.serve( self._clean_targets['test']['converge']['pipe'].process_messages, self._clean_targets['test']['converge']['pipe'].address[0], self._clean_targets['test']['converge']['pipe'].address[1] ) )
    ns.start_soon( websockets.serve( self._clean_targets['try_multifield_1']['converge']['pipe'].process_messages, self._clean_targets['try_multifield_1']['converge']['pipe'].address[0], self._clean_targets['try_multifield_1']['converge']['pipe'].address[1] ) )
    self.__result_future = asyncio.Future( )
    await self.__result_future

Is there a way to use a Trio nursery to run and clean up these asynchronous context managers (along with the async functions)? I assume there is not, since this may be outside of Trio’s wheelhouse… Anyway, I’m definitely not an asyncio expert, so I would really appreciate any advice. Thanks in advance…

The initial problem should have been obvious… start_soon expects the function itself followed by its arguments, not the result of calling it, so the functions should be passed as:

async with trio.open_nursery( ) as ns :
    ns.start_soon( self._clean_targets['test']['gui']['cube'].serve, self.__stop )
    ns.start_soon( self._clean_targets['try_multifield_1']['gui']['cube'].serve, self.__stop )
    ns.start_soon( websockets.serve, self._pipe['control'].process_messages, self._pipe['control'].address[0], self._pipe['control'].address[1] )
    ns.start_soon( websockets.serve, self._clean_targets['test']['converge']['pipe'].process_messages, self._clean_targets['test']['converge']['pipe'].address[0], self._clean_targets['test']['converge']['pipe'].address[1] )
    ns.start_soon( websockets.serve, self._clean_targets['try_multifield_1']['converge']['pipe'].process_messages, self._clean_targets['try_multifield_1']['converge']['pipe'].address[0], self._clean_targets['try_multifield_1']['converge']['pipe'].address[1] )
    self.__result_future = asyncio.Future( )
    await self.__result_future

The next problem I ran into was that the @asynccontextmanager decorator seems to cause Trio to see the function as non-async, with errors like:

TypeError: Trio expected an async function, but 'CubeMask.serve' appears to be synchronous

Removing these decorators allows it to progress further, but this function:

async def serve( self, stop_function ):
    self._stop_serving_function = stop_function
    async with websockets.serve( self._pipe['image'].process_messages, self._pipe['image'].address[0], self._pipe['image'].address[1] ) as im, \
        websockets.serve( self._pipe['control'].process_messages, self._pipe['control'].address[0], self._pipe['control'].address[1] ) as ctrl:
       yield { 'im': im, 'ctrl': ctrl }

still results in the error:

TypeError: start_soon expected an async function but got an async generator <async_generator object CubeMask.serve at 0x15897d640>

which I do not know how to resolve… Changing the yield to a pass lets it run, but the websocket connection fails, presumably because the async with block then exits immediately and closes the servers…
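For anyone hitting the same TypeError: a function containing yield is an async generator, not a coroutine function, so a task spawner has nothing to await. One possible workaround, sketched here with stdlib asyncio only, is to keep the @asynccontextmanager decorator and wrap the resulting context manager in a plain async function that holds it open until told to stop. fake_serve, hold_open and log are hypothetical names made up for this sketch; fake_serve merely stands in for websockets.serve(...) or CubeMask.serve(...).

```python
import asyncio
from contextlib import asynccontextmanager

log = []  # records start/stop order for the demo


@asynccontextmanager
async def fake_serve(name):
    # Hypothetical stand-in for websockets.serve(...) or CubeMask.serve(...).
    log.append(f"start {name}")
    try:
        yield name
    finally:
        log.append(f"stop {name}")


async def hold_open(cm, stop):
    # A plain coroutine function: enter the context manager, then wait
    # until the stop event is set, at which point the manager is exited.
    async with cm:
        await stop.wait()


async def main(names):
    log.clear()
    stop = asyncio.Event()
    # One task per context manager; the count is decided at runtime.
    tasks = [asyncio.create_task(hold_open(fake_serve(n), stop)) for n in names]
    await asyncio.sleep(0)      # give every task a chance to enter its context
    started = list(log)
    stop.set()                  # plays the role of resolving the result future
    await asyncio.gather(*tasks)
    return started, log[len(started):]


if __name__ == "__main__":
    print(asyncio.run(main(["test", "try_multifield_1"])))
```

The same shape should carry over to a Trio nursery, e.g. ns.start_soon(hold_open, cm, stop) with a trio.Event, although websockets.serve and asyncio.Future belong to asyncio's event loop and would probably need some kind of bridge to run under Trio.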

contextlib.AsyncExitStack seems like a plausible solution… however, while this code (in the context of the broader application) exits properly when the Future is set:

async with self._clean_targets['test']                              ['gui']['cube'].serve(self.__stop) as test1, \
           self._clean_targets['try_multifield_1']                  ['gui']['cube'].serve( self.__stop ) as try_multifield_1_1, \
           websockets.serve( self._clean_targets['test']            ['converge']['pipe'].process_messages, self._clean_targets['test']['converge']['pipe'].address[0], self._clean_targets['test']['converge']['pipe'].address[1] ) as test2, \
           websockets.serve( self._clean_targets['try_multifield_1']['converge']['pipe'].process_messages, self._clean_targets['try_multifield_1']['converge']['pipe'].address[0], self._clean_targets['try_multifield_1']['converge']['pipe'].address[1] ) as try_multifield_1_2, \
           websockets.serve( self._pipe['control'].process_messages, self._pipe['control'].address[0], self._pipe['control'].address[1] ) as ctrl:
    self.__result_future = asyncio.Future( )
    yield ( self.__result_future, { 'test': [ test1, test2 ], 'try_multifield_1': [try_multifield_1_1, try_multifield_1_2], 'ctrl': ctrl } )

changing it to:

result = { 'test': [], 'try_multifield_1': [] }
async with AsyncExitStack( ) as stack:
    for img in ['test','try_multifield_1']:
        result[img].append( stack.enter_async_context( self._clean_targets[img]['gui']['cube'].serve(self.__stop) ) )
        result[img].append( stack.enter_async_context( websockets.serve( self._clean_targets[img]['converge']['pipe'].process_messages, self._clean_targets[img]['converge']['pipe'].address[0], self._clean_targets[img]['converge']['pipe'].address[1] ) ) )
    result['ctrl'] = stack.enter_async_context( websockets.serve( self._pipe['control'].process_messages, self._pipe['control'].address[0], self._pipe['control'].address[1] ) )
    self.__result_future = asyncio.Future( )
    yield ( self.__result_future, result )

causes it to fail to exit, despite the fact that the same Future await etc. happens downstream. Other options are asyncio.wait and asyncio.gather… each with different behavior with respect to exceptions… along with a bespoke implementation. I ended up going with the latter, since if there are problems down the road I will have a stable, easily tweaked implementation.
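One thing worth checking in the AsyncExitStack attempt: enter_async_context is a coroutine method, so each call needs an await. Without it, the managers are never actually entered and result ends up holding coroutine objects rather than servers. Whether that fully explains the failure to exit in the real application is not something this sketch can confirm. It uses a hypothetical fake_serve in place of the real serve calls and only shows the awaited form working with a runtime-determined list.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []  # records enter/exit order for the demo


@asynccontextmanager
async def fake_serve(name):
    # Hypothetical stand-in for websockets.serve(...) or CubeMask.serve(...).
    events.append(f"enter {name}")
    try:
        yield f"server:{name}"
    finally:
        events.append(f"exit {name}")


async def main(names):
    events.clear()
    result = {}
    async with AsyncExitStack() as stack:
        for name in names:
            # Note the await: enter_async_context returns a coroutine that
            # enters the manager and evaluates to what __aenter__ returned.
            result[name] = await stack.enter_async_context(fake_serve(name))
        events.append("body")
    # Leaving the block exits every registered manager in reverse order.
    return result, list(events)


if __name__ == "__main__":
    print(asyncio.run(main(["test", "try_multifield_1"])))
```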

With a custom context manager chain, this can be expressed as:

from ..utils import ContextMgrChain as CMC
...
async with CMC( *( [ ctx for img in self._clean_targets.keys( ) for ctx in
                     [
                         self._clean_targets[img]['gui']['cube'].serve(self.__stop),
                         websockets.serve( self._clean_targets[img]['converge']['pipe'].process_messages,
                                           self._clean_targets[img]['converge']['pipe'].address[0],
                                           self._clean_targets[img]['converge']['pipe'].address[1] )
                     ]
                   ] + [ websockets.serve( self._pipe['control'].process_messages,
                                           self._pipe['control'].address[0],
                                           self._pipe['control'].address[1] ) ]
                  ) ):
    self.__result_future = asyncio.Future( )
    yield self.__result_future
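The ContextMgrChain implementation itself is not shown in this post. For readers wondering what such a helper might look like, here is a minimal sketch under my own assumptions; it is not the actual code from ..utils. ChainSketch, fake_serve and log are hypothetical names. It enters the managers in order, exits them in reverse, and closes the already-open ones if a later one fails to start.

```python
import asyncio
from contextlib import asynccontextmanager


class ChainSketch:
    """Enter async context managers in order; exit them in reverse."""

    def __init__(self, *managers):
        self._managers = managers
        self._entered = []

    async def __aenter__(self):
        values = []
        try:
            for mgr in self._managers:
                values.append(await mgr.__aenter__())
                self._entered.append(mgr)
        except BaseException as err:
            # A later manager failed to start: close the ones already open,
            # then re-raise (simplification: suppression is ignored here).
            await self._unwind(type(err), err, err.__traceback__)
            raise
        return values

    async def __aexit__(self, exc_type, exc, tb):
        return await self._unwind(exc_type, exc, tb)

    async def _unwind(self, exc_type, exc, tb):
        raised = None       # newest exception thrown by an __aexit__ call
        suppressed = False
        while self._entered:
            mgr = self._entered.pop()
            try:
                if await mgr.__aexit__(exc_type, exc, tb):
                    # This manager swallowed the active exception.
                    suppressed, raised = True, None
                    exc_type = exc = tb = None
            except BaseException as err:
                # Outer managers now see the new exception instead.
                raised, suppressed = err, False
                exc_type, exc, tb = type(err), err, err.__traceback__
        if raised is not None:
            raise raised
        return suppressed


log = []  # records enter/exit order for the demo


@asynccontextmanager
async def fake_serve(name):
    # Hypothetical stand-in for websockets.serve(...) or CubeMask.serve(...).
    log.append(f"enter {name}")
    try:
        yield name
    finally:
        log.append(f"exit {name}")


async def main(names):
    log.clear()
    async with ChainSketch(*(fake_serve(n) for n in names)) as servers:
        log.append("body")
    return servers, list(log)


if __name__ == "__main__":
    print(asyncio.run(main(["test", "try_multifield_1"])))
```

Hand-rolling the exit logic keeps every step visible and easy to tweak, at the cost of re-implementing exception bookkeeping that contextlib.AsyncExitStack already provides.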

Sorry for the stream-of-consciousness post, but perhaps it will be useful for someone down the road…