Sizing the channel: deadlock freedom vs back pressure

Hi! I have a question which, while not strictly about structured concurrency, feels like it should fit this forum.

Context: I am working on an “incremental compiler”, rust-analyzer, most of which is utterly boring, concurrency-wise. There are several bits of what feels like it should be fizz-buzz-level concurrency, though:

  • The interface is JSON-RPC with a single client.
  • The compiler is CPU-bound, so there’s a threadpool for doing parallel processing of requests.
  • We watch the file-system for changes.
  • Finally, sometimes we launch a cargo process and process its stdout in a streaming fashion.

When implementing those things, I’ve immediately hit a problem which felt disproportionately tricky to solve. And it seems like even the folks whose job is to design concurrency struggle with this problem (:wave:, @elizarov)!

I organize concurrency using the “worker” pattern: simplistic, hand-coded actors. Each worker, like the virtual file system, runs a loop where it accepts requests from an inbox and sends replies back:

fn run(self, inbox: Receiver<Request>, sender: Sender<Response>) {
    while let Ok(message) = inbox.recv() {
        let response = self.process(message);
        sender.send(response);
    }
}

Workers form a hierarchy (only two levels deep :slight_smile: ); when a parent exits, it makes sure to wait for its child workers (yay structured concurrency!).

And here is the problem: the parent and the worker are connected with a pair of channels, requests flow in, responses flow out. How should I size the channels to avoid deadlocks?

I thought a good rule of thumb is to make channels synchronous (unbuffered), but this immediately leads to a deadlock:

fn parent_run(inbox: Receiver<ParentRequest>) {
    let (child_sender, child_receiver) = spawn_child();

    loop {
        let event = select! {
            recv(inbox) -> it => it.ok().map(Event::ParentRequest),
            recv(child_receiver) -> it => it.ok().map(Event::ChildResponse),
        };

        match event {
            Some(Event::ParentRequest(request)) => {
                let child_request = ...;
                child_sender.send(child_request); // blocked on child
            }
            Some(Event::ChildResponse(response)) => {
                ...
            }
            None => break, // a channel closed
        }
    }
}

fn child_run(inbox: Receiver<ChildRequest>, sender: Sender<ChildResponse>) {
    while let Ok(message) = inbox.recv() {
        let response = process(message);
        sender.send(response); // blocked on parent
    }
}

The communication structure contains a cycle, and if both parties do .send without select, they deadlock each other.

The pragmatic solution I am using is to just make all the channels unbounded (note that a fixed-capacity bound is the worst option here, as you’d get deadlocked only sometimes (you might guess how I’ve figured that out)). I think this is typical for actor systems? But this obviously removes automatic backpressure handling and makes latency worse. I don’t care about those things in rust-analyzer (yet?), but I am curious if there are better solutions here? And, also, if actors use unbounded mailboxes, how do they implement backpressure?

Here are some proper (but easy-to-get-wrong) solutions that I think would work.

First, when the workers form a strict hierarchy, it should be possible to make forward-edge channels synchronous and keep only back-edges unbounded. If you add a biased select to the mix (i.e., the parent’s select should always prefer child responses over grandparent requests), you’d even have backpressure, provided that each request produces a bounded number of responses. The problem here is that you need to decide, for each channel, between a 0 and an ∞ buffer, and that seems like an easy off-by-one error.
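A minimal sketch of that biased receive, assuming crossbeam-channel (whose select! picks among ready arms without any bias, so the preference has to be hand-rolled with try_recv; the message types here are made up for illustration):

use crossbeam_channel::{select, Receiver};

// Illustrative message types.
struct ParentRequest;
struct ChildResponse;

enum Event {
    ParentRequest(ParentRequest),
    ChildResponse(ChildResponse),
}

fn next_event(
    inbox: &Receiver<ParentRequest>,
    child_receiver: &Receiver<ChildResponse>,
) -> Option<Event> {
    // Bias towards the back-edge: drain child responses before even looking
    // at new requests from the grandparent.
    if let Ok(response) = child_receiver.try_recv() {
        return Some(Event::ChildResponse(response));
    }
    // Nothing buffered: block on whichever side becomes ready first.
    select! {
        recv(inbox) -> it => it.ok().map(Event::ParentRequest),
        recv(child_receiver) -> it => it.ok().map(Event::ChildResponse),
    }
}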

Another problem is that the number of responses is not necessarily bounded, so you might still have a backpressure problem. A good example here is the VFS actor in rust-analyzer. The request to the VFS is “please, now watch this set of globs”. The responses are “this file path has changed”. If we bias towards processing responses, we might end up in a situation where the child VFS worker floods us with “this path changed” events, while the “please, now watch the empty set of globs already” request sits in the pipeline.

Second, I think you can break the cycle if there’s a select! which covers both directions. I.e., if at least one of the parent or the child never does a bare .send, and always select!s between send and recv, then the deadlock can be avoided. The first problem with this solution is that it just puts the buffer elsewhere: if you are a worker which can’t send a response yet, you have to accumulate pending work or finished responses in some local variable until the parent reads from you. The second problem is that I prefer giving workers opaque callbacks, instead of channels, for sending responses. That way I don’t depend on a particular channel library and, even if the worker uses channels internally, that stays an implementation detail. But the difference between a callback and a channel is exactly that a callback is not selectable: it can only block, it can’t signal readiness.
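To make that concrete, here is a sketch of a child that never does a bare blocking .send (again assuming crossbeam-channel; the types and process function are illustrative). Note how the buffer doesn’t disappear, it just moves from the channel into a local VecDeque:

use std::collections::VecDeque;
use crossbeam_channel::{select, Receiver, Sender};

// Illustrative message types and processing function.
struct ChildRequest;
struct ChildResponse;
fn process(_request: ChildRequest) -> ChildResponse { ChildResponse }

fn child_run(inbox: Receiver<ChildRequest>, sender: Sender<ChildResponse>) {
    // Finished responses the parent hasn't read yet.
    let mut pending: VecDeque<ChildResponse> = VecDeque::new();
    loop {
        if pending.is_empty() {
            // Nothing to send back, so a blocking recv cannot deadlock us.
            match inbox.recv() {
                Ok(request) => pending.push_back(process(request)),
                Err(_) => return, // parent hung up
            }
        } else {
            select! {
                // The message expression runs only if this arm is selected.
                send(sender, pending.pop_front().unwrap()) -> res => {
                    if res.is_err() {
                        return; // parent hung up
                    }
                },
                recv(inbox) -> it => {
                    match it {
                        Ok(request) => pending.push_back(process(request)),
                        // Inbox closed; for a sketch we just drop unsent responses.
                        Err(_) => return,
                    }
                },
            }
        }
    }
}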

In general, it seems like I want a weaker programming model: instead of a select! which works for both sends and receives, I want only a merge operation, which works only for receives. I wonder if putting senders into the select! block breaks the unidirectional dataflow pattern…

To sum this overly long post up, here is the list of specific questions I am interested in:

  • If actor systems use unbounded channels, how do they handle backpressure?
  • Is there a way of avoiding deadlocks in a hierarchy of workers which is as simple as “always use unbounded channels”, but which exerts backpressure and otherwise places limits on resource usage?
  • Is this an XY problem? Should I use something else instead of focusing on actor-ish workers?

Actor systems do traditionally use unbounded channels, and it does cause problems. Probably the Erlang folks have the most experience with deploying classic actor systems in production, so that’s who I’d look at first.

Erlang doesn’t impose any hard limit on channel buffers, but it does have a built-in mechanism for “soft” backpressure. When you send on an Erlang channel, the send basically does sleep(scaling_constant * current_size_of_channel_buffer). So if you’re sending to an empty mailbox, that completes instantly, but as the receiver gets more and more overloaded, the senders get slowed down more and more in the hopes that this will let the receiver catch up. As you might imagine, this simple heuristic helps, but it just doesn’t understand the overall system well enough to reliably eliminate unbounded memory usage, and it can cause new problems of its own.
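In pseudo-Rust, the heuristic amounts to roughly this (an approximation of the idea only, not Erlang’s actual implementation; the constant is invented, and it assumes an unbounded crossbeam-channel sender):

use std::time::Duration;
use crossbeam_channel::Sender;

// Penalize the sender in proportion to how full the receiver's mailbox is.
const PENALTY_PER_QUEUED_MESSAGE: Duration = Duration::from_micros(10);

fn soft_backpressure_send<T>(sender: &Sender<T>, message: T) {
    let backlog = sender.len() as u32;
    std::thread::sleep(PENALTY_PER_QUEUED_MESSAGE * backlog);
    // The channel itself is unbounded, so this send never blocks.
    let _ = sender.send(message);
}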

So backpressure/overload handling is still a perennial discussion topic in Erlang (this looks like a good overview of different approaches), and I don’t think anyone’s found a silver bullet.

IMO the ideal is to try to make your system work without buffering. If you can figure out an architecture that will do that, then you know you don’t have to rely on heuristics, and can set buffer sizes based purely on performance considerations without worrying it will affect correctness. But doing this depends on the details of your system, and I don’t understand rust-analyzer well enough to say how to do it :-).

One heuristic I’d suggest trying: see if you can figure out any way for data to flow through the system without using select! at all, just sequential tasks + nurseries. Basically select! is a way of writing a state machine by hand, and state machines are super low-level and hard for humans to reason about. The whole motivation for concurrency frameworks is to let you write at a higher level of abstraction with like, sequencing and composability and stuff, and then let the compiler generate the corresponding state machine. Writing state machines is like writing assembly – only appropriate in situations where your higher-level tools have failed. (Arguably, they’re exactly alike, in the sense that CPUs being state machines is what makes assembly hard to use.)

Also, remember your supervision hierarchy doesn’t have to match your data flow, and that tasks are cheap to create/destroy, so you don’t necessarily need to have long-lived tasks that handle multiple requests. I feel like there’s some intuitive thing where if we were organizing humans, we’d obviously want to have designated people to perform designated tasks and keep doing that for a while, because we can’t snap our fingers to create new perfectly-trained people, or snap our fingers to kill people after they’ve done a single task, and then when we try to reason about programs then there’s a tendency to carry over that intuition? I dunno, just speculation. But I don’t really understand why you have workers handling multiple requests at all.

I’m guessing at a high-level, rust-analyzer holds a big complex chunk of state (like the whole project’s parse trees or whatever), and then JSON-RPC requests trigger queries against this state, and file-system changes trigger updates to the state? If that’s right then I’d expect your strategy for managing concurrent readers+writers to be a major design question (do you want locking? some kind of copy-on-write/MVCC? etc.), but I don’t see why deadlocks would be a big issue. For JSON-RPC requests, you probably want a bound on how many you process at a time (using some kind of semaphore?), and then each one like takes the lock or acquires a consistent snapshot or whatever and does its thing. Overload handling is just: we don’t accept new requests if there are too many outstanding. Hopefully the client will take the hint.
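One way to get that bound without a dedicated semaphore type is to use a bounded channel as a counting semaphore. A sketch (the names and the pattern are illustrative, assuming crossbeam-channel):

use crossbeam_channel::{bounded, Receiver, Sender};

// A counting semaphore built from a bounded channel: acquiring pushes a
// token (blocking when the channel is full), and dropping the permit pops
// one back out.
struct Semaphore {
    acquire_side: Sender<()>,
    release_side: Receiver<()>,
}

struct Permit<'a> {
    release_side: &'a Receiver<()>,
}

impl Semaphore {
    fn new(max_in_flight: usize) -> Semaphore {
        let (acquire_side, release_side) = bounded(max_in_flight);
        Semaphore { acquire_side, release_side }
    }

    fn acquire(&self) -> Permit<'_> {
        // Blocks while `max_in_flight` permits are already out.
        self.acquire_side.send(()).unwrap();
        Permit { release_side: &self.release_side }
    }
}

impl Drop for Permit<'_> {
    fn drop(&mut self) {
        let _ = self.release_side.recv();
    }
}

Each request handler would do something like let _permit = semaphore.acquire(); before doing its work, so overload shows up as the accept loop blocking rather than as unbounded memory growth.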

And for filesystem changes, this is a case where there is no way to apply backpressure (you can’t tell the OS “stop letting other programs change files!”), so you either need to use an unbounded queue and cross your fingers (might work fine in practice!), or else have some strategy to detect and respond to overload, like “if we have more than N pending filesystem updates to process, then throw everything away and rebuild our state from scratch”. (This is presumably a bounded amount of work, because project directories have a bounded number of files in them, even if those files can have an unbounded sequence of updates.) Or maybe an even better heuristic would be, track how old the filesystem events you’re currently processing are, and if you start lagging by more than it would cost to rebuild the state from scratch, do that?
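A sketch of that overload check on the filesystem queue (illustrative only: FsEvent, Vfs, and the threshold are made up, and it assumes a crossbeam-channel receiver):

use crossbeam_channel::Receiver;

// Illustrative types.
struct FsEvent;
struct Vfs;
impl Vfs {
    fn apply(&mut self, _event: FsEvent) {}
    fn rescan_from_scratch(&mut self) {}
}

const MAX_PENDING_FS_EVENTS: usize = 4096;

fn drain_fs_events(events: &Receiver<FsEvent>, vfs: &mut Vfs) {
    if events.len() > MAX_PENDING_FS_EVENTS {
        // We've fallen too far behind: throw the backlog away and rebuild
        // the state from the (bounded) set of files on disk instead.
        while events.try_recv().is_ok() {}
        vfs.rescan_from_scratch();
        return;
    }
    // Otherwise apply whatever has accumulated, without blocking.
    while let Ok(event) = events.try_recv() {
        vfs.apply(event);
    }
}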

Isn’t that just having multiple senders on the same channel?


Thanks for the links about actors, that looks exactly like something I’ve been looking for for a while.

I’m guessing at a high-level, rust-analyzer holds a big complex chunk of state (like the whole project’s parse trees or whatever), and then JSON-RPC requests trigger queries against this state, and file-system changes trigger updates to the state?

Yup, although all three of {file system, JSON-RPC requests, error messages we read from compiler’s stdout} can change state.

If that’s right then I’d expect your strategy for managing concurrent readers+writers to be a major design question

Yes, this is indeed an interesting question, but it also has a clear answer. We use a single reader-writer lock around the state, combined with cooperative cancellation, so that readers are cancelled and don’t block writers.
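A minimal sketch of that scheme (just the idea, not rust-analyzer’s actual code; the types are illustrative): the writer raises a flag, long-running readers poll it and bail out early, and then the writer takes the write lock.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    RwLock,
};

struct AnalysisState {
    // parse trees, indices, ...
    revision: u64,
}

struct Shared {
    state: RwLock<AnalysisState>,
    cancelled: AtomicBool,
}

impl Shared {
    // Readers poll the flag at convenient points and give up early, so a
    // pending writer is never blocked for long.
    fn read_query(&self) -> Option<u64> {
        let guard = self.state.read().unwrap();
        if self.cancelled.load(Ordering::SeqCst) {
            return None; // cancelled: the caller retries after the write
        }
        Some(guard.revision)
    }

    fn apply_change(&self) {
        self.cancelled.store(true, Ordering::SeqCst); // ask readers to stop
        let mut guard = self.state.write().unwrap();  // wait for them to leave
        self.cancelled.store(false, Ordering::SeqCst);
        guard.revision += 1; // apply the actual change here
    }
}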

But I don’t really understand why you have workers handling multiple requests at all.

The reason is that I think I need to maintain some worker-local state between requests. The “get error messages from the compiler” worker is a good example here. It has roughly one input message, Update, and two output messages, ClearDiagnostics and AddDiagnostic (full source (single file) is here). Internally, the worker maintains a currently running compiler process, and, upon receiving an Update, it does some light debouncing and cancels the currently running compiler process, if any. This needs state.

For the file system, the state is the set of currently active file watches, which needs to be cleared when we switch to a new config.

Additionally, I don’t think that just spawning a worker per request solves the problem? For background processing of read-only LSP requests, we do spawn a separate task for each request. Tasks are spawned from the main loop and report results back to the main loop (mostly for convenience, but some tasks’ results genuinely want to apply modifications to the state owned by the main loop). If both the spawning and the result-reporting are bounded (can block), there’s a possibility of a deadlock where the main loop tries to spawn the n+1st task while all n tasks are trying to report a result back.

After reading the linked article about backpressure in Erlang, I am slowly coming to the conclusion that my mistake was thinking “channels fix backpressure”. It seems that either:

  • your system is a simple end-to-end pipeline, where sync channels do handle backpressure
  • your system is more complex, and backpressure has to be an application-level concern (i.e., an explicit counter of in-flight requests, metrics). In this case, make the channels unbounded and don’t trick yourself into believing that just limiting the channels will get you proper backpressure.

Isn’t that just having multiple senders on the same channel?

That’s an interesting question; it turns out this is not exactly equivalent in Rust. In Rust, channels generally close when all the senders are dropped. That means that this obvious code handles cancellation properly:

for message in chan_receiver {
}

as soon as all senders are dead (which handles both the normal and the abnormal case), we just break out of the loop. One case where this pattern breaks down is when a component has a receiver for “incoming” messages from outside, and also hands a sender for that same channel to a worker spawned from within the component. I.e., the component owns a sender to the worker, and the worker owns a sender back to the component. This is a cycle, and the simple “exhaust the channel” method does not work; you need explicit “stop” messages (which then work only for the happy case). The way out here is to create a dedicated channel for the worker, so the parent component can select (merge) between its original input channel and the worker’s channel. When it notices that the original channel is closed, it drops the sending half of the worker’s channel, which causes the worker to terminate as well. See this for an extended discussion with a concrete example :slight_smile:

In other words, if we have separate channels, we have an additional ability to notice when one of them closes.
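A sketch of that shutdown pattern with a dedicated worker channel (assuming crossbeam-channel and plain threads; the message types are illustrative, and the channels are unbounded to keep the focus on shutdown rather than backpressure):

use crossbeam_channel::{select, unbounded, Receiver, Sender};

// Illustrative message types.
struct Request;
struct WorkerResponse;

fn parent_run(inbox: Receiver<Request>) {
    let (to_worker, from_worker, worker_handle) = spawn_worker();
    loop {
        select! {
            recv(inbox) -> it => {
                match it {
                    Ok(request) => { let _ = to_worker.send(request); }
                    Err(_) => break, // our own input closed: start shutting down
                }
            },
            recv(from_worker) -> it => {
                match it {
                    Ok(_response) => { /* handle the response */ }
                    Err(_) => break, // worker terminated on its own
                }
            },
        }
    }
    drop(to_worker);               // closes the worker's inbox ...
    let _ = worker_handle.join();  // ... so this join cannot hang
}

fn spawn_worker() -> (
    Sender<Request>,
    Receiver<WorkerResponse>,
    std::thread::JoinHandle<()>,
) {
    let (to_worker, worker_inbox) = unbounded::<Request>();
    let (worker_sender, from_worker) = unbounded::<WorkerResponse>();
    let handle = std::thread::spawn(move || {
        // The loop (and the thread) ends once `to_worker` is dropped.
        for _request in worker_inbox {
            let _ = worker_sender.send(WorkerResponse);
        }
    });
    (to_worker, from_worker, handle)
}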

Oh yeah, running the compiler needs some kind of persistent task, for sure. One primitive I’ve used for this kind of “try to keep up to date” semantics is a “pulser”, which is basically a counter + an increment operation + a “wait for the value to be larger than it was the last time I checked”. There’s a Python/Trio version here:

Using it is pretty simple: you do async for _ in pulser.subscribe(): ..., and the guarantee is that the loop body will run whenever someone has called pulse since the last time the loop body ran. It’s easier to use than to describe :-).

If you like thinking in terms of channels, this is like a channel that handles overflow by coalescing messages.
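For reference, the same idea as a blocking Rust sketch (naming is mine): a counter, an increment, and a “wait until the counter is bigger than the last value I saw”, so pulses that arrive while the subscriber is busy coalesce into one wakeup.

use std::sync::{Condvar, Mutex};

struct Pulser {
    counter: Mutex<u64>,
    condvar: Condvar,
}

impl Pulser {
    fn new() -> Pulser {
        Pulser { counter: Mutex::new(0), condvar: Condvar::new() }
    }

    // Wake up anyone waiting; repeated pulses just bump the counter.
    fn pulse(&self) {
        *self.counter.lock().unwrap() += 1;
        self.condvar.notify_all();
    }

    // Block until the counter exceeds `last_seen`, then return the new value.
    fn wait_newer(&self, last_seen: u64) -> u64 {
        let mut counter = self.counter.lock().unwrap();
        while *counter <= last_seen {
            counter = self.condvar.wait(counter).unwrap();
        }
        *counter
    }
}

// A subscriber loop: the body runs whenever someone has called pulse()
// since the last time the body ran, however many pulses that was.
fn subscribe(pulser: &Pulser, mut on_pulse: impl FnMut()) {
    let mut seen = 0;
    loop {
        seen = pulser.wait_newer(seen);
        on_pulse();
    }
}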

If spawning is bounded, and each spawn only sends one result, then you don’t need to bound results, right? Though I’m actually not quite sure why this uses channels… in most frameworks I’m familiar with, a JSON-RPC server would be structured like:

  • a listener task that manages incoming connections, parses requests, etc.
  • each request gets its own task
  • these tasks get handed some kind of handle to the HTTP connection, so they can send results back directly (and thus they each automatically get backpressure applied by the network)

And even if your request tasks can’t send results directly for some reason, like if they’re all being queued onto a single connection, then can’t you use separate tasks for reporting back vs accepting new connections?

I have some hope that the Erlang literature is limited by Erlang only giving you unbounded channels, and that since we’re not using Erlang then for most projects we can find some combination of architecture and communication primitives that doesn’t involve unbounded channels or deadlock. But this is one of those conjectures that you can’t really test except by building lots of things over a decade, so I have no idea if it’s correct ¯\_(ツ)_/¯

Ah neat, Trio ended up with something very similar.

Ah, sure, multiplexing regular work + cancellation is a common use for select in Go too, I think.

In Trio we currently don’t support select at all, and I’m… still thinking this might be for the best?

So obviously it helps a lot that in Trio, all primitives have two wakeup paths baked in: the regular one that depends on whatever the operation is doing, plus cancellation. So you don’t select for cancellation; it’s automatically there all the time.

But this example also crystallized something for me, that I think gets at the difference between the “actor perspective” and the “structured programming perspective”.

In the actor model, your basic tool for splitting a program into components is to use actors. In structured programming (including structured concurrency), your basic tool for splitting a program into components is to use functions/objects/modules/etc. So in the example, there’s a “worker actor” that the main task submits jobs to over a channel. But… why is this even an actor? In fact it seems like a lot of the work in that blog post is just spent on converting the thread pool’s object-with-methods interface into an actor-style loop-you-send-messages-to interface, and I don’t really see what any of that extra indirection accomplishes. If anything it just seems to make more problems, like your issue with cancellation.

Now that I think about it, it seems like the Trio and Rust models are the same here! (Keep in mind that this is a tangential discussion: deadlock during cancellation != deadlock due to full channels.) In Rust, all (good) synchronization primitives are also signaled on drop.

And it seems like I can construct an identical example for Trio, where reusing the same channel for communicating with both your parent and your child creates a deadlock:

import trio
import math
import random

async def main():
    async with trio.open_nursery() as nursery:
        sender = spawn_component(nursery)
        async with sender:
            for i in range(10):
                await trio.sleep(random.random())
                color = ['red', 'green'][random.randrange(2)]
                await sender.send((color, i))
        print('outer sender is closed')



def spawn_component(nursery):
    async def go():
        async with trio.open_nursery() as nursery:
            worker_sender = spawn_worker(nursery, sender.clone())
            async with receiver, worker_sender:
                async for message in receiver:
                    print(f'component recv {message}')
                    if message[0] == 'red':
                        await worker_sender.send(message[1])

    (sender, receiver) = trio.open_memory_channel(math.inf)
    nursery.start_soon(go)
    return sender

def spawn_worker(nursery, respond_to: trio.MemorySendChannel):
    async def go():
        async with respond_to, receiver:
            total = 0
            async for message in receiver:
                print(f'worker recv {message}')
                await trio.sleep(random.random())
                total += message
                await respond_to.send(('total', total))

    (sender, receiver) = trio.open_memory_channel(math.inf)
    nursery.start_soon(go)
    return sender


if __name__ == "__main__":
    trio.run(main)

Here, component owns a worker “actor”. component receives requests from the outside and forwards some of them to worker. It then receives worker responses through the same channel it uses for outside requests. At shutdown, worker and component end up stuck, as each holds a sending half of the channel the other one is reading from.

Yeah, pretty similar. Probably the biggest difference is that Trio’s cancellation unwinding uses the regular error propagation path. This is mostly an ergonomic difference, but it does change capability in one important way: it means cancellation can wait for async operations while unwinding (like AsyncDrop, but only for tasks).

And huh, and now that I think about it, that is actually kind of crucial for this example, or any situation where the thing you’re trying to cancel has sub-tasks that are running in multiple threads. Then structured concurrency says you have to wait for all those tasks to unwind, and because of… how CPUs work, basically, you can’t synchronously cancel work that’s happening in other threads. Put together, those mean that tasks have to be able to suspend themselves while doing cancellation unwinding.

Oh yeah, for sure. You can definitely write concurrency bugs when using a structured concurrency framework :slight_smile:. But I thought it was interesting that it would simply never have occurred to me to build that program that way in the first place. There’s a strict definition of structured concurrency in terms of avoiding go statements, but I guess it also comes with a kind of philosophical perspective that emphasizes functions over actors.