From eade8fa0c7070644f6535fe098cc6b75dbe9964d Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Sun, 20 Mar 2022 00:47:49 +0000 Subject: [PATCH] rangefeedcache: use unbounded buffers during initial scans Fixes #77687. We previously used a bounded buffer size for the rangefeed cache's initial scan. If the size of the table being scanned is larger than the configured limit, this is unrecoverable. A subsequent retry would run into the same limit. This behavior can have severe negative effects for the span configs infrastructure -- if `system.span_configurations` has a few too many rows, the kvsubscriber instantiated per-store will never be able to read from it. This PR uses an unbounded buffer during the initial scan, and bounds the buffer once we're in the incremental stage. Release justification: low risk, high benefit Release note: None --- .../rangefeed/rangefeedbuffer/buffer.go | 21 ++++++++++++++----- .../rangefeed/rangefeedbuffer/buffer_test.go | 16 ++++++++++++-- .../rangefeed/rangefeedcache/watcher.go | 18 +++++++++++++++- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go index 86045b957cc2..cdac92e28a34 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go @@ -22,7 +22,7 @@ import ( // ErrBufferLimitExceeded is returned by the buffer when attempting to add more // events than the limit the buffer is configured with. -var ErrBufferLimitExceeded = errors.New("buffer limit exceeded") +var ErrBufferLimitExceeded = errors.New("rangefeed buffer limit exceeded") // Event is the unit of what can be added to the buffer. type Event interface { @@ -34,19 +34,20 @@ type Event interface { // order en-masse whenever the rangefeed frontier is bumped. If we accumulate // more events than the limit allows for, we error out to the caller. type Buffer struct { - limit int - mu struct { syncutil.Mutex events frontier hlc.Timestamp + limit int } } // New constructs a Buffer with the provided limit. func New(limit int) *Buffer { - return &Buffer{limit: limit} + b := &Buffer{} + b.mu.limit = limit + return b } // Add adds the given entry to the buffer. @@ -60,7 +61,7 @@ func (b *Buffer) Add(ev Event) error { return nil } - if b.mu.events.Len()+1 > b.limit { + if b.mu.events.Len()+1 > b.mu.limit { return ErrBufferLimitExceeded } @@ -93,6 +94,16 @@ func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Ev return events } +// SetLimit is used to limit the number of events the buffer internally tracks. +// If already in excess of the limit, future additions will error out (until the +// buffer is Flush()-ed at least). +func (b *Buffer) SetLimit(limit int) { + b.mu.Lock() + defer b.mu.Unlock() + + b.mu.limit = limit +} + type events []Event var _ sort.Interface = (*events)(nil) diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go index 5188beae8e36..ae125c442cc3 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go @@ -20,8 +20,8 @@ import ( "github.com/stretchr/testify/require" ) -// TestBuffer ensures that buffer addition and flushing semantics work as -// expected. +// TestBuffer ensures that buffer addition, flushing and limit-setting semantics +// work as expected. func TestBuffer(t *testing.T) { defer leaktest.AfterTest(t)() ts := func(nanos int) hlc.Timestamp { @@ -113,6 +113,18 @@ func TestBuffer(t *testing.T) { err := buffer.Add(makeEvent("x", ts(101))) require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded) } + + { // Ensure that limit changes behave as expected. + buffer.SetLimit(limit + 1) + require.NoError(t, buffer.Add(makeEvent("x", ts(101)))) // x@101 + + buffer.SetLimit(limit - 1) + err := buffer.Add(makeEvent("y", ts(102))) // y@102 + require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded) + + events := buffer.Flush(ctx, ts(102)) + require.True(t, len(events) == limit+1) + } } type testEvent struct { diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go index 6cee5b61c563..33e5d52f3acd 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go @@ -12,6 +12,7 @@ package rangefeedcache import ( "context" + "math" "strings" "sync/atomic" "time" @@ -223,7 +224,19 @@ func (s *Watcher) Run(ctx context.Context) error { } defer func() { atomic.StoreInt32(&s.started, 0) }() - buffer := rangefeedbuffer.New(s.bufferSize) + // Initially construct an "unbounded" buffer. We want our initial scan to + // succeed, regardless of size. If we exceed a hard-coded limit during the + // initial scan, that's not recoverable/retryable -- a subsequent retry would + // run into the same limit. Instead, we'll forego limiting for now but + // set it below, when handling incremental updates. + // + // TODO(irfansharif): If this unbounded initial scan buffer proves worrying, + // we could re-work these interfaces to have callers use the rangefeedcache to + // keep a subset of the total table in-memory, fed by the rangefeed, and + // transparently query the backing table if the record requested is not found. + // We could also have the initial scan operate in chunks, handing off results + // to the caller incrementally, all within the "initial scan" phase. + buffer := rangefeedbuffer.New(math.MaxInt) frontierBumpedCh, initialScanDoneCh, errCh := make(chan struct{}), make(chan struct{}), make(chan error) mu := struct { // serializes access between the rangefeed and the main thread here syncutil.Mutex @@ -315,6 +328,9 @@ func (s *Watcher) Run(ctx context.Context) error { case <-initialScanDoneCh: s.handleUpdate(ctx, buffer, initialScanTS, CompleteUpdate) + // We're done with our initial scan, set a hard limit for incremental + // updates going forward. + buffer.SetLimit(s.bufferSize) case err := <-errCh: return err