Skip to content

Commit

Permalink
fix(producer): treat ErrKafkaStorageError as retriable (#2939)
Browse files Browse the repository at this point in the history
This is retriable according to the spec: https://kafka.apache.org/protocol.html

Signed-off-by: Richard Artoul <[email protected]>
  • Loading branch information
richardartoul authored Aug 7, 2024
1 parent 9ded629 commit 3aea989
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
bp.parent.returnSuccesses(pSet.msgs)
// Retriable errors
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
if bp.parent.conf.Producer.Retry.Max <= 0 {
bp.parent.abandonBrokerConnection(bp.broker)
bp.parent.returnErrors(pSet.msgs, block.Err)
Expand Down Expand Up @@ -1134,7 +1134,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo

switch block.Err {
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err)
if bp.currentRetries[topic] == nil {
Expand Down

0 comments on commit 3aea989

Please sign in to comment.