-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace sync Kafka consumers with confluent_kafka one #767
Conversation
90714d8
to
26ea01d
Compare
78ce72e
to
4740bab
Compare
981fa69
to
0039127
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost convinced its fine, just want to discuss a little bit on the doubt I've asked 😁, when @aiven-anton take a look I'm fine with merging
ea02eb7
to
7a31284
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial review so far, leaving some comments.
7a31284
to
d3e000f
Compare
d3e000f
to
770ff3d
Compare
640a9d0
to
fda504b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, fully reviewed now, I think looks good as far as I can tell. The outstanding thing is the questions about LEO/HWM off by one ... Can we find some way to build higher confidence in those changes?
8e0f0d4
to
e91aafa
Compare
start_offset: int = consumer.beginning_offsets([topic_partition])[topic_partition] | ||
end_offset: int = consumer.end_offsets([topic_partition])[topic_partition] | ||
) -> Iterator[Message]: | ||
start_offset, end_offset = consumer.get_watermark_offsets(topic_partition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had another doubt this was correct due to mix of terminology, but have now verified it's the same underlying Protocol message involved (ListOffsets
):
- https://github.com/confluentinc/librdkafka/blob/df6efd25e6e44c06a937cbdf8fc81715668e0ce1/src/rdkafka.c#L3597
- https://github.com/dpkp/kafka-python/blob/0864817de97549ad71e7bc2432c53108c5806cf1/kafka/consumer/fetcher.py#L573
So looks correct 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, solid work! 👍
This change replaces all synchronous Kafka consumers (from the kafka-python library) with a new implementation based on confluent-kafka-python's `Consumer`, keeping the same interface as much as possible. The PyTest timeout is raised from 60s to 90s to accomodate for the default poll timeout for backups consumers (otherwise the tests would time out while still waiting for messages to arrive).o Since the `conluent_kafka.Consumer` implementation does not allow for consumers to be without a group ID, if the new `KafkaConsumer` client is not given one, we'll generate one on the fly to mimic a groupless behaviour. Resources: * confluent-kafka-python documentation: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html# * librdkafka configuration documentation: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
e91aafa
to
dd0ec0f
Compare
About this change - What it does
karapace.kafka.types
module introduced to consolidate additional data types used from confluent-kafka (to a certain extent, see comment thread)Why this way
KarapaceKafkaClient
still remain)Keeping confluent-kafka references inthis was agreed to be simplified and not be a goal after allkarapace.kafka.*