diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 4ca249f0..02b7b8f3 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -2238,6 +2238,20 @@ func PreCommitFnContext(ctx context.Context, fn func(*kmsg.OffsetCommitRequest) return context.WithValue(ctx, commitContextFn, fn) } +type commitTxnContextFnT struct{} + +var commitTxnContextFn commitTxnContextFnT + +// PreCommitTxnFnContext attaches fn to the context through WithValue. Using +// the context while committing a transaction allows fn to be called just +// before the commit is issued. This can be used to modify the actual commit, +// such as by associating metadata with partitions (for transactions, the +// default internal metadata is the client's current member ID). If fn returns +// an error, the commit is not attempted. +func PreCommitTxnFnContext(ctx context.Context, fn func(*kmsg.TxnOffsetCommitRequest) error) context.Context { + return context.WithValue(ctx, commitTxnContextFn, fn) +} + // CommitRecords issues a synchronous offset commit for the offsets contained // within rs. Retryable errors are retried up to the configured retry limit, // and any unretryable error is returned. diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index f06469f5..c7620f68 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -1222,6 +1222,13 @@ func (g *groupConsumer) commitTxn( req.Topics = append(req.Topics, reqTopic) } + if fn, ok := ctx.Value(commitTxnContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok { + if err := fn(req); err != nil { + onDone(req, nil, err) + return + } + } + var resp *kmsg.TxnOffsetCommitResponse var err error if len(req.Topics) > 0 {