Cancelling Async queue take makes other take hang #3998

Closed
durban opened this issue Feb 13, 2024 · 5 comments · Fixed by #4001

@durban
Contributor

durban commented Feb 13, 2024

This test fails:

    "foobar" in real {
      val mkQueue = Queue.unboundedForAsync[IO, String] // this fails
      // val mkQueue = Queue.unboundedForConcurrent[IO, String] // this is ok
      val test = mkQueue.flatMap { q =>
        val Item = "foobar"
        def taker(ref: Ref[IO, String]): IO[Unit] = IO.uncancelable { poll =>
          poll(q.take).flatMap(ref.set)
        }

        for {
          // 2 takers:
          ref1 <- IO.ref("")
          ref2 <- IO.ref("")
          fibers <- IO.both(taker(ref1).start, taker(ref2).start)
          (fib1, fib2) = fibers
          _ <- IO.sleep(0.1.second)
          // race cancelling one of them
          // and pushing one item;
          // whatever happens, exactly one
          // of them must receive that item:
          _ <- fib1.cancel.both(q.offer(Item)) // this fails
          // _ <- fib1.cancel.productR(q.offer(Item)) // this is ok
          item1 <- ref1.get
          _ <- if (item1.nonEmpty) {
            // fib1 must have taken the item
            IO(item1 mustEqual Item) *> {
              // and fib2 is not yet finished
              ref2.get.flatMap { item2 =>
                IO(item2.isEmpty must beTrue)
              } *> {
                // make fib2 not hang by pushing another item:
                q.offer("x") *> fib2.joinWithNever *> ref2.get.flatMap { item2 =>
                  IO(item2 mustEqual "x")
                }
              }
            }
          } else {
            IO.ref(false).flatMap { timedOut =>
              // fib1 (it's already completed)
              // didn't take the item; fib2 must:
              fib2.joinWithNever.timeoutTo(
                4.seconds,
                timedOut.set(true) *> q.offer("y") *> fib2.joinWithNever // fallback to not make the test hang
              ) *> ref2.get.flatMap { item2 =>
                IO(item2 mustEqual Item) *> timedOut.get.flatMap { timedOut =>
                  // if we used the fallback, make the test fail:
                  IO(timedOut mustEqual false) // fails HERE
                }
              }
            }
          }

        } yield ()
      }

      test.parReplicateA_(10000).as(ok)
    }

Observations:

  • Using unboundedForConcurrent (instead of unboundedForAsync) fixes it.
  • Using fib1.cancel.productR(q.offer(Item)) instead of both (i.e., no race between cancel and offer) fixes it.
  • When it fails, it fails at the line I marked with HERE. I think this means that the item wasn't lost (fib2 received it); only the wakeup was lost, and the fallback offer then woke fib2 up.
@djspiewak
Member

This definitely looks like a bug in take cancelation handling. I thought we had nailed all of those. Is there any way we can minimize this test case a bit? It's like… doing a lot of things. :P I'll take a look at the implementation asap though.

@durban
Contributor Author

durban commented Feb 14, 2024

Yeah, I was trying to debug it, which is why it became so complicated. A minimized version is something like:

    "foobar" in real {
      val test = Queue.unboundedForAsync[IO, String].flatMap { q =>
        for {
          fibers <- IO.both(q.take.start, q.take.start)
          (fib1, fib2) = fibers
          _ <- IO.sleep(0.1.second)
          _ <- IO.both(fib1.cancel, q.offer("foobar"))
          s <- fib1.joinWith(IO.pure(""))
          s <- if (s.nonEmpty) {
            IO.pure(s)
          } else {
            fib2.joinWithNever
          }
          _ <- IO(s mustEqual "foobar")
        } yield ()
      }

      test.parReplicateA_(10000).as(ok)
    }

(Except this doesn't fail, it hangs.)
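For anyone who wants to try this outside the cats-effect test suite (where the `real` helper and specs2 matchers are not available), here is a rough standalone sketch of the same scenario. The object name, the `round` and `cancelAndOffer` helpers, the `race` flag, and the 10-second timeout are all illustrative and not part of the original test. It also assumes that `Queue.unbounded[IO, String]` selects the Async-based implementation for `IO`; if that assumption is wrong, it exercises a different code path than the test above.

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Queue
import scala.concurrent.duration._

// Hypothetical standalone harness; all names here are illustrative.
object TakeCancelRepro extends IOApp.Simple {

  // race = true: cancel and offer run concurrently (the shape reported as failing).
  // race = false: cancel completes before the offer (the shape reported as ok).
  def cancelAndOffer(cancel: IO[Unit], offer: IO[Unit], race: Boolean): IO[Unit] =
    if (race) IO.both(cancel, offer).void
    else cancel *> offer

  // One round: two takers on an empty queue, then one cancel and one offer.
  def round(race: Boolean): IO[Unit] =
    // Assumption: for IO this picks the Async-based unbounded queue.
    Queue.unbounded[IO, String].flatMap { q =>
      for {
        fibers <- IO.both(q.take.start, q.take.start)
        fib1 = fibers._1
        fib2 = fibers._2
        _ <- IO.sleep(100.millis) // give both takers time to suspend
        _ <- cancelAndOffer(fib1.cancel, q.offer("foobar"), race)
        // "" means fib1 was cancelled before it received anything
        s1 <- fib1.joinWith(IO.pure(""))
        s <- {
          // if fib1 did not get the item, fib2 must; a lost wakeup hangs here
          val result = if (s1.nonEmpty) IO.pure(s1) else fib2.joinWithNever
          result.guarantee(fib2.cancel) // do not leak a still-waiting taker
        }
        _ <- IO.raiseUnless(s == "foobar")(new AssertionError(s"unexpected item: $s"))
      } yield ()
    }

  // The per-round timeout turns a hang into a TimeoutException.
  val run: IO[Unit] =
    round(race = true).timeout(10.seconds).parReplicateA_(10000)
}
```

On an affected version one would expect some round to time out when `race = true`, while `race = false` should complete normally; since this is a race, a clean run does not prove the absence of the problem.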

@djspiewak
Member

Does this only happen with the unbounded queue? Or does it also happen with bounded?

durban linked a pull request Feb 16, 2024 that will close this issue
@durban
Contributor Author

durban commented Feb 16, 2024

Only unbounded. And also, there is an existing test for (more or less) the same thing. It just doesn't run for unbounded. See #4001, where I try to fix this in the same way as it already works for bounded.

@durban
Contributor Author

durban commented Feb 16, 2024

Btw, this "let's wake up someone, just in case" seems exactly like something which could be avoided by #3553.

djspiewak added a commit that referenced this issue Feb 16, 2024