diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index c08a3d35..312a7205 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -123,11 +123,7 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti recBuf.inflight++ recBuf.batchDrainIdx++ - if recBuf.seq > math.MaxInt32-int32(len(batch.records)) { - recBuf.seq = int32(len(batch.records)) - (math.MaxInt32 - recBuf.seq) - 1 - } else { - recBuf.seq += int32(len(batch.records)) - } + recBuf.seq = incrementSequence(recBuf.seq, int32(len(batch.records))) moreToDrain = moreToDrain || recBuf.tryStopLingerForDraining() recBuf.mu.Unlock() @@ -142,6 +138,14 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti return req, txnBuilder.req, moreToDrain } +func incrementSequence(sequence, increment int32) int32 { + if sequence > math.MaxInt32-increment { + return increment - (math.MaxInt32 - sequence) - 1 + } + + return sequence + increment +} + type txnReqBuilder struct { txnID *string req *kmsg.AddPartitionsToTxnRequest @@ -864,7 +868,7 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i // We know the batch made it to Kafka successfully without error. // We remove this batch and finish all records appropriately. finished := len(batch.records) - recBuf.batch0Seq += int32(finished) + recBuf.batch0Seq = incrementSequence(recBuf.batch0Seq, int32(finished)) atomic.AddInt64(&recBuf.buffered, -int64(finished)) recBuf.batches[0] = nil recBuf.batches = recBuf.batches[1:]