Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make access to the java consumer fair #1109

Merged
merged 4 commits into from
Nov 17, 2023
Merged

Conversation

flavienbert
Copy link
Contributor

@flavienbert flavienbert commented Nov 16, 2023

Replace the semaphore that is used to control access to the java consumer with a reentrant lock that has fairness enabled. This prevents liveness problems for consumers that need to use the consumer, e.g. to fetch committed offsets. Fixes #1107.

@flavienbert
Copy link
Contributor Author

Fixes #1107

@svroonland
Copy link
Collaborator

Have you verified that this fixes the issue? A unit test would of course be very welcome

@flavienbert
Copy link
Contributor Author

I confirm it fixes the issue. And it fixes an other issue: #1036 I don't have the RejectedExecutionException anymore and the process close normally! Do you have an idea of this fix? Probably the java consumer is not used in time during shutdow process?

 ZIO
      .blocking(ZIO.suspend(f(consumer)))
      .catchSome { case _: WakeupException =>
        ZIO.interrupt
      }
      .fork
      .flatMap(fib => fib.join.onInterrupt(ZIO.succeed(consumer.wakeup()) *> fib.interrupt))

@erikvanoosten
Copy link
Collaborator

I like it!

So now the question is, how to unit test this? Perhaps by calling to the consumer from the stream (e.g. get offsets), and then verifying that it makes progress?

@erikvanoosten
Copy link
Collaborator

Probably the java consumer is not used in time during shutdow process?

Likely.

@flavienbert
Copy link
Contributor Author

I tried to run the test I wrote with the semaphore version, and the test fail.

@erikvanoosten erikvanoosten merged commit 2cc9efc into zio:master Nov 17, 2023
14 checks passed
@erikvanoosten
Copy link
Collaborator

Thanks @flavienbert !

@flavienbert
Copy link
Contributor Author

@erikvanoosten you're welcome. Do you plan to publish a new version with this fix soon?

@erikvanoosten
Copy link
Collaborator

Yes definitely! Probably the coming weekend.

@erikvanoosten
Copy link
Collaborator

@flavienbert @erikvanoosten Seeing this PR from ZIO (merged but not yet released): zio/zio#8554 and seeing that zio-concurrent repo is archived, do we really want/need to use zio-concurrent ReentrantLock?

Edit: I checked and ReentrantLock is directly present in ZIO. We don't need the zio-concurrent dependency

Zio-concurrent is now a module in the main zio repository.
Eh, I do not see it in ZIO itself... did I miss something?

@guizmaii
Copy link
Member

@erikvanoosten My bad. I though it was coming from the zio-concurrent repo but comes indeed from ZIO's main repo

@erikvanoosten
Copy link
Collaborator

Now that the standard semaphore in zio is fair (since 2.0.20, see zio/zio#8554), we can revert this change, allowing us to remove an external dependency.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

PartitionedStream hanging after stream consumption started
4 participants