Skip to content

Commit

Permalink
kgo / changelog: last minute rename from CommitTxn to TxnCommit
Browse files Browse the repository at this point in the history
This better mirrors TxnOffsetCommit, also in the same api call, and
better mirrors how it's pretty universally Txn then Commit wherever you
see it.

Also fixes the context in GroupTransactSession.End to actually be passed
all the way to the commit function, and documents where it can be used.
  • Loading branch information
twmb committed Sep 21, 2023
1 parent 036f599 commit 60b601a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
13 changes: 7 additions & 6 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2289,18 +2289,19 @@ func PreCommitFnContext(ctx context.Context, fn func(*kmsg.OffsetCommitRequest)
return context.WithValue(ctx, commitContextFn, fn)
}

type commitTxnContextFnT struct{}
type txnCommitContextFnT struct{}

var commitTxnContextFn commitTxnContextFnT
var txnCommitContextFn txnCommitContextFnT

// PreCommitTxnFnContext attaches fn to the context through WithValue. Using
// PreTxnCommitFnContext attaches fn to the context through WithValue. Using
// the context while committing a transaction allows fn to be called just
// before the commit is issued. This can be used to modify the actual commit,
// such as by associating metadata with partitions (for transactions, the
// default internal metadata is the client's current member ID). If fn returns
// an error, the commit is not attempted.
func PreCommitTxnFnContext(ctx context.Context, fn func(*kmsg.TxnOffsetCommitRequest) error) context.Context {
return context.WithValue(ctx, commitTxnContextFn, fn)
// an error, the commit is not attempted. This context can be used in either
// GroupTransactSession.End or in Client.EndTransaction.
func PreTxnCommitFnContext(ctx context.Context, fn func(*kmsg.TxnOffsetCommitRequest) error) context.Context {
return context.WithValue(ctx, txnCommitContextFn, fn)
}

// CommitRecords issues a synchronous offset commit for the offsets contained
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
var commitErrs []string

committed := make(chan struct{})
g = s.cl.commitTransactionOffsets(context.Background(), postcommit,
g = s.cl.commitTransactionOffsets(ctx, postcommit,
func(_ *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) {
defer close(committed)
if err != nil {
Expand Down Expand Up @@ -1222,7 +1222,7 @@ func (g *groupConsumer) commitTxn(
req.Topics = append(req.Topics, reqTopic)
}

if fn, ok := ctx.Value(commitTxnContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
if fn, ok := ctx.Value(txnCommitContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
if err := fn(req); err != nil {
onDone(req, nil, err)
return
Expand Down

0 comments on commit 60b601a

Please sign in to comment.