SocketStream.putback...?

I wish to do protocol detection by peeking at incoming data, and still have the protocol handler (such as HTTP2 or trio.SSLStream) see that same data when it reads the stream (yes, I know, it is a bit evil). This is a one-time thing with the initially received data, and I am wondering what would be the best way to implement it. E.g. derive from SocketStream, override receive_some to inject the already received data, then possibly change type back to normal SocketStream to avoid the overhead?

I’ve noticed similar needs on the sending side, where I want to pack multiple buffers into one packet but they are sent by different handlers. I think I need stream.send_soon(buffer) that will queue the buffer to be sent on next await stream.send_all(before_this). But this is much easier worked around by passing the buffer to that other handler and actually doing await stream.send_all(buffer + before_this) there.

The simplest way would be to make a PushbackStream:

class PushbackStream(trio.abc.Stream):
    def __init__(self, transport_stream):
        self.transport_stream = transport_stream
        self._buf = b""

    def pushback(self, data):
        # Warning: this is O(n**2) if you do a lot of pushbacks
        # I'm assuming you only plan to call pushback ~once so it's fine; if not then some kind
        # of smarter datastructure would be better (maybe a deque of bytes objects?)
        self._buf = data + self._buf

    async def receive_some(self, max_bytes=None):
        if self._buf:
            if max_bytes is None:
                max_bytes = len(self._buf)
            data = self._buf[:max_bytes]
            self._buf = self._buf[max_bytes:]
            return data
        else:
            return await self.transport_stream.receive_some(max_bytes)

    async def wait_send_all_might_not_block(self):
        await self.transport_stream.wait_send_all_might_not_block()

    async def send_all(self, data):
        await self.transport_stream.send_all(data)

    async def send_eof(self):
        await self.transport_stream.send_eof()

    async def aclose(self):
        await self.transport_stream.aclose()

This would definitely make things more complicated. Have you measured the overhead? Is it significant for your application?

1 Like

Thanks! I’ll try that and see how it performs. I’m currently porting Sanic to Trio; performance is quite critical there, so I do not wish to add any overhead that might affect large file transfers. I’ll benchmark to see if that overhead matters at all (probably not).

Re appending performance: in this particular application data is a bytearray (because that is significantly faster than bytes in a receive-append loop), so data + self._buf will also become bytearray, and that should be fast enough.

You could do this with a stream wrapper too… one classic name for this functionality is “cork”, as in the TCP_CORK option. (Basically: when you cork the socket, the kernel buffers written data until you uncork it again.) I’m not a huge fan of the API, because it creates a hidden coupling between different parts of your program: the code that writes the first half of the message and corks the stream will only work correctly if it’s followed by a call to write the second half of the message and uncork the stream, but you can’t see this by looking at the calls:

# This looks like two independent function calls,
# but in fact it's a bug if you ever call one without the other.
await send_message_header(stream)
await send_message_body(stream)

Assembling a single buffer manually before calling send_all is just as efficient, and it’s much more obvious what’s going on:

msg = bytearray()
msg += create_header()
msg += create_body()
await stream.send_all(msg)

(Also now create_header and create_body don’t actually do I/O, so they’re easier to test.)

That’s just my preference though… if you want to implement corking, it’s certainly possible.

I guess sendfile is another thing that might be important for large file transfers Missing feature: sendfile · Issue #45 · python-trio/trio · GitHub

Anyway, yeah, I’d get it working and then run a profiler to see where it’s slow. It could be a PushbackStream, but I’m guessing some other stuff will show up as more critical :-).

bytearrays are pretty great: they have amortized O(1) performance for appending data to the end, deleting data from the end, and deleting data from the beginning. But here we need to append data to the beginning, and for bytearrays that’s O(n), which means that doing it repeatedly is O(n**2). (My code also removes data using buf = buf[max_bytes:], which is also O(n)… if you make sure buf is a bytearray then you can instead do del buf[:max_bytes], which is amortized O(1). But there’s no point in worrying about it if appending the data is O(n**2) anyway.)

(OK I have to ask: Why does sanic need to do protocol sniffing?)

I gather that sendfile cannot really be used nowadays, considering that everything is going TLS (unless you want to deal with in-kernel TLS), and even on top of that you have HTTP/2 framing and what-not.

The asyncio/uvloop version of Sanic doesn’t do any protocol sniffing but I thought that might be an interesting thing to play with, as I am adding HTTP/2 support now as well. First of all, during development it might be practical to run with-and-without SSL on the same (> 1024) port. Maybe more importantly, it turns out that clients initiate HTTP/2 in different ways, so I must support:

  • TLS handshake with ALPN (which gives the proper non-overridable HTTP version!)
  • No encryption or no ALPN, HTTP/1.1 request with Upgrade: h2c (by the RFC, used in curl)
  • No encryption or no ALPN, straight HTTP/2 request (nghttp)

(not sure if SSL without ALPN really needs to be supported, haven’t seen that yet)

I guess you might already know this, but just in case: you only need protocol sniffing if you want to support TLS+noTLS on the same port, or if you want to support RFC-compliant version negotiation and nghttp’s non-RFC-compliant thing on the same port.

AFAIK most people handle this by running TLS and no-TLS on different ports, and not caring about nghttp’s weird thing, in which case you don’t need any protocol sniffing. And not using TLS during development, because the cert management is a huge hassle :slight_smile:

def push_back(stream, data):
    stream_type = type(stream)

    class PushbackStream(stream_type):
        async def receive_some(self, max_bytes=None):
            if max_bytes and max_bytes < len(data):
                ret = data[:max_bytes]
                del data[:max_bytes]
                return ret
            self.__class__ = stream_type
            return data

    stream.__class__ = PushbackStream

Got the job done.

Oh wow, I have complicated feelings about this solution :slight_smile:

On the one hand, Trio’s API in general is designed to encourage composition over inheritance, and none of our classes were designed with subclassing in mind. Supporting subclassing in a library like Trio is really difficult, because it makes lots of class internals effectively part of your public API, so if you aren’t careful then you can easily break user code with simple refactorings, and it’s impossible to test things properly. We actually have an issue open to consider disabling subclassing for all Trio classes:

And dynamically generating subclasses and mutating __class__ is even more fragile and hard to understand than regular subclassing. (E.g. if someone implements a stream using C or Cython, then your code won’t work because __class__ won’t be mutable.)

On the other hand, in this specific case, it is short, effective, and in some sense optimal for the machine – just not for human readers…

Haha! Instead of subclassing, you could monkey-patch the stream object…

def push_back(stream, data):
    async def receive_some(max_bytes=None):
        if max_bytes and max_bytes < len(data):
            ret = data[:max_bytes]
            del data[:max_bytes]
            return ret
        del stream.receive_some
        return data

    stream.receive_some = receive_some

This works, too, but does not support recursion (not that it would need to), and won’t work with slots, Cython etc, or if called via type(stream).receive_some(stream, ...). I guess I’ll stick with dynamic subclassing because it just feels inherently evil and I like living dangerously!

Both implementations take ownership of the original buffer and that ought to be well documented to avoid mishaps.

re: subclassing protection – I believe that trying to protect your objects in Python is a lost cause.

The goal isn’t “protection” in the sense of making it impossible. Obviously users can go monkeypatching whatever they want. The goal would be to make sure that users don’t start accidentally depending on internal implementation details – for long-lived, widely-used projects, this makes maintenance really hard, because you eventually end up in a situation where you can’t fix bugs or refactor things internally without accidentally breaking users. (Possibly relevant context: the last two projects I spent a lot of time working on were numpy and cpython, which both have extreme versions of this problem!) So we try to make it very clear what’s public API and what isn’t. If you have to go monkeypatching some trio internals to enable subclassing, then at least you know that you’re living dangerously :slight_smile: