Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for multiple subscriptions on the same Consumer #517

Merged
merged 56 commits into from
Feb 27, 2023

Conversation

svroonland
Copy link
Collaborator

@svroonland svroonland commented Oct 12, 2022

With this PR, a single Consumer instance and its backing apache kafka Consumer instance, can support multiple topic/pattern/manual subscriptions (as long as they are of the same type).

Inspired by https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#sharing-the-kafkaconsumer-instance

  • Subscriptions are now passed as parameter to plainStream/partitionedStream/partitionedAssignmentStream.
  • Removed subscribeAnd, SubscribedConsumer, SubscribedConsumerFromEnvironment.
  • Subscriptions are scoped to the lifetime of the plainStream/partitionedStream/partitionedAssignmentStream.
  • subscribe and unsubscribe have been removed from the Consumer trait in favour of scoped subscriptions.
  • When an error occurs downstream of plainStream/partitionedStream, the stream may be restarted thanks to the scoped subscriptions. This was not possible previously, requiring to make a new Consumer instance (or perhaps unsubscribe & resubscribe).

Implements #217.

@svroonland
Copy link
Collaborator Author

@vigoo @adamgfraser @guizmaii Could I get your opinion on this interface change?

@guizmaii
Copy link
Member

guizmaii commented Jan 8, 2023

@svroonland Can you rebase your branch, please?

@svroonland
Copy link
Collaborator Author

The only merge conflict is in some docs. Let's gather some feedback on these changes first before merging.

Copy link

@nmcb nmcb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by lack of understanding - given the description of this pr - and difference between: restartable plainStream, partitionedStream and non restartable partitionedAssignmentStream i would expect the type signatures and/or their documentation to differ more prominently - am i missing something ?

@guizmaii
Copy link
Member

guizmaii commented Jan 21, 2023

I tried to find a way to be more typesafe: no more runtime error possible with the "union" operation, but I didn't find any way because in the end you're just composing streams 🤔

… `Serde.byteArray` as these deserializers are not doing anything anyway (#654)
@guizmaii
Copy link
Member

guizmaii commented Feb 24, 2023

Here's a proposal to improve a bit the readability: #663

Can you rebase your branch also, please :)

guizmaii
guizmaii previously approved these changes Feb 27, 2023
@svroonland svroonland merged commit 4ea0d18 into master Feb 27, 2023
@svroonland svroonland deleted the multiple-subscriptions branch February 27, 2023 07:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants