Skip to content

Commit

Permalink
Merge pull request #78214 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1-78148

release-22.1: rangefeedcache: use unbounded buffers during initial scans
  • Loading branch information
irfansharif authored Mar 22, 2022
2 parents baf3493 + eade8fa commit 7b7ba41
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 8 deletions.
21 changes: 16 additions & 5 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// ErrBufferLimitExceeded is returned by the buffer when attempting to add more
// events than the limit the buffer is configured with.
var ErrBufferLimitExceeded = errors.New("buffer limit exceeded")
var ErrBufferLimitExceeded = errors.New("rangefeed buffer limit exceeded")

// Event is the unit of what can be added to the buffer.
type Event interface {
Expand All @@ -34,19 +34,20 @@ type Event interface {
// order en-masse whenever the rangefeed frontier is bumped. If we accumulate
// more events than the limit allows for, we error out to the caller.
type Buffer struct {
limit int

mu struct {
syncutil.Mutex

events
frontier hlc.Timestamp
limit int
}
}

// New constructs a Buffer with the provided limit.
func New(limit int) *Buffer {
return &Buffer{limit: limit}
b := &Buffer{}
b.mu.limit = limit
return b
}

// Add adds the given entry to the buffer.
Expand All @@ -60,7 +61,7 @@ func (b *Buffer) Add(ev Event) error {
return nil
}

if b.mu.events.Len()+1 > b.limit {
if b.mu.events.Len()+1 > b.mu.limit {
return ErrBufferLimitExceeded
}

Expand Down Expand Up @@ -93,6 +94,16 @@ func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Ev
return events
}

// SetLimit is used to limit the number of events the buffer internally tracks.
// If already in excess of the limit, future additions will error out (until the
// buffer is Flush()-ed at least).
func (b *Buffer) SetLimit(limit int) {
b.mu.Lock()
defer b.mu.Unlock()

b.mu.limit = limit
}

type events []Event

var _ sort.Interface = (*events)(nil)
Expand Down
16 changes: 14 additions & 2 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/stretchr/testify/require"
)

// TestBuffer ensures that buffer addition and flushing semantics work as
// expected.
// TestBuffer ensures that buffer addition, flushing and limit-setting semantics
// work as expected.
func TestBuffer(t *testing.T) {
defer leaktest.AfterTest(t)()
ts := func(nanos int) hlc.Timestamp {
Expand Down Expand Up @@ -113,6 +113,18 @@ func TestBuffer(t *testing.T) {
err := buffer.Add(makeEvent("x", ts(101)))
require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded)
}

{ // Ensure that limit changes behave as expected.
buffer.SetLimit(limit + 1)
require.NoError(t, buffer.Add(makeEvent("x", ts(101)))) // x@101

buffer.SetLimit(limit - 1)
err := buffer.Add(makeEvent("y", ts(102))) // y@102
require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded)

events := buffer.Flush(ctx, ts(102))
require.True(t, len(events) == limit+1)
}
}

type testEvent struct {
Expand Down
18 changes: 17 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package rangefeedcache

import (
"context"
"math"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -223,7 +224,19 @@ func (s *Watcher) Run(ctx context.Context) error {
}
defer func() { atomic.StoreInt32(&s.started, 0) }()

buffer := rangefeedbuffer.New(s.bufferSize)
// Initially construct an "unbounded" buffer. We want our initial scan to
// succeed, regardless of size. If we exceed a hard-coded limit during the
// initial scan, that's not recoverable/retryable -- a subsequent retry would
// run into the same limit. Instead, we'll forego limiting for now but
// set it below, when handling incremental updates.
//
// TODO(irfansharif): If this unbounded initial scan buffer proves worrying,
// we could re-work these interfaces to have callers use the rangefeedcache to
// keep a subset of the total table in-memory, fed by the rangefeed, and
// transparently query the backing table if the record requested is not found.
// We could also have the initial scan operate in chunks, handing off results
// to the caller incrementally, all within the "initial scan" phase.
buffer := rangefeedbuffer.New(math.MaxInt)
frontierBumpedCh, initialScanDoneCh, errCh := make(chan struct{}), make(chan struct{}), make(chan error)
mu := struct { // serializes access between the rangefeed and the main thread here
syncutil.Mutex
Expand Down Expand Up @@ -315,6 +328,9 @@ func (s *Watcher) Run(ctx context.Context) error {

case <-initialScanDoneCh:
s.handleUpdate(ctx, buffer, initialScanTS, CompleteUpdate)
// We're done with our initial scan, set a hard limit for incremental
// updates going forward.
buffer.SetLimit(s.bufferSize)

case err := <-errCh:
return err
Expand Down

0 comments on commit 7b7ba41

Please sign in to comment.