
IO.race doesn't propagate winner's IOLocal context #3100

Open
vladislavsheludchenkov opened this issue Jul 14, 2022 · 10 comments

@vladislavsheludchenkov

We're using IOLocal for trace ID propagation. It has worked perfectly, as expected, when dealing only with IO instances. However, we currently can't use the IOLocal magic with fs2 streams, because they use IO.race to handle interruption.

Here's a cats-effect example:

import cats.effect.IO
import cats.effect.IOLocal
import cats.effect.unsafe.implicits.global

val effectWithoutRace = for {
  local <- IOLocal[Option[String]](None)
  _ <- local.set(Some("uhh... hi!"))
  _ <- local.get.flatMap(IO.println)
} yield ()

effectWithoutRace.unsafeRunSync() // prints Some(uhh... hi!)

def withRace[A](effect: IO[A]): IO[A] = effect.race(IO.never).map(_.merge)

val effectWithRace = for {
  local <- withRace(IOLocal[Option[String]](None))
  _ <- withRace(local.set(Some("uhh... hi!")))
  _ <- withRace(local.get.flatMap(IO.println))
} yield ()

effectWithRace.unsafeRunSync() // prints None

Scastie for this code snippet: https://scastie.scala-lang.org/jshavzY0STWiAESgLgTqHg

Wrapping an IOLocal.set call in IO.race causes the context to be enriched with the new value and then immediately discarded on exiting the race.
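The underlying mechanism can be shown without race at all. IO.race runs each side on its own fiber, and a fiber started from the parent begins with a copy of the parent's IOLocal values; writes made on the child are not copied back when it is joined. A minimal worksheet-style sketch using plain start/join (no fs2, no race) looks like this:

```scala
import cats.effect.{IO, IOLocal}
import cats.effect.unsafe.implicits.global

// IO.race runs each side on a freshly started fiber, much like `start` here.
// The child begins with a copy of the parent's IOLocal values, and its
// writes are not copied back to the parent on join.
val setOnChildFiber: IO[Option[String]] = for {
  local <- IOLocal[Option[String]](None)
  fiber <- local.set(Some("uhh... hi!")).start // the write happens on the child
  _     <- fiber.join                          // wait for the child to finish
  value <- local.get                           // read back on the parent
} yield value

val observedByParent = setOnChildFiber.unsafeRunSync()
println(observedByParent) // None
```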

And here's an example with fs2 that we've actually encountered in the wild:

import cats.effect.IO
import cats.effect.IOLocal
import cats.effect.unsafe.implicits.global

val stream = for {
  local <- fs2.Stream.eval(IOLocal[Option[String]](None))
  _ <- fs2.Stream.eval(local.set(Some("uhh... hi!")))
  _ <- fs2.Stream.eval(local.get.flatMap(IO.println))
} yield ()

stream.compile.drain.unsafeRunSync() // prints Some(uhh... hi!)

stream.interruptWhen(IO.never[Either[Throwable, Unit]]).compile.drain.unsafeRunSync() // prints None

Scastie for this code snippet: https://scastie.scala-lang.org/LghgY5G4RMWx0f4CRmUIqw

So any use of interruptWhen on an fs2.Stream wraps every eval in F.race to handle the actual interruption.

One possible way to make IOLocal usable with fs2 would be to preserve IOLocal changes made inside a race when they come from the effect that won it. While it makes sense to avoid merging contexts when joining back from IO.both, IO.race should be less problematic: no merging is required, just preserving the winner's context.
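For a single, known IOLocal, this "keep the winner's context" behaviour can be approximated in user land today. The sketch below is illustrative only. `raceKeepingLocal` is a hypothetical helper name, not a cats-effect API, and it restores only the one local it is given, not every IOLocal on the fiber. Each side reports the local value it finished with, and the caller re-applies the winner's value:

```scala
import cats.effect.{IO, IOLocal}
import cats.effect.unsafe.implicits.global

// Hypothetical helper (not part of cats-effect): races two effects, then
// copies the winner's final value of ONE known IOLocal back to the caller.
def raceKeepingLocal[L, A, B](local: IOLocal[L])(fa: IO[A], fb: IO[B]): IO[Either[A, B]] = {
  // Each side returns its result together with the local value it ended with,
  // read on the side's own (child) fiber.
  val faWithLocal = fa.flatMap(a => local.get.map(l => (a, l)))
  val fbWithLocal = fb.flatMap(b => local.get.map(b2l => (b, b2l)))
  IO.race(faWithLocal, fbWithLocal).flatMap {
    case Left((a, l))  => local.set(l).as(Left(a))  // keep the left winner's context
    case Right((b, l)) => local.set(l).as(Right(b)) // keep the right winner's context
  }
}

val preserved: IO[Option[String]] = for {
  local <- IOLocal[Option[String]](None)
  _     <- raceKeepingLocal(local)(local.set(Some("uhh... hi!")), IO.never)
  value <- local.get
} yield value

val afterRace = preserved.unsafeRunSync()
println(afterRace) // Some(uhh... hi!)
```

Because the helper has to name the local explicitly, it doesn't scale to code like fs2's interruptWhen, which doesn't know which locals the user cares about. That is why the request targets race itself.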

Shoutout to @danielleontiev for debugging this issue with me.

@armanbilge
Member

Thanks for reporting. Currently this is by design.

We also encountered this when investigating tracing for Streams. See

See also some discussion on more sophisticated semantics for IOLocals.

@vladislavsheludchenkov
Author

So it's a "can't fix" from the fs2 side and a "won't fix" from the cats-effect side? That's sad; IOLocals are amazing when they work properly.

But I didn't see any discussion of race semantics in the initial merge request, and race is not exactly the same as fork/join. Since racePair is already handled within IOFiber#runLoop, it looks like localState could be handled there as well. It would introduce some complexity, but not too much, IMO. What do you think?

@vladislavsheludchenkov
Author

I've taken a closer look at racePair, and it's also used in situations where there is no clear winner (e.g. both), so always propagating localState there isn't an option; it would make those situations a lot more confusing.

So it's a bit more complex than I initially expected, but we could still reimplement race and raceOutcome to preserve localState somehow, because they have a clear winner and we don't have to debate which context to keep and which to discard.
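To see why both has no clear winner, here is a small sketch (worksheet style, assuming cats-effect 3's current semantics) where each side of IO.both updates the same local. Both children run to completion with different values, and the parent still sees the original empty list, because neither child's copy is propagated back:

```scala
import cats.effect.{IO, IOLocal}
import cats.effect.unsafe.implicits.global

// With IO.both, both sides run on their own fibers and both finish, each
// having updated the local differently. There is no single "winning" context.
val afterBoth: IO[List[String]] = for {
  local <- IOLocal(List.empty[String])
  _     <- IO.both(local.update("left" :: _), local.update("right" :: _))
  value <- local.get // read on the parent after both children have finished
} yield value

val parentSees = afterBoth.unsafeRunSync()
println(parentSees) // List()
```

Keeping either child's value here would be arbitrary. With race, by contrast, only one side completes normally.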

@djspiewak
Member

So the general solution envisioned for this when we decided on the local semantics is actually IOLocalRef, which, ironically, wasn't implemented at the time because no one could come up with a practical scenario where it was needed. Fast forward a bit, and this is actually the first time I've heard of this interaction with fs2! Whoops.

The general problem with giving IOLocal itself special interactions with fiber joining is that most interesting concurrent code uses Deferred or Queue rather than join, and obviously you can't enrich those with special magic. IOLocalRef solves this by putting a Ref in the local and mutating that, so changes propagate to every fiber that holds the same Ref.
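IOLocalRef was never implemented, but the underlying idea can be approximated today by storing a Ref inside an ordinary IOLocal. This worksheet-style sketch reuses the withRace helper from the report above. The fiber started by the race inherits the same Ref, so the write survives the race:

```scala
import cats.effect.{IO, IOLocal, Ref}
import cats.effect.unsafe.implicits.global

def withRace[A](effect: IO[A]): IO[A] = effect.race(IO.never).map(_.merge)

// The local holds a Ref rather than the value itself. Child fibers copy the
// local, but the copy points at the same Ref, so writes are shared.
val effectWithRefInLocal: IO[Option[String]] = for {
  ref   <- Ref[IO].of(Option.empty[String])
  local <- IOLocal(ref)
  _     <- withRace(local.get.flatMap(_.set(Some("uhh... hi!"))))
  value <- withRace(local.get.flatMap(_.get))
} yield value

val sharedResult = effectWithRefInLocal.unsafeRunSync()
println(sharedResult) // Some(uhh... hi!)
```

The trade-off is that writes now flow between all fibers sharing the Ref, including concurrent siblings, which is exactly what fiber-local semantics were designed to avoid.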

@armanbilge
Member

This is the first I've heard of IOLocalRef :) What exactly are its semantics? It sounds a lot like putting a Ref inside an IOLocal, which I'm not sure would work for tracing.

@djspiewak
Member

@armanbilge That's exactly what it was proposed to be. Like I said, we never actually implemented it. :-) But basically, it was intended to be Local's API, but with a Ref inside of it and an explicit fork (or something) operator. This would give you the ability to share state arbitrarily around fibers.
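A rough sketch of what such an API might look like follows, assuming a Local-style get/set plus an explicit fork. Everything here is hypothetical: cats-effect ships no IOLocalRef, and the name, methods, and fork semantics are assumptions for illustration. In this version, fork runs an effect against its own Ref seeded with the current value and restores the original Ref afterwards:

```scala
import cats.effect.{IO, IOLocal, Ref}
import cats.effect.unsafe.implicits.global

// Hypothetical sketch, not a cats-effect API: an IOLocal holding a Ref,
// with an explicit `fork` for regions whose writes should not leak out.
final class IOLocalRef[A] private (underlying: IOLocal[Ref[IO, A]]) {
  // Reads through whichever Ref the current fiber holds.
  def get: IO[A] = underlying.get.flatMap(_.get)

  // Writes are seen by every fiber sharing that Ref, including the parent.
  def set(a: A): IO[Unit] = underlying.get.flatMap(_.set(a))

  // Runs `fa` against a fresh Ref seeded with the current value, then puts the
  // original Ref back, so writes made inside `fa` stay inside it.
  def fork[B](fa: IO[B]): IO[B] =
    get.flatMap(a => Ref[IO].of(a)).flatMap { fresh =>
      underlying.getAndSet(fresh).bracket(_ => fa)(previous => underlying.set(previous))
    }
}

object IOLocalRef {
  def apply[A](initial: A): IO[IOLocalRef[A]] =
    Ref[IO].of(initial).flatMap(ref => IOLocal(ref)).map(local => new IOLocalRef[A](local))
}

val demo: IO[(String, String)] = for {
  local      <- IOLocalRef("initial")
  _          <- local.set("from a child").start.flatMap(_.join) // shared Ref
  afterChild <- local.get
  _          <- local.fork(local.set("scoped"))                 // private Ref
  afterFork  <- local.get
} yield (afterChild, afterFork)

val demoResult = demo.unsafeRunSync()
println(demoResult) // (from a child,from a child)
```

Sharing is the default in this design, so race, Deferred, and Queue all "just work", while fork is the opt-out for regions that need isolation.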

@armanbilge
Member

Just to throw it out there, here's another way we could try to support these use cases without implementing IOLocal "merging". It would require compromising on fiber identity instead, but then we could do something like this:

def timeout[A](ioa: IO[A], duration: Duration): IO[A] = 
  IO.fiber { fiber => // get handle to the current fiber
    (IO.sleep(duration) *> fiber.cancel).background.surround {
      ioa
    }
  }

In this case, you never leave the fiber that started the timeout, so locals propagate completely naturally. (You also save the "overhead" of creating and scheduling an extra fiber to run ioa.)

This would have limited applicability, however: e.g. you could not implement timeoutTo like this, since timeouts require canceling a fiber, and if that's the fiber you are on, you cannot recover from that :)

@djspiewak
Member

> This would have limited applicability however e.g. you could not implement timeoutTo like this, since timeouts require canceling a fiber, and if that's the fiber you are on, you cannot recover from that :)

I think this would also change the definition of timeout: when the timeout fires, the result would be Canceled, rather than the current semantics, which raise an error (raiseError(new TimeoutException)).
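The difference is observable with today's APIs. This worksheet-style sketch contrasts IO#timeout, which raises a TimeoutException the caller can handle, with a fiber that cancels itself. The self-canceling fiber ends with a Canceled outcome that error handlers cannot intercept. The second half only illustrates the self-cancel idea; it is not an implementation of the proposed timeout:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._

// Current semantics: when the timeout fires, `timeout` raises a
// TimeoutException, which is an ordinary, recoverable error.
val timedOut: Either[Throwable, Unit] =
  IO.never[Unit].timeout(50.millis).attempt.unsafeRunSync()

// A fiber that cancels itself finishes as Canceled. handleErrorWith does not
// run, because cancelation is not an error.
val selfCanceled =
  (IO.canceled *> IO.unit)
    .handleErrorWith(_ => IO.unit)
    .start
    .flatMap(_.join)
    .unsafeRunSync()

println(timedOut.swap.exists(_.isInstanceOf[TimeoutException])) // true
println(selfCanceled.isCanceled)                                  // true
```

This is also why timeoutTo can't be built on self-cancelation: once the current fiber is canceled, there is no point at which it could switch to the fallback.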

@armanbilge
Member

Oh, you're right! I forgot that's how it currently works; for some reason I thought it just canceled. So, yeah, tricky :)

@kubukoz
Member

kubukoz commented Oct 4, 2024

> @armanbilge That's exactly what it was proposed to be. Like I said, we never actually implemented it. :-) But basically, it was intended to be Local's API, but with a Ref inside of it and an explicit fork (or something) operator. This would give you the ability to share state arbitrarily around fibers.

Apparently I didn't see this thread, but I wrote a blog post showcasing this very idea: https://blog.kubukoz.com/flavors-of-shared-state/

What's noteworthy, I think, is that putting a Ref inside is not that different from putting a mutable object into the local, which is what Natchez does; so this pattern is already in use, just not well documented.

And for the record, I think IOLocal's semantics do make sense in some scenarios.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants