kgo: handle CONCURRENT_TRANSACTIONS more widely
Single-broker localhost kraft testing shows this error being returned
**much** more frequently. Rather than having a 5s timeout, we now
continue until the input context is canceled. This largely affects
ending a transaction.

We also improve the locking situation in one area. It's a bit ugly, but
the solution is better than unlocking directly at every return site.

For #223.
twmb committed Oct 18, 2022
1 parent 5567018 commit f4a508c
Showing 4 changed files with 73 additions and 24 deletions.
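Because `EndTransaction` now retries CONCURRENT_TRANSACTIONS until the caller's context is canceled rather than for a fixed 5s, callers that want a bounded wait should pass a context with a deadline. A minimal sketch; the `endWithDeadline` helper and the 30s deadline are illustrative, not part of this change:

```go
package txnexample

import (
	"context"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// endWithDeadline bounds how long EndTransaction may spend retrying
// CONCURRENT_TRANSACTIONS: the retry loop now stops only when this
// context is canceled, not after a hardcoded 5s.
func endWithDeadline(cl *kgo.Client) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return cl.EndTransaction(ctx, kgo.TryCommit)
}
```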
3 changes: 3 additions & 0 deletions pkg/kgo/config.go
@@ -801,6 +801,9 @@ func WithHooks(hooks ...Hook) Opt {
// are expected to backoff slightly and retry the operation. Lower backoffs may
// increase load on the brokers, while higher backoffs may increase transaction
// latency in clients.
+//
+// Note that if brokers are hanging in this concurrent transactions state for
+// too long, the client progressively increases the backoff.
func ConcurrentTransactionsBackoff(backoff time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.txnBackoff = backoff }}
}
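For reference, a sketch of configuring the option documented above; the seed broker, transactional ID, and the `newTxnClient` helper are placeholders:

```go
package txnexample

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// newTxnClient sets the starting CONCURRENT_TRANSACTIONS backoff. Per the
// note above, this is only the initial value: the client raises the floor
// itself the longer brokers stay in the concurrent transactions state.
func newTxnClient() (*kgo.Client, error) {
	return kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("example-txn-id"),
		kgo.ConcurrentTransactionsBackoff(50*time.Millisecond),
	)
}
```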
4 changes: 3 additions & 1 deletion pkg/kgo/producer.go
@@ -738,7 +738,9 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) (*producerID,
}

if err = kerr.ErrorForCode(resp.ErrorCode); err != nil {
-if kerr.IsRetriable(err) { // this could return ConcurrentTransactions, but this is rare; ignore until a user report
+// We could receive concurrent transactions; this is ignorable
+// and we just want to re-init.
+if kerr.IsRetriable(err) || errors.Is(err, kerr.ConcurrentTransactions) {
cl.cfg.logger.Log(LogLevelInfo, "producer id initialization resulted in retriable error, discarding initialization attempt", "err", err)
return &producerID{lastID, lastEpoch, err}, false
}
2 changes: 1 addition & 1 deletion pkg/kgo/sink.go
@@ -427,7 +427,7 @@ func (s *sink) doTxnReq(
req.batches.eachOwnerLocked(seqRecBatch.removeFromTxn)
}
}()
-err = s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
+err = s.cl.doWithConcurrentTransactions(s.cl.ctx, "AddPartitionsToTxn", func() error {
stripped, err = s.issueTxnReq(req, txnReq)
return err
})
88 changes: 66 additions & 22 deletions pkg/kgo/txn.go
@@ -260,30 +260,43 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry

kip447 := false
if wantCommit && !failed {
+isAbortableCommitErr := func(err error) bool {
+switch {
+case errors.Is(err, kerr.IllegalGeneration), // rebalance begun & completed before we committed
+errors.Is(err, kerr.RebalanceInProgress), // in rebalance, abort & retry later
+errors.Is(err, kerr.CoordinatorNotAvailable), // req failed too many times (same for next two)
+errors.Is(err, kerr.CoordinatorLoadInProgress),
+errors.Is(err, kerr.NotCoordinator),
+errors.Is(err, kerr.ConcurrentTransactions): // kafka not harmonized, we can just abort or retry / eventually abort
+return true
+}
+return false
+}

var commitErrs []string

committed := make(chan struct{})
g = s.cl.commitTransactionOffsets(context.Background(), postcommit,
func(_ *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) {
defer close(committed)
if err != nil {
+if isAbortableCommitErr(err) {
+hasAbortableCommitErr = true
+return
+}
commitErrs = append(commitErrs, err.Error())
return
}
kip447 = resp.Version >= 3

for _, t := range resp.Topics {
for _, p := range t.Partitions {
-switch err := kerr.ErrorForCode(p.ErrorCode); err {
-case nil:
-case kerr.IllegalGeneration, // rebalance begun & completed before we committed
-kerr.RebalanceInProgress, // in rebalance, abort & retry later
-kerr.CoordinatorNotAvailable, // req failed too many times (same for next two)
-kerr.CoordinatorLoadInProgress,
-kerr.NotCoordinator:
-hasAbortableCommitErr = true
-default:
-commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err))
+if err := kerr.ErrorForCode(p.ErrorCode); err != nil {
+if isAbortableCommitErr(err) {
+hasAbortableCommitErr = true
+} else {
+commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err))
+}
+}
}
}
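The effect of `isAbortableCommitErr` is visible from the caller's side of `GroupTransactSession.End`: abortable conditions surface as `committed == false` rather than as a hard error. A sketch of one consume-transform-produce cycle; the `runOnce` helper and the "output" topic are placeholders:

```go
package txnexample

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// runOnce performs one transactional cycle against a *kgo.GroupTransactSession.
func runOnce(ctx context.Context, sess *kgo.GroupTransactSession) error {
	fetches := sess.PollFetches(ctx)
	if err := sess.Begin(); err != nil {
		return err
	}
	fetches.EachRecord(func(r *kgo.Record) {
		sess.Produce(ctx, &kgo.Record{Topic: "output", Value: r.Value}, nil)
	})
	committed, err := sess.End(ctx, kgo.TryCommit)
	if err != nil {
		return err
	}
	if !committed {
		// An abortable error (rebalance, coordinator churn, or
		// CONCURRENT_TRANSACTIONS) caused an abort; poll and retry.
	}
	return nil
}
```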
@@ -578,7 +591,7 @@ func (cl *Client) EndAndBeginTransaction(
"commit", commit,
)
cl.producer.readded = false
-err = cl.doWithConcurrentTransactions("EndTxn", func() error {
+err = cl.doWithConcurrentTransactions(ctx, "EndTxn", func() error {
req := kmsg.NewPtrEndTxnRequest()
req.TransactionalID = *cl.cfg.txnID
req.ProducerID = id
@@ -643,7 +656,7 @@ func (cl *Client) EndAndBeginTransaction(
// there could be a stranded txn within Kafka's ProducerStateManager,
// but ideally the user will reconnect with the same txnal id.
cl.producer.readded = true
-return cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
+return cl.doWithConcurrentTransactions(ctx, "AddPartitionsToTxn", func() error {
req := kmsg.NewPtrAddPartitionsToTxnRequest()
req.TransactionalID = *cl.cfg.txnID
req.ProducerID = id
@@ -824,7 +837,7 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)
)

cl.producer.readded = false
-err = cl.doWithConcurrentTransactions("EndTxn", func() error {
+err = cl.doWithConcurrentTransactions(ctx, "EndTxn", func() error {
req := kmsg.NewPtrEndTxnRequest()
req.TransactionalID = *cl.cfg.txnID
req.ProducerID = id
@@ -838,7 +851,8 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)
})

// If the returned error is still a Kafka error, this is fatal and we
-// need to fail our producer ID we loaded above.
+// need to fail our producer ID we loaded above. ConcurrentTransactions
+// could be returned here; it is retriable, so it does not fail the ID.
var ke *kerr.Error
if errors.As(err, &ke) && !ke.Retriable {
cl.failProducerID(id, epoch, err)
@@ -886,13 +900,32 @@ func (cl *Client) maybeRecoverProducerID() (necessary, did bool, err error) {
// If a transaction is begun too quickly after finishing an old transaction,
// Kafka may still be finalizing its commit / abort and will return a
// concurrent transactions error. We handle that by retrying for a bit.
-func (cl *Client) doWithConcurrentTransactions(name string, fn func() error) error {
+func (cl *Client) doWithConcurrentTransactions(ctx context.Context, name string, fn func() error) error {
start := time.Now()
tries := 0
backoff := cl.cfg.txnBackoff

start:
err := fn()
-if errors.Is(err, kerr.ConcurrentTransactions) && time.Since(start) < 5*time.Second {
+if errors.Is(err, kerr.ConcurrentTransactions) {
+// The longer we are stalled, the higher the minimum backoff we
+// enforce. Check the longest stall first so that every
+// threshold is reachable.
+since := time.Since(start)
+switch {
+case since > 5*time.Second:
+if backoff < time.Second {
+backoff = time.Second
+}
+case since > 5*time.Second/2:
+if backoff < 500*time.Millisecond {
+backoff = 500 * time.Millisecond
+}
+case since > time.Second:
+if backoff < 200*time.Millisecond {
+backoff = 200 * time.Millisecond
+}
+}

tries++
cl.cfg.logger.Log(LogLevelDebug, fmt.Sprintf("%s failed with CONCURRENT_TRANSACTIONS, which may be because we ended a txn and began producing in a new txn too quickly; backing off and retrying", name),
"backoff", backoff,
@@ -901,6 +934,9 @@ start:
)
select {
case <-time.After(backoff):
+case <-ctx.Done():
+cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("abandoning %s retry due to request ctx quitting", name))
+return err
case <-cl.ctx.Done():
cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("abandoning %s retry due to client ctx quitting", name))
return err
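Pulled out as a pure function purely for illustration (this is not the library's API), the backoff escalation above behaves as follows; the ordering matters, since an ascending `switch` would make the longer-stall cases unreachable:

```go
package txnexample

import "time"

// minConcurrentTxnBackoff mirrors the escalation in the retry loop: the
// longer we have been stalled on CONCURRENT_TRANSACTIONS, the higher the
// floor enforced under the configured backoff. Cases run from longest
// stall to shortest so that each threshold is actually reachable.
func minConcurrentTxnBackoff(configured, since time.Duration) time.Duration {
	backoff := configured
	switch {
	case since > 5*time.Second:
		if backoff < time.Second {
			backoff = time.Second
		}
	case since > 5*time.Second/2:
		if backoff < 500*time.Millisecond {
			backoff = 500 * time.Millisecond
		}
	case since > time.Second:
		if backoff < 200*time.Millisecond {
			backoff = 200 * time.Millisecond
		}
	}
	return backoff
}
```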
@@ -941,12 +977,18 @@ func (cl *Client) commitTransactionOffsets(
// unlock the producer txnMu before committing to allow EndTransaction
// to go through, even though that could cut off our commit.
cl.producer.txnMu.Lock()
+var unlockedTxn bool
+unlockTxn := func() {
+if !unlockedTxn {
+cl.producer.txnMu.Unlock()
+}
+unlockedTxn = true
+}
+defer unlockTxn()
if !cl.producer.inTxn {
onDone(nil, nil, errNotInTransaction)
-cl.producer.txnMu.Unlock()
return nil
}
-cl.producer.txnMu.Unlock()

g := cl.consumer.g
if g == nil {
@@ -958,9 +1000,6 @@
return g
}

-g.mu.Lock()
-defer g.mu.Unlock()

if !g.offsetsAddedToTxn {
if err := cl.addOffsetsToTxn(g.ctx, g.cfg.group); err != nil {
if onDone != nil {
@@ -971,6 +1010,11 @@
g.offsetsAddedToTxn = true
}

+unlockTxn()
+
+g.mu.Lock()
+defer g.mu.Unlock()

g.commitTxn(ctx, uncommitted, onDone)
return g
}
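The unlock-once closure above generalizes to any function that needs early returns while holding a lock but wants to release it before slower work. A generic sketch of the pattern; the `unlockOnce` helper and its parameters are illustrative:

```go
package txnexample

import "sync"

// unlockOnce demonstrates the pattern: a closure guards the mutex so early
// returns can rely on the deferred unlock, while the happy path releases
// the lock explicitly (and exactly once) before slower work.
func unlockOnce(mu *sync.Mutex, critical func() error, slow func()) error {
	mu.Lock()
	var unlocked bool
	unlock := func() {
		if !unlocked {
			mu.Unlock()
		}
		unlocked = true
	}
	defer unlock()

	if err := critical(); err != nil {
		return err // the deferred unlock releases the mutex
	}

	unlock() // release before the slow work
	slow()
	return nil
}
```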
@@ -984,7 +1028,7 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
return err
}

-err = cl.doWithConcurrentTransactions("AddOffsetsToTxn", func() error { // committing offsets without producing causes a transaction to begin within Kafka
+err = cl.doWithConcurrentTransactions(ctx, "AddOffsetsToTxn", func() error { // committing offsets without producing causes a transaction to begin within Kafka
cl.cfg.logger.Log(LogLevelInfo, "issuing AddOffsetsToTxn",
"txn", *cl.cfg.txnID,
"producerID", id,
