Skip to content

Commit

Permalink
Merge pull request #227 from ladislavmacoun/fix-sequence-overflow
Browse files Browse the repository at this point in the history
sink: fix sequence number overflow during draining
  • Loading branch information
twmb authored Oct 19, 2022
2 parents 9387634 + 5a0f531 commit b2aec9c
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,7 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti
recBuf.inflight++

recBuf.batchDrainIdx++
if recBuf.seq > math.MaxInt32-int32(len(batch.records)) {
recBuf.seq = int32(len(batch.records)) - (math.MaxInt32 - recBuf.seq) - 1
} else {
recBuf.seq += int32(len(batch.records))
}
recBuf.seq = incrementSequence(recBuf.seq, int32(len(batch.records)))
moreToDrain = moreToDrain || recBuf.tryStopLingerForDraining()
recBuf.mu.Unlock()

Expand All @@ -142,6 +138,14 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti
return req, txnBuilder.req, moreToDrain
}

func incrementSequence(sequence, increment int32) int32 {
if sequence > math.MaxInt32-increment {
return increment - (math.MaxInt32 - sequence) - 1
}

return sequence + increment
}

type txnReqBuilder struct {
txnID *string
req *kmsg.AddPartitionsToTxnRequest
Expand Down Expand Up @@ -864,7 +868,7 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i
// We know the batch made it to Kafka successfully without error.
// We remove this batch and finish all records appropriately.
finished := len(batch.records)
recBuf.batch0Seq += int32(finished)
recBuf.batch0Seq = incrementSequence(recBuf.batch0Seq, int32(finished))
atomic.AddInt64(&recBuf.buffered, -int64(finished))
recBuf.batches[0] = nil
recBuf.batches = recBuf.batches[1:]
Expand Down

0 comments on commit b2aec9c

Please sign in to comment.