When explaining structured concurrency to newcomers, and also when comparing the different ways to implement it, the Happy Eyeballs protocol (RFC 8305) has proven to be very instructive. At the moment there are at least Trio and libdill versions out there.
The nice thing is that it exhibits all the aspects of structured concurrency (cancellation, error handling, different kinds of async tasks, etc.) while still remaining relatively simple.
Another nice feature is that it comes with an “easy” and a “hard” mode. When trying out your initial implementation of structured concurrency, you can start with a stripped-down version which does the DNS query synchronously and then does the actual connecting asynchronously. Once that works, you can switch to the hard mode, where the DNS queries are also done asynchronously, as described in the RFC.
This post is not meant to propose anything specific. It is just meant to point out that in Happy Eyeballs we have a kind of common gauge, an example problem we can use to compare different ideas: how easy is it to implement Happy Eyeballs with each proposed solution?
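To make the “easy” mode concrete, here is a minimal sketch in Python's asyncio (chosen only because it ships with the standard library; it is not the Trio or libdill version). The names `happy_eyeballs` and `fake_connect` are hypothetical, the connection attempts are simulated with sleeps instead of real sockets, and the addresses are assumed to be already resolved.

```python
import asyncio


async def happy_eyeballs(attempts, delay):
    """Start the attempts one at a time, `delay` seconds apart (or as soon as
    the previous batch reports a failure), and return the first success."""
    remaining = list(attempts)   # zero-argument coroutine functions
    pending = set()
    errors = []
    try:
        while remaining or pending:
            if remaining:
                pending.add(asyncio.create_task(remaining.pop(0)()))
            # Only use a timeout while there is still another attempt to start.
            timeout = delay if remaining else None
            done, pending = await asyncio.wait(
                pending, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if task.exception() is None:
                    # Caveat: a second attempt that succeeded in the same
                    # wake-up is simply dropped here; with real sockets it
                    # would have to be closed.
                    return task.result()
                errors.append(task.exception())
        raise OSError(f"all {len(errors)} connection attempts failed")
    finally:
        # Structured cleanup: no attempt outlives this function.
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)


def fake_connect(name, latency, ok=True):
    """Hypothetical stand-in for a connection attempt to one address."""
    async def attempt():
        await asyncio.sleep(latency)
        if not ok:
            raise ConnectionRefusedError(name)
        return name
    return attempt
```

For example, `asyncio.run(happy_eyeballs([fake_connect("v6", 0.5), fake_connect("v4", 0.05)], delay=0.1))` starts the slow first attempt, starts the second one after the delay, and returns the second one's result while cancelling the first.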
Happy Eyeballs is indeed a good algorithm to try to implement.
The javadoc for the FiberScope API in Project Loom includes a number of code samples, one of which is described as:
“The method is called with an array of socket addresses and returns a SocketChannel connected to one of the addresses. The connection attempts are staggered. A fiber is scheduled to connect to the first socket address and if the connection is not established within a certain time then another fiber is scheduled to try to connect to the next socket address. The staggered attempts continue until a connection is established, none of the connections succeed, or the deadline is reached. In the event that several connections are established then all but one are closed so that the method does not leak resources.”
This may be another interesting example to explore.
Yes, that sounds very much like the simple version of Happy Eyeballs. Can you link to the sample? I would like to put the three implementations side by side and see whether anything interesting could be learned from that.
In my PyCon talk last year I walked through a Trio implementation of Happy Eyeballs, which I think is where the idea of using it as an example comes from: https://www.youtube.com/watch?v=oLkfnc_UMcE
Your solution is concise, pure, type-safe, and wrong. When two sockets succeed in connecting concurrently, the loser of the race is discarded and stays open for an undefined duration.
As we discussed on Twitter: yes, there was a problem with unclosed sockets. It is fixed now; the solution isn’t very elegant, but I think it works fine. Thanks for spotting the error.
I actually spent a fair amount of time myself trying to model this algorithm declaratively. The solution I eventually preferred involves a shared dataflow variable to model the race explicitly, such that each attempt can know whether its socket must be kept open. I’m not sure it’s the most elegant solution, so I’m definitely interested in knowing if you end up with something better with ZIO!
Moral of the story: Happy Eyeballs is really an exciting problem.
I think I’ve come up with a better solution: embracing the fact that multiple sockets might end up being open. We run the original HE algorithm as-is (which manages the timing, concurrency, races), but modify the effects that are run so that they collect all successfully produced values.
Then, on top of this we build a second stage, which creates a queue for all successfully produced values, runs HE, and closes all but one (which is returned):
object ReleasableHappyEyeballs {
  def apply[R, T](
    tasks: List[ZIO[R, Throwable, T]],
    delay: Duration,
    releaseExtra: T => ZIO[R, Nothing, Unit]
  ): ZIO[R with Clock, Throwable, T] =
    for {
      successful <- Queue.bounded[T](tasks.size)
      enqueueingTasks = tasks.map { t =>
        t.onExit {
          case Success(value) => successful.offer(value)
          case Failure(_)     => ZIO.unit
        }
      }
      _ <- HappyEyeballs(enqueueingTasks, delay)
      // there has to be at least one, otherwise HE would fail
      first :: other <- successful.takeAll
      _ <- ZIO.foreach(other)(releaseExtra)
    } yield first
}
Note that .onExit is guaranteed to run if a value is produced, which is crucial given that a task may be interrupted at any point.
I enjoyed your post; thanks for sharing! And I was also wondering about socket cleanup, so it’s nice to see that discussion. Though to be fair, I also skipped this detail in my talk, for time reasons.
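For comparison, the same two-stage idea can be sketched in Python's asyncio. This is not a translation of the ZIO code: `first_success` is a simplified, non-staggered stand-in for the HE race, and `releasable_race`, `recording` and `release` are hypothetical names. It also assumes an attempt only hands out its resource by returning it.

```python
import asyncio


async def first_success(attempts):
    """Simplified stand-in for the staggered race: run every attempt, return
    the first success it sees, and cancel whatever is still pending."""
    pending = {asyncio.create_task(attempt()) for attempt in attempts}
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if task.exception() is None:
                    return task.result()  # other winners in `done` are dropped
        raise OSError("all attempts failed")
    finally:
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)


async def releasable_race(race, attempts, release):
    """Second stage: record every value that was actually produced, run the
    race unchanged, then release all recorded values except the first."""
    successful = []

    def recording(attempt):
        async def wrapped():
            value = await attempt()
            # No await between producing the value and recording it, so a
            # cancellation cannot slip in between the two.
            successful.append(value)
            return value
        return wrapped

    try:
        await race([recording(attempt) for attempt in attempts])
    except BaseException:
        # The race failed or was cancelled: nothing is returned, so release
        # everything that was produced.
        for value in successful:
            await release(value)
        raise
    first, *extra = successful
    for value in extra:
        await release(value)
    return first
```

The race itself stays untouched; only the attempts are wrapped. Passing a staggered implementation as `race` (with the delay bound beforehand) should work the same way, as long as it waits for its cancelled tasks before returning.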
Your revised version here is conceptually similar to how Trio’s real implementation handles this: we keep a collection of all the open sockets, and then at the end close all the ones that are still open:
The version that’s currently in your post is interesting:
// the interrupt might have happened when the socket is already
// established.
// In this case, we need to cleanup and close the socket.
effectBlocking {
  socket = new Socket(address, port)
  socket
}.onInterrupt(closeSocket)
Does this work? I.e., do ZIO’s semantics guarantee that if two raced effects with onInterrupt handlers complete simultaneously, then at least one of their onInterrupt handlers will be called? Or is it possible to have a sequence like:
1. left effectBlocking resolves successfully
2. left onInterrupt resolves successfully
3. right effectBlocking resolves successfully
4. right onInterrupt resolves successfully
5. race detects that the left fiber has completed, and interrupts the right fiber, but the right fiber has already resolved so onInterrupt isn’t called
?
I peeked at the implementation of race, but it didn’t help as much as I was hoping. In Trio the equivalent of that sequence would be possible, though, which is why we can’t handle the final socket cleanup inside the individual tasks, but instead do it at the end.
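To illustrate the kind of sequence in question, here is a small sketch in Python's asyncio (not Trio and not ZIO, so it says nothing about ZIO's actual guarantees). The names `connect` and `demo` are hypothetical, and the `except asyncio.CancelledError` branch plays the role of the onInterrupt handler.

```python
import asyncio


async def connect(name, cleaned_up):
    try:
        await asyncio.sleep(0)       # pretend to establish the connection
        return name                  # stands in for the open socket
    except asyncio.CancelledError:
        cleaned_up.append(name)      # per-task cleanup, onInterrupt-style
        raise


async def demo():
    cleaned_up = []
    left = asyncio.create_task(connect("left", cleaned_up))
    right = asyncio.create_task(connect("right", cleaned_up))
    await asyncio.wait({left, right}, return_when=asyncio.FIRST_COMPLETED)
    # Both tasks finished before the race got a chance to look at them.
    # Cancelling a finished task is a no-op, so the handler never runs.
    cancel_took_effect = right.cancel()
    return left.result(), right.result(), cancel_took_effect, cleaned_up
```

In this run both tasks complete, `right.cancel()` reports that there was nothing to cancel, and the cleanup list stays empty, so the value held by the losing task would be leaked unless something outside the tasks closes it.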
“Your revised version here is conceptually similar to how Trio’s real implementation handles this: we keep a collection of all the open sockets, and then at the end close all the ones that are still open.”
Maybe it’s an essential part of any solution, then.
“I peeked at the implementation of race, but it didn’t help as much as I was hoping. In Trio the equivalent of that sequence would be possible, though, which is why we can’t handle the final socket cleanup inside the individual tasks, but instead do it at the end.”
Yes, I think you are correct: that implementation would also be wrong. But this might still be made to work. There’s a variant of race, raceWith, which receives both the value that won and the losing fiber.
We would be able then to interrupt the fiber (calling Fiber.interrupt); as a result, we would get either an error (meaning that the fiber was indeed interrupted), or the computed value (if the fiber was already completed). In the first case releasing would have already happened, in the second we would need to call release on the computed value (which lost).
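The shape of that alternative can be sketched in Python's asyncio, purely as an analogy: this is not ZIO's raceWith, and `race_two`, `interrupt_and_release` and `release` are hypothetical names. The sketch only covers the path where the first task to finish succeeded, and it does not handle cancellation of the caller while the race is in progress.

```python
import asyncio


async def interrupt_and_release(loser, release):
    """Cancel the losing task, wait for it to settle, and release its value
    if it had in fact already completed. Returns True if a value was released."""
    loser.cancel()
    await asyncio.wait({loser})
    if loser.cancelled() or loser.exception() is not None:
        # It really was interrupted (or failed): nothing was produced.
        return False
    # It had already finished, so its value lost the race and must be released.
    await release(loser.result())
    return True


async def race_two(left, right, release):
    """Race two attempts; the first one to finish wins, the other is
    interrupted and, if necessary, released."""
    a = asyncio.create_task(left())
    b = asyncio.create_task(right())
    done, _ = await asyncio.wait({a, b}, return_when=asyncio.FIRST_COMPLETED)
    winner = a if a in done else b
    loser = b if winner is a else a
    await interrupt_and_release(loser, release)
    return winner.result()
```

Even in this two-task form, the winner has to inspect the loser's outcome explicitly, which hints at why the bookkeeping grows once more than two attempts are staggered.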
But that would only increase complexity, so I think the layered solution is better, and I converted the example to use that.
Thanks for the comment and pointing out another possible bug!