diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 48f9d692..249af071 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -261,13 +261,29 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry kip447 := false if wantCommit && !failed { isAbortableCommitErr := func(err error) bool { + // ILLEGAL_GENERATION: rebalance began and completed + // before we committed. + // + // REBALANCE_IN_PREGRESS: rebalance began, abort. + // + // COORDINATOR_NOT_AVAILABLE, + // COORDINATOR_LOAD_IN_PROGRESS, + // NOT_COORDINATOR: request failed too many times + // + // CONCURRENT_TRANSACTIONS: Kafka not harmonized, + // we can just abort. + // + // UNKNOWN_SERVER_ERROR: technically should not happen, + // but we can just abort. Redpanda returns this in + // certain versions. switch { - case errors.Is(err, kerr.IllegalGeneration), // rebalance begun & completed before we committed - errors.Is(err, kerr.RebalanceInProgress), // in rebalance, abort & retry later - errors.Is(err, kerr.CoordinatorNotAvailable), // req failed too many times (same for next two) + case errors.Is(err, kerr.IllegalGeneration), + errors.Is(err, kerr.RebalanceInProgress), + errors.Is(err, kerr.CoordinatorNotAvailable), errors.Is(err, kerr.CoordinatorLoadInProgress), errors.Is(err, kerr.NotCoordinator), - errors.Is(err, kerr.ConcurrentTransactions): // kafka not harmonized, we can just abort or retry / eventually abort + errors.Is(err, kerr.ConcurrentTransactions), + errors.Is(err, kerr.UnknownServerError): return true } return false @@ -376,14 +392,34 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry "will_try_commit", willTryCommit, ) - retried := false // just in case, we use this to avoid looping -retryUnattempted: + // We have a few potential retryable errors from EndTransaction. + // OperationNotAttempted will be returned at most once. + // + // UnknownServerError should not be returned, but some brokers do: + // technically this is fatal, but there is no downside to retrying + // (even retrying a commit) and seeing if we are successful or if we + // get a better error. + var tries int +retry: endTxnErr := s.cl.EndTransaction(ctx, TransactionEndTry(willTryCommit)) - if errors.Is(endTxnErr, kerr.OperationNotAttempted) && !retried { - willTryCommit = false - retried = true - s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit not attempted; retrying as abort") - goto retryUnattempted + tries++ + if endTxnErr != nil && tries < 10 { + switch { + case errors.Is(endTxnErr, kerr.OperationNotAttempted): + s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit not attempted; retrying as abort") + willTryCommit = false + goto retry + + case errors.Is(endTxnErr, kerr.UnknownServerError): + s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit unknown server error; retrying") + after := time.NewTimer(s.cl.cfg.retryBackoff(tries)) + select { + case <-after.C: // context canceled; we will see when we retry + case <-s.cl.ctx.Done(): + after.Stop() + } + goto retry + } } if !willTryCommit || endTxnErr != nil { @@ -856,10 +892,15 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) }) // If the returned error is still a Kafka error, this is fatal and we - // need to fail our producer ID we loaded above. ConcurrentTransactions - // could be + // need to fail our producer ID we loaded above. + // + // UNKNOWN_SERVER_ERROR can theoretically be returned (not all brokers + // do). This technically is fatal, but we do not really know whether it + // is. We can just return this error and let the caller decide to + // continue, if the caller does continue, we will try something and + // eventually then receive our proper transactional error, if any. var ke *kerr.Error - if errors.As(err, &ke) && !ke.Retriable { + if errors.As(err, &ke) && !ke.Retriable && ke.Code != kerr.UnknownServerError.Code { cl.failProducerID(id, epoch, err) } @@ -1054,8 +1095,14 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error { // If the returned error is still a Kafka error, this is fatal and we // need to fail our producer ID we created just above. + // + // We special case UNKNOWN_SERVER_ERROR, because we do not really know + // if this is fatal. If it is, we will catch it later on a better + // error. Some brokers send this when things fail internally, we can + // just abort our commit and see if things are still bad in + // EndTransaction. var ke *kerr.Error - if errors.As(err, &ke) && !ke.Retriable { + if errors.As(err, &ke) && !ke.Retriable && ke.Code != kerr.UnknownServerError.Code { cl.failProducerID(id, epoch, err) }