Skip to content

Commit

Permalink
kgo txn: handle UNKNOWN_SERVER_ERROR more widely
Browse files Browse the repository at this point in the history
This error is technically not retryable, but in the context of
transactions, at worst, we will eventually see a better more direct
error.

Redpanda returns this error a bit right now (although this is being
reduced with 22.3), but we may as well be more resilient anyway.
  • Loading branch information
twmb committed Oct 31, 2022
1 parent eb6e3b5 commit 3ecaff2
Showing 1 changed file with 62 additions and 15 deletions.
77 changes: 62 additions & 15 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,29 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
kip447 := false
if wantCommit && !failed {
isAbortableCommitErr := func(err error) bool {
// ILLEGAL_GENERATION: rebalance began and completed
// before we committed.
//
// REBALANCE_IN_PREGRESS: rebalance began, abort.
//
// COORDINATOR_NOT_AVAILABLE,
// COORDINATOR_LOAD_IN_PROGRESS,
// NOT_COORDINATOR: request failed too many times
//
// CONCURRENT_TRANSACTIONS: Kafka not harmonized,
// we can just abort.
//
// UNKNOWN_SERVER_ERROR: technically should not happen,
// but we can just abort. Redpanda returns this in
// certain versions.
switch {
case errors.Is(err, kerr.IllegalGeneration), // rebalance begun & completed before we committed
errors.Is(err, kerr.RebalanceInProgress), // in rebalance, abort & retry later
errors.Is(err, kerr.CoordinatorNotAvailable), // req failed too many times (same for next two)
case errors.Is(err, kerr.IllegalGeneration),
errors.Is(err, kerr.RebalanceInProgress),
errors.Is(err, kerr.CoordinatorNotAvailable),
errors.Is(err, kerr.CoordinatorLoadInProgress),
errors.Is(err, kerr.NotCoordinator),
errors.Is(err, kerr.ConcurrentTransactions): // kafka not harmonized, we can just abort or retry / eventually abort
errors.Is(err, kerr.ConcurrentTransactions),
errors.Is(err, kerr.UnknownServerError):
return true
}
return false
Expand Down Expand Up @@ -376,14 +392,34 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
"will_try_commit", willTryCommit,
)

retried := false // just in case, we use this to avoid looping
retryUnattempted:
// We have a few potential retryable errors from EndTransaction.
// OperationNotAttempted will be returned at most once.
//
// UnknownServerError should not be returned, but some brokers do:
// technically this is fatal, but there is no downside to retrying
// (even retrying a commit) and seeing if we are successful or if we
// get a better error.
var tries int
retry:
endTxnErr := s.cl.EndTransaction(ctx, TransactionEndTry(willTryCommit))
if errors.Is(endTxnErr, kerr.OperationNotAttempted) && !retried {
willTryCommit = false
retried = true
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit not attempted; retrying as abort")
goto retryUnattempted
tries++
if endTxnErr != nil && tries < 10 {
switch {
case errors.Is(endTxnErr, kerr.OperationNotAttempted):
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit not attempted; retrying as abort")
willTryCommit = false
goto retry

case errors.Is(endTxnErr, kerr.UnknownServerError):
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit unknown server error; retrying")
after := time.NewTimer(s.cl.cfg.retryBackoff(tries))
select {
case <-after.C: // context canceled; we will see when we retry
case <-s.cl.ctx.Done():
after.Stop()
}
goto retry
}
}

if !willTryCommit || endTxnErr != nil {
Expand Down Expand Up @@ -856,10 +892,15 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)
})

// If the returned error is still a Kafka error, this is fatal and we
// need to fail our producer ID we loaded above. ConcurrentTransactions
// could be
// need to fail our producer ID we loaded above.
//
// UNKNOWN_SERVER_ERROR can theoretically be returned (not all brokers
// do). This technically is fatal, but we do not really know whether it
// is. We can just return this error and let the caller decide to
// continue, if the caller does continue, we will try something and
// eventually then receive our proper transactional error, if any.
var ke *kerr.Error
if errors.As(err, &ke) && !ke.Retriable {
if errors.As(err, &ke) && !ke.Retriable && ke.Code != kerr.UnknownServerError.Code {
cl.failProducerID(id, epoch, err)
}

Expand Down Expand Up @@ -1054,8 +1095,14 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {

// If the returned error is still a Kafka error, this is fatal and we
// need to fail our producer ID we created just above.
//
// We special case UNKNOWN_SERVER_ERROR, because we do not really know
// if this is fatal. If it is, we will catch it later on a better
// error. Some brokers send this when things fail internally, we can
// just abort our commit and see if things are still bad in
// EndTransaction.
var ke *kerr.Error
if errors.As(err, &ke) && !ke.Retriable {
if errors.As(err, &ke) && !ke.Retriable && ke.Code != kerr.UnknownServerError.Code {
cl.failProducerID(id, epoch, err)
}

Expand Down

0 comments on commit 3ecaff2

Please sign in to comment.