Add support for topic/partition in KeyRecordGrouper #167
@stephen-harris thanks for this PR!
I agree this is a good addition to the connectors that depend on this library.
Though, instead of modifying the existing KeyRecordGrouper, and given that there is already a TopicPartitionRecordGrouper, what do you think about having this requirement covered by a new record grouper (e.g. TopicPartitionAndKeyRecordGrouper)?
Hi @jeqo, sorry for the delay in getting back to you; I've been on annual leave. I'm happy to go with that approach if you prefer. I think it'll require some changes to the connector though (i.e. the new grouper will still have a 1 file limitation), but I'm happy to make the necessary changes there when this library is updated.
I've refactored this, and added a further tests for the Regarding the |
Thanks @stephen-harris, looking good! Left some comments
Files with review comments:
src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionAndKeyRecordGrouper.java
src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java
src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java
src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java
Thanks @jeqo, I've made the requested changes. One thing I did notice (and I think this can be resolved by a separate PR), but I'm not sure if we'd want to invoke that method in |
@stephen-harris thanks! Good catch! Agreed, it looks like either dead code or we are missing a call to it from the storage connector configs. Could you create an issue first to discuss this further? Could you also rebase this PR so the latest codeql workflow gets executed?
@jeqo Done :)
@stephen-harris thanks! But it seems the git history got a bit messed up. Could you take a look? From my side, it seems that if you move one step back from the last merge commit, it should be fine. So, reset to the previous commit and force push. I tested it here and it looks better: master...jeqo:commons-for-apache-kafka-connect:feat/support-topic-partition-in-key-record-group
Force-pushed from e328c10 to f3b4356.
@jeqo Yup, I accidentally merged in the remote branch rather than force-pushing the rebased local branch. Fixed now.
void keyOnly() {
    final Template filenameTemplate = Template.of("{{key}}");
    final String grType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate);
    assertEquals(RecordGrouperFactory.KEY_RECORD, grType);
Should we use AssertJ everywhere?
Hi @stephen-harris, sorry for hijacking the workflow but could you please rebase onto main and fix the assertions?
+1, thanks @AnatolyPopov, missed this in my review.
@stephen-harris If you rebase your PR, this will be validated by checkstyle based on #192
@AnatolyPopov @jeqo Rebased and updated to use AssertJ
This reverts commit 420a610.
…tionAndKeyRecordGrouper.java Co-authored-by: Jorge Esteban Quilcate Otoya <[email protected]>
…perFactoryTest.java Co-authored-by: Jorge Esteban Quilcate Otoya <[email protected]>
Force-pushed from f3b4356 to bbcff43.
LGTM, thanks @stephen-harris
Thanks @jeqo, are you able to publish the update? I'll then get a PR together for the S3 connector.
@stephen-harris sure! Will do a release later this week. Will ping you here once available.
@stephen-harris see https://github.com/Aiven-Open/commons-for-apache-kafka-connect/releases/tag/v0.11.0 -- looking forward to more contributions! Many thanks!
This adds support for including topic/partition in the file template when using KeyRecordGrouper.
The wider context is a connector ingesting multiple compacted topics, where records within the same topic and with the same key should be written to the same location.
Fix #178
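To illustrate the motivation described above, here is a minimal, self-contained sketch of why a key-only file template is not enough when ingesting several topics. It is not the library's implementation: `SimpleRecord`, `renderFilename` and `group` are hypothetical names invented for this example, the template handling is a plain string substitution, and keeping only the latest record per file is an assumption made to mirror compacted-topic semantics.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopicPartitionKeyGroupingSketch {

    /** Hypothetical stand-in for a sink record; not a Kafka Connect type. */
    static final class SimpleRecord {
        final String topic;
        final int partition;
        final String key;
        final String value;

        SimpleRecord(String topic, int partition, String key, String value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    /** Renders a filename by substituting the three placeholders used in this sketch. */
    static String renderFilename(String template, SimpleRecord r) {
        return template
                .replace("{{topic}}", r.topic)
                .replace("{{partition}}", Integer.toString(r.partition))
                .replace("{{key}}", r.key == null ? "null" : r.key);
    }

    /**
     * Groups records by rendered filename. Assumption: each file holds a single
     * record, so a later record with the same filename replaces the earlier one.
     */
    static Map<String, SimpleRecord> group(String template, List<SimpleRecord> records) {
        final Map<String, SimpleRecord> latestPerFile = new LinkedHashMap<>();
        for (final SimpleRecord r : records) {
            latestPerFile.put(renderFilename(template, r), r);
        }
        return latestPerFile;
    }

    public static void main(String[] args) {
        final List<SimpleRecord> records = List.of(
                new SimpleRecord("orders", 0, "k1", "v1"),
                new SimpleRecord("users", 0, "k1", "v2"),
                new SimpleRecord("orders", 0, "k1", "v3"));

        // Key-only template: records from different topics collide on the same file.
        System.out.println(group("{{key}}", records).keySet());

        // Topic, partition and key: each topic keeps its own file per key.
        System.out.println(group("{{topic}}-{{partition}}-{{key}}", records).keySet());
    }
}
```

With the key-only template, the `orders` and `users` records sharing key `k1` end up in one file, whereas including topic and partition keeps them apart while still collapsing updates for the same key within a topic.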