Skip to content

Commit

Permalink
Merge pull request #575 from twmb/precommittxn
Browse files Browse the repository at this point in the history
kgo / changelog: last minute rename from CommitTxn to TxnCommit
  • Loading branch information
twmb authored Sep 21, 2023
2 parents 036f599 + 60b601a commit 2f8a7c4
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
13 changes: 7 additions & 6 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2289,18 +2289,19 @@ func PreCommitFnContext(ctx context.Context, fn func(*kmsg.OffsetCommitRequest)
return context.WithValue(ctx, commitContextFn, fn)
}

type commitTxnContextFnT struct{}
type txnCommitContextFnT struct{}

var commitTxnContextFn commitTxnContextFnT
var txnCommitContextFn txnCommitContextFnT

// PreCommitTxnFnContext attaches fn to the context through WithValue. Using
// PreTxnCommitFnContext attaches fn to the context through WithValue. Using
// the context while committing a transaction allows fn to be called just
// before the commit is issued. This can be used to modify the actual commit,
// such as by associating metadata with partitions (for transactions, the
// default internal metadata is the client's current member ID). If fn returns
// an error, the commit is not attempted.
func PreCommitTxnFnContext(ctx context.Context, fn func(*kmsg.TxnOffsetCommitRequest) error) context.Context {
return context.WithValue(ctx, commitTxnContextFn, fn)
// an error, the commit is not attempted. This context can be used in either
// GroupTransactSession.End or in Client.EndTransaction.
func PreTxnCommitFnContext(ctx context.Context, fn func(*kmsg.TxnOffsetCommitRequest) error) context.Context {
return context.WithValue(ctx, txnCommitContextFn, fn)
}

// CommitRecords issues a synchronous offset commit for the offsets contained
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
var commitErrs []string

committed := make(chan struct{})
g = s.cl.commitTransactionOffsets(context.Background(), postcommit,
g = s.cl.commitTransactionOffsets(ctx, postcommit,
func(_ *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) {
defer close(committed)
if err != nil {
Expand Down Expand Up @@ -1222,7 +1222,7 @@ func (g *groupConsumer) commitTxn(
req.Topics = append(req.Topics, reqTopic)
}

if fn, ok := ctx.Value(commitTxnContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
if fn, ok := ctx.Value(txnCommitContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
if err := fn(req); err != nil {
onDone(req, nil, err)
return
Expand Down

0 comments on commit 2f8a7c4

Please sign in to comment.