Skip to content

Commit

Permalink
kgo sink: only allow more than one in flight if we have ok response
Browse files Browse the repository at this point in the history
See large comment.

For #223.
  • Loading branch information
twmb committed Oct 18, 2022
1 parent ac130a1 commit bc8c495
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti
recBufsIdx = (recBufsIdx + 1) % len(s.recBufs)

recBuf.mu.Lock()
if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx || recBuf.inflightOnSink != nil && recBuf.inflightOnSink != s {
if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx || recBuf.inflightOnSink != nil && recBuf.inflightOnSink != s || recBuf.inflight != 0 && !recBuf.okOnSink {
recBuf.mu.Unlock()
continue
}
Expand Down Expand Up @@ -828,6 +828,9 @@ func (s *sink) handleReqRespBatch(
"err_is_retriable", kerr.IsRetriable(err),
"max_retries_reached", !failUnknown && batch.tries >= s.cl.cfg.recordRetries,
)
batch.owner.okOnSink = false
} else {
batch.owner.okOnSink = true
}
s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, partition, baseOffset, err)
didProduce = err == nil
Expand Down Expand Up @@ -1045,6 +1048,20 @@ type recBuf struct {
// finishing, we would allow requests to finish out of order:
// handleSeqResps works per sink, not across sinks.
inflightOnSink *sink
// We only want to allow more than 1 inflight on a sink *if* we are
// currently receiving successful responses. Unimportantly, this allows
// us to save resources if the broker is having a problem or just
// recovered from one. Importantly, we work around an edge case in
// Kafka. Kafka will accept the first produce request for a pid/epoch
// with *any* sequence number. Say we sent two requests inflight. The
// first request Kafka replies to with NOT_LEADER_FOR_PARTITION, the
// second, the broker finished setting up and accepts. The broker now
// has the second request but not the first, we will retry both
// requests and receive OOOSN, and the broker has logs out of order.
// By only allowing more than one inflight if we have seen an ok
// response, we largely eliminate risk of this problem. See #223 for
// more details.
okOnSink bool
// Inflight tracks the number of requests inflight using batches from
// this recBuf. Every time this hits zero, if the batchDrainIdx is not
// at the end, we clear inflightOnSink and trigger the *current* sink
Expand Down Expand Up @@ -1463,9 +1480,8 @@ func (b *recBatch) decInflight() {
if recBuf.inflight != 0 {
return
}
oldSink := recBuf.inflightOnSink
recBuf.inflightOnSink = nil
if oldSink != recBuf.sink && recBuf.batchDrainIdx != len(recBuf.batches) {
if recBuf.batchDrainIdx != len(recBuf.batches) {
recBuf.sink.maybeDrain()
}
}
Expand Down

0 comments on commit bc8c495

Please sign in to comment.