Skip to content

Commit

Permalink
kgo: support MaxBufferedBytes
Browse files Browse the repository at this point in the history
Closes #544.
  • Loading branch information
twmb committed Sep 20, 2023
1 parent 01651af commit 304559f
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 8 deletions.
2 changes: 2 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ func (cl *Client) OptValues(opt any) []any {
return []any{cfg.maxRecordBatchBytes}
case namefn(MaxBufferedRecords):
return []any{cfg.maxBufferedRecords}
case namefn(MaxBufferedBytes):
return []any{cfg.maxBufferedBytes}
case namefn(RecordPartitioner):
return []any{cfg.partitioner}
case namefn(ProduceRequestTimeout):
Expand Down
23 changes: 21 additions & 2 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type cfg struct {
defaultProduceTopic string
maxRecordBatchBytes int32
maxBufferedRecords int64
maxBufferedBytes int64
produceTimeout time.Duration
recordRetries int64
maxUnknownFailures int64
Expand Down Expand Up @@ -293,6 +294,7 @@ func (cfg *cfg) validate() error {

// Some random producer settings.
{name: "max buffered records", v: cfg.maxBufferedRecords, allowed: 1, badcmp: i64lt},
{name: "max buffered bytes", v: cfg.maxBufferedBytes, allowed: 0, badcmp: i64lt},
{name: "linger", v: int64(cfg.linger), allowed: int64(time.Minute), badcmp: i64gt, durs: true},
{name: "produce timeout", v: int64(cfg.produceTimeout), allowed: int64(100 * time.Millisecond), badcmp: i64lt, durs: true},
{name: "record timeout", v: int64(cfg.recordTimeout), allowed: int64(time.Second), badcmp: func(l, r int64) (bool, string) {
Expand Down Expand Up @@ -948,6 +950,23 @@ func MaxBufferedRecords(n int) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.maxBufferedRecords = int64(n) }}
}

// MaxBufferedBytes sets the max amount of bytes that the client will buffer
// while producing, blocking produces until records are finished if this limit
// is reached. This overrides the unlimited default.
//
// Note that this option does _not_ apply for consuming: the client cannot
// limit bytes buffered for consuming because of decompression. You can roughly
// control consuming memory by using [MaxConcurrentFetches], [FetchMaxBytes],
// and [FetchMaxPartitionBytes].
//
// If you produce a record that is larger than n, the record is immediately
// failed with kerr.MessageTooLarge.
//
// Note that this limit applies after [MaxBufferedRecords].
func MaxBufferedBytes(n int) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.maxBufferedBytes = int64(n) }}
}

// RecordPartitioner uses the given partitioner to partition records, overriding
// the default UniformBytesPartitioner(64KiB, true, true, nil).
func RecordPartitioner(partitioner Partitioner) ProducerOpt {
Expand Down Expand Up @@ -1047,8 +1066,8 @@ func ProducerLinger(linger time.Duration) ProducerOpt {
// ManualFlushing disables auto-flushing when producing. While you can still
// set lingering, it would be useless to do so.
//
// With manual flushing, producing while MaxBufferedRecords have already been
// produced and not flushed will return ErrMaxBuffered.
// With manual flushing, producing while MaxBufferedRecords or MaxBufferedBytes
// have already been produced and not flushed will return ErrMaxBuffered.
func ManualFlushing() ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.manualFlushing = true }}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kgo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestGroupETL(t *testing.T) {
getSeedBrokers(),
WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)),
MaxBufferedRecords(10000),
MaxBufferedBytes(50000),
UnknownTopicRetries(-1), // see txn_test comment
)
defer cl.Close()
Expand Down Expand Up @@ -124,6 +125,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
ConsumeTopics(c.consumeFrom),
Balancers(c.balancer),
MaxBufferedRecords(10000),
MaxBufferedBytes(50000),
ConsumePreferringLagFn(PreferLagAt(1)),
BlockRebalanceOnPoll(),

Expand Down
42 changes: 36 additions & 6 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

type producer struct {
bufferedRecords atomicI64
bufferedBytes atomicI64
inflight atomicI64 // high 16: # waiters, low 48: # inflight

cl *Client
Expand Down Expand Up @@ -313,8 +314,9 @@ func (f *FirstErrPromise) Err() error {
}

// TryProduce is similar to Produce, but rather than blocking if the client
// currently has MaxBufferedRecords buffered, this fails immediately with
// ErrMaxBuffered. See the Produce documentation for more details.
// currently has MaxBufferedRecords or MaxBufferedBytes buffered, this fails
// immediately with ErrMaxBuffered. See the Produce documentation for more
// details.
func (cl *Client) TryProduce(
ctx context.Context,
r *Record,
Expand Down Expand Up @@ -387,6 +389,21 @@ func (cl *Client) produce(
}
}

var (
userSize = r.userSize()
bufRecs = p.bufferedRecords.Add(1)
bufBytes = p.bufferedBytes.Add(userSize)
overMaxRecs = bufRecs > cl.cfg.maxBufferedRecords
overMaxBytes bool
)
if cl.cfg.maxBufferedBytes > 0 {
if userSize > cl.cfg.maxBufferedBytes {
p.promiseRecord(promisedRec{ctx, promise, r}, kerr.MessageTooLarge)
return
}
overMaxBytes = bufBytes > cl.cfg.maxBufferedBytes
}

if r.Topic == "" {
p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic)
return
Expand All @@ -396,7 +413,11 @@ func (cl *Client) produce(
return
}

if p.bufferedRecords.Add(1) > cl.cfg.maxBufferedRecords {
if overMaxRecs || overMaxBytes {
cl.cfg.logger.Log(LogLevelDebug, "blocking Produce because we are either over max buffered records or max buffered bytes",
"over_max_records", overMaxRecs,
"over_max_bytes", overMaxBytes,
)
// If the client ctx cancels or the produce ctx cancels, we
// need to un-count our buffering of this record. We also need
// to drain a slot from the waitBuffer chan, which could be
Expand All @@ -411,11 +432,14 @@ func (cl *Client) produce(
}
select {
case <-p.waitBuffer:
cl.cfg.logger.Log(LogLevelDebug, "Produce block signaled, continuing to produce")
case <-cl.ctx.Done():
drainBuffered(ErrClientClosed)
cl.cfg.logger.Log(LogLevelDebug, "client ctx canceled while blocked in Produce, returning")
return
case <-ctx.Done():
drainBuffered(ctx.Err())
cl.cfg.logger.Log(LogLevelDebug, "produce ctx canceled while blocked in Produce, returning")
return
}
}
Expand Down Expand Up @@ -478,15 +502,21 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error) {
}
}

// Capture user size before potential modification by the promise.
userSize := pr.userSize()
nowBufBytes := p.bufferedBytes.Add(-userSize)
nowBufRecs := p.bufferedRecords.Add(-1)
wasOverMaxRecs := nowBufRecs >= cl.cfg.maxBufferedRecords
wasOverMaxBytes := cl.cfg.maxBufferedBytes > 0 && nowBufBytes+userSize > cl.cfg.maxBufferedBytes

// We call the promise before finishing the record; this allows users
// of Flush to know that all buffered records are completely done
// before Flush returns.
pr.promise(pr.Record, err)

buffered := p.bufferedRecords.Add(-1)
if buffered >= cl.cfg.maxBufferedRecords {
if wasOverMaxRecs || wasOverMaxBytes {
p.waitBuffer <- struct{}{}
} else if buffered == 0 && p.flushing.Load() > 0 {
} else if nowBufRecs == 0 && p.flushing.Load() > 0 {
p.mu.Lock()
p.mu.Unlock() //nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
p.c.Broadcast()
Expand Down
8 changes: 8 additions & 0 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ type Record struct {
Context context.Context
}

func (r *Record) userSize() int64 {
s := len(r.Key) + len(r.Value)
for _, h := range r.Headers {
s += len(h.Key) + len(h.Value)
}
return int64(s)
}

// When buffering records, we calculate the length and tsDelta ahead of time
// (also because number width affects encoding length). We repurpose the Offset
// field to save space.
Expand Down

0 comments on commit 304559f

Please sign in to comment.