Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

setting offset metadata in a transaction #559

Closed
iamnoah opened this issue Sep 13, 2023 · 4 comments · Fixed by #570
Closed

setting offset metadata in a transaction #559

iamnoah opened this issue Sep 13, 2023 · 4 comments · Fixed by #570
Labels
enhancement New feature or request

Comments

@iamnoah
Copy link
Contributor

iamnoah commented Sep 13, 2023

I'm looking to add offset metadata to an offset commit in a transaction. In the past, with the Java client, I've used the offset metadata to store the offset into a non-Kafka data source that I'm copying from.

I found kgo.PreCommitFnContext which lets me set the metadata for a normal consumer offset commit, but I don't see anything analogous for a GroupTransactSession, in fact I noticed that the metadata for a transactional offset commit is always set to the member ID.

So what I'm looking for is:

  1. Load offsets with metadata, then loop:
  2. Start a transaction
  3. Produce some records loaded from a data source (using the loaded metadata as my cursor)
  4. Commit offsets with metadata that records my position in the data source
  5. End transaction

3 is where I'm stuck. My motivation here is to get exactly once semantics since the data source position and records are committed together. I've omitted rebalance handling, etc.

Is there a way to do this that I'm missing? If not, would a transactional hook like PreCommitFnContext for transactions be a welcome PR?

@twmb
Copy link
Owner

twmb commented Sep 16, 2023

I'm +1 to adding support for PreCommitFnContext to GroupTransactSession's End function -- this would be a good addition and a good place to annotate the commit message. Unfortunately due to API constraints -- the precommit function currently takes kmsg.OffsetCommitRequest whereas you're going to want kmsg.TxnOffsetCommitRequest -- we'll need to add a new PreTxnCommitFnContext.

lmk if you'd like to add support for it.

@twmb twmb added the enhancement New feature or request label Sep 16, 2023
twmb added a commit that referenced this issue Sep 21, 2023
Allows modifying TxnOffsetCommitRequest before it is issued --
particularly, to modify the metadata field.

Closes #559.
@twmb twmb closed this as completed in #570 Sep 21, 2023
@iamnoah
Copy link
Contributor Author

iamnoah commented Sep 21, 2023

@twmb thanks for adding this!

Trying it out, the missing piece is that the topic I am committing offsets against may not have changed offsets to commit, but I still want to commit and modify the metadata because my position in my external source has changed. The least bad way seemed like another hook to force specific offsets to be committed (see linked PR,) but I'm not picky about the exact API, as long as I can force a commit with new metadata as part of a transaction.

@twmb
Copy link
Owner

twmb commented Sep 22, 2023

What do you think about instead changing the behavior in the source: you can modify the input request, add your offsets, and then update postcommit based on the TxnOffsetCommit req+resp on current line 291? This would also avoid adding a new API (and I'm less keen to bump a minor version for a bit), and instead issue this as a patch release?

@iamnoah
Copy link
Contributor Author

iamnoah commented Sep 25, 2023

@twmb responded in the linked ticket.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants