
Deadlock with parallel processing of single partition stream #852

Closed
vladimirkl opened this issue May 18, 2023 · 16 comments · Fixed by #1123
Labels: bug (Something isn't working)

Comments

@vladimirkl
Contributor

Hi, I need to take n records from a single partition, process them in parallel batches, and commit. Unfortunately, this code causes a deadlock in the Consumer on commit:

    val consumerLayer: ZLayer[Kafka, Throwable, Consumer] =
      ZLayer.scoped(ZIO.serviceWithZIO[Kafka] { kafka =>
        Consumer.make(
          ConsumerSettings(kafka.bootstrapServers)
            .withGroupId("group1")
            .withOffsetRetrieval(Consumer.OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
        )
      })

...

Producer.produceChunk(Chunk.fromIterable(1 to 1000).map(n => new ProducerRecord(topic, n, n.toString)), Serde.int, Serde.string) *>
  Consumer.plainStream(Subscription.topics(topic), Serde.int, Serde.string)
    .take(100)
    .groupedWithin(10, 100.millis)
    .mapZIOPar(2)(c => ZIO.debug(c.size) as c.map(_.offset))
    .map(OffsetBatch.apply)
    .debug("Offset")
    .mapZIO(_.commit)
    .debug("Commit")
    .runDrain zipPar Fiber.dumpAll.delay(20.seconds)

Replacing groupedWithin with grouped makes this code work. Alternatively, I can leave groupedWithin in place and remove take: then no deadlock occurs, but I need to terminate the stream after exactly n records. It looks like a race condition on stream termination with take. Tested with zio-kafka 2.3.0, Scala 2.13.10, and embedded Kafka.
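
For reference, the variant that does not deadlock swaps only the grouping operator:

    Consumer.plainStream(Subscription.topics(topic), Serde.int, Serde.string)
      .take(100)
      .grouped(10) // count-based grouping instead of the time-based groupedWithin
      .mapZIOPar(2)(c => ZIO.debug(c.size) as c.map(_.offset))
      .map(OffsetBatch.apply)
      .mapZIO(_.commit)
      .runDrain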

@erikvanoosten added the bug label on May 19, 2023
@guizmaii
Member

Thanks for your report @vladimirkl 🙂

I'm wondering if this is a bug in zio-kafka or in zio-streams 🤔
For example, replacing groupedWithin with grouped, as you did, has nothing to do with zio-kafka.
Why do you think it comes from zio-kafka?

Sorry for the questions. I'm trying to better understand your issue.

@vladimirkl
Contributor Author

It may be a zio-streams issue, but I have never encountered it without zio-kafka, even though I have similar aggregations in other places. It locks exactly on commit. If you wait some time after the deadlock, you will see a RunloopTimeout exception. Without the commit, take terminates the stream correctly.

@vladimirkl
Contributor Author

I even tried replacing commit with something non-interruptible like ZIO.attemptBlocking(Thread.sleep(2000)), and it works correctly. So it's very likely a Consumer issue.
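
Concretely, with the commit swapped out, the pipeline runs to completion:

    // Replacing the commit with a non-interruptible blocking sleep:
    // no deadlock, which points at the commit itself.
    .mapZIO(_ => ZIO.attemptBlocking(Thread.sleep(2000)))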

@guizmaii
Member

Thanks for the additional details :)

@erikvanoosten
Collaborator

erikvanoosten commented May 21, 2023

I used this program to test the issue: https://gist.github.com/erikvanoosten/5e9f34d8ff43de32b583c021c858e309

This problem is caused by an immediate unsubscribe after the stream ends (see https://github.com/zio/zio-kafka/blob/master/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala#L254). Since we are no longer subscribed, we no longer poll. When we stop polling, the commit callbacks are never invoked and all progress halts.

Only programs that do not consume the entire stream (the example here uses take()) are affected by this bug.

Potential solutions:

  • Delay the unsubscribe until there are no more pending commits (a rough sketch of this option follows the list).
  • Upon unsubscribe, cancel all pending commits.
  • Wait for Await commits during revoke #830 and use the new rebalanceSafeCommits mode. With this mode enabled we await commit completion during partition revocation. An unsubscribe also causes such a revocation, so this program would work fine with the mode enabled. (Note that this PR needs an as-of-yet unreleased Kafka, at least 3.6.0 if all goes well.)
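
A rough sketch of the first option. All names here (pendingCommits, doPoll, doUnsubscribe) are hypothetical stand-ins for the actual Runloop internals, not real zio-kafka API:

    import zio._

    // Keep polling (so commit callbacks can still fire) until every pending
    // commit has completed, and only then unsubscribe.
    def unsubscribeWhenCommitsDone(
      pendingCommits: Ref[Int],   // commits still awaiting their callback
      doPoll: Task[Unit],         // one poll() round; may complete callbacks
      doUnsubscribe: Task[Unit]
    ): Task[Unit] =
      pendingCommits.get.flatMap {
        case 0 => doUnsubscribe
        case _ => doPoll *> unsubscribeWhenCommitsDone(pendingCommits, doPoll, doUnsubscribe)
      }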

@svroonland WDYT?

@vladimirkl
Contributor Author

Why does the same code work with grouped instead of groupedWithin? Perhaps the order of finalization is different?

@erikvanoosten
Collaborator

Why does the same code work with grouped instead of groupedWithin? Perhaps the order of finalization is different?

groupedWithin is time-based. This causes a fiber split: everything above the groupedWithin runs on a different fiber than what follows. I suspect this causes the unsubscribe to happen fractionally later than with grouped.
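
A minimal, Kafka-free sketch that makes the split visible: with groupedWithin in the pipeline, the upstream and downstream taps print different fiber ids.

    import zio._
    import zio.stream._

    object FiberSplitDemo extends ZIOAppDefault {
      val run =
        ZStream
          .fromIterable(1 to 10)
          .tap(_ => ZIO.fiberId.flatMap(id => Console.printLine(s"upstream:   $id")))
          .groupedWithin(3, 100.millis)
          .tap(_ => ZIO.fiberId.flatMap(id => Console.printLine(s"downstream: $id")))
          .runDrain
    }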

@vladimirkl
Contributor Author

Things are much worse with zio-streams 2.0.14. An even simpler example hangs forever:

    Consumer.plainStream(Subscription.topics(topic), Serde.int, Serde.string)
      .take(100)
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .debug("Offset")
      .mapZIO(_.commit)
      .debug("Commit")
      .runDrain

This code works perfectly with zio-streams 2.0.13. Unfortunately, take has become dangerous for zio-kafka streams in general.

@svroonland
Collaborator

@erikvanoosten I believe your analysis is correct: the take ends the stream and unsubscribes. I have had similar issues with take and finalizer race conditions in zio-kinesis.

Regarding the possible solutions: if it's a race condition, then I'm not sure we always have pending commits to await before unsubscribing.

We could look into a usage pattern that uses graceful shutdown to end the stream but keep the subscription.
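
As a very rough sketch of that direction (assuming Consumer.stopConsumption is usable for this; the record counting via zipWithIndex is only illustrative):

    // Sketch: request a graceful stop after 100 records instead of cutting
    // the stream off with take(100).
    Consumer.plainStream(Subscription.topics(topic), Serde.int, Serde.string)
      .zipWithIndex
      .tap { case (_, i) => Consumer.stopConsumption.when(i >= 99) }
      .map { case (record, _) => record.offset }
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)
      .runDrain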

@guizmaii
Member

guizmaii commented Jun 3, 2023

@svroonland Have a look at #890. The issue seems to be with commitAsync.

@svroonland
Collaborator

As in, commitAsync requires poll calls to complete? Yeah

@vladimirkl
Contributor Author

I encountered a few more issues with hanging commits in other scenarios: when the broker dies, the KafkaConsumer dies too, but the async commit hangs. I compared this with the fs2-kafka implementation: it behaves similarly, but it uses a timeout for the commit operation (15 seconds by default). A zio-kafka user can add timeouts to commits everywhere in their code, but I think it's a good idea to add a default timeout for safety, similar to fs2-kafka. I can create a PR. What do you think?
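
For example, a sketch of what user code can do today (the 15-second default mirrors fs2-kafka; TimeoutException is just an illustrative error type):

    import java.util.concurrent.TimeoutException
    import zio._
    import zio.kafka.consumer.OffsetBatch

    // Fail the commit instead of letting it hang forever.
    def commitOrTimeout(batch: OffsetBatch): Task[Unit] =
      batch.commit.timeoutFail(new TimeoutException("commit timed out"))(15.seconds)

    // usage: .mapZIO(commitOrTimeout) instead of .mapZIO(_.commit)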

@guizmaii
Member

I can create a PR. What do you think?

Please do :)

@guizmaii
Member

guizmaii commented Aug 5, 2023

Isn't this issue fixed by #982? Can we close it?

@vladimirkl
Contributor Author

Not sure. A timeout is definitely better than a deadlock, but the original issue with async grouping still remains. However, if we cannot handle it all, we can close this for now.

@erikvanoosten
Collaborator

erikvanoosten commented Nov 29, 2023

As of zio-kafka 2.7.0 there should be no more deadlocks while committing. See #1109.

In addition, since zio-kafka 2.7.1 there is a workaround. See #1123.
