Skip to content

Commit

Permalink
kvserver/rangefeed: move CatchUpScan() params to constructor
Browse files Browse the repository at this point in the history
`CatchUpIterator` took keyspan and start time parameters both in the
constructor and when calling `CatchUpScan()`. This wasn't safe, because
the iterator could have been constructed with bounds that would not
satisfy the parameters passed to the scan -- for example, if passing a
wider key span or lower start time -- in which case the scan would omit
values.

This patch removes the keyspan and start time parameters to
`CatchUpScan()`, such that the caller must provide them during
construction

Release note: None
  • Loading branch information
erikgrinaker committed Jun 6, 2022
1 parent d92c0ca commit 03c63d5
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 62 deletions.
43 changes: 21 additions & 22 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ import (
"github.com/cockroachdb/errors"
)

// A CatchUpIterator is an iterator for catchUp-scans.
type CatchUpIterator struct {
simpleCatchupIter
close func()
}

// simpleCatchupIter is an extension of SimpleMVCCIterator that allows for the
// primary iterator to be implemented using a regular MVCCIterator or a
// (often) more efficient MVCCIncrementalIterator. When the caller wants to
Expand All @@ -52,16 +46,25 @@ func (i simpleCatchupIterAdapter) NextIgnoringTime() {

var _ simpleCatchupIter = simpleCatchupIterAdapter{}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader.
// CatchUpIterator is an iterator for catchup-scans.
type CatchUpIterator struct {
simpleCatchupIter
close func()
span roachpb.Span
startTime hlc.Timestamp // exclusive
}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader over the
// given key/time span. startTime is exclusive.
func NewCatchUpIterator(
reader storage.Reader, args *roachpb.RangeFeedRequest, closer func(),
reader storage.Reader, span roachpb.Span, startTime hlc.Timestamp, closer func(),
) *CatchUpIterator {
return &CatchUpIterator{
simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader,
storage.MVCCIncrementalIterOptions{
EnableTimeBoundIteratorOptimization: true,
EndKey: args.Span.EndKey,
StartTime: args.Timestamp,
EndKey: span.EndKey,
StartTime: startTime,
EndTime: hlc.MaxTimestamp,
// We want to emit intents rather than error
// (the default behavior) so that we can skip
Expand All @@ -75,7 +78,9 @@ func NewCatchUpIterator(
// still needed (#69357).
InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit,
}),
close: closer,
close: closer,
span: span,
startTime: startTime,
}
}

Expand All @@ -93,15 +98,9 @@ func (i *CatchUpIterator) Close() {
// returns. However, we may revist this in #69596.
type outputEventFn func(e *roachpb.RangeFeedEvent) error

// CatchUpScan iterates over all changes for the given span of keys,
// starting at catchUpTimestamp. Keys and Values are emitted as
// RangeFeedEvents passed to the given outputFn. catchUpTimestamp is exclusive.
func (i *CatchUpIterator) CatchUpScan(
startKey, endKey storage.MVCCKey,
catchUpTimestamp hlc.Timestamp,
withDiff bool,
outputFn outputEventFn,
) error {
// CatchUpScan iterates over all changes in the configured key/time span, and
// emits them as RangeFeedEvents via outputFn in chronological order.
func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) error {
var a bufalloc.ByteAllocator
// MVCCIterator will encounter historical values for each key in
// reverse-chronological order. To output in chronological order, store
Expand Down Expand Up @@ -136,7 +135,7 @@ func (i *CatchUpIterator) CatchUpScan(
// versions of each key that are after the registration's startTS, so we
// can't use NextKey.
var meta enginepb.MVCCMetadata
i.SeekGE(startKey)
i.SeekGE(storage.MVCCKey{Key: i.span.Key})
for {
if ok, err := i.Valid(); err != nil {
return err
Expand Down Expand Up @@ -195,7 +194,7 @@ func (i *CatchUpIterator) CatchUpScan(
// Ignore the version if it's not inline and its timestamp is at
// or before the registration's (exclusive) starting timestamp.
ts := unsafeKey.Timestamp
ignore := !(ts.IsEmpty() || catchUpTimestamp.Less(ts))
ignore := !(ts.IsEmpty() || i.startTime.Less(ts))
if ignore && !withDiff {
// Skip all the way to the next key.
// NB: fast-path to avoid value copy when !r.withDiff.
Expand Down
12 changes: 3 additions & 9 deletions pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,13 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
func() {
iter := rangefeed.NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
Header: roachpb.Header{
Timestamp: opts.ts,
},
WithDiff: opts.withDiff,
Span: span,
}, func() {})
iter := rangefeed.NewCatchUpIterator(eng, span, opts.ts, nil)
defer iter.Close()
counter := 0
err := iter.CatchUpScan(storage.MakeMVCCMetadataKey(startKey), storage.MakeMVCCMetadataKey(endKey), opts.ts, opts.withDiff, func(*roachpb.RangeFeedEvent) error {
err := iter.CatchUpScan(func(*roachpb.RangeFeedEvent) error {
counter++
return nil
})
}, opts.withDiff)
if err != nil {
b.Fatalf("failed catchUp scan: %+v", err)
}
Expand Down
21 changes: 6 additions & 15 deletions pkg/kv/kvserver/rangefeed/catchup_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,15 @@ func TestCatchupScan(t *testing.T) {
t.Fatal(err)
}
testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) {
iter := NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
Header: roachpb.Header{
Timestamp: ts1, // exclusive
},
Span: roachpb.Span{
EndKey: roachpb.KeyMax,
},
WithDiff: withDiff,
}, nil)
span := roachpb.Span{Key: testKey1, EndKey: roachpb.KeyMax}
iter := NewCatchUpIterator(eng, span, ts1, nil)
defer iter.Close()
var events []roachpb.RangeFeedValue
// ts1 here is exclusive, so we do not want the versions at ts1.
require.NoError(t, iter.CatchUpScan(storage.MakeMVCCMetadataKey(testKey1),
storage.MakeMVCCMetadataKey(roachpb.KeyMax), ts1, withDiff,
func(e *roachpb.RangeFeedEvent) error {
events = append(events, *e.Val)
return nil
}))
require.NoError(t, iter.CatchUpScan(func(e *roachpb.RangeFeedEvent) error {
events = append(events, *e.Val)
return nil
}, withDiff))
require.Equal(t, 4, len(events))
checkEquality := func(
kv storage.MVCCKeyValue, prevKV storage.MVCCKeyValue, event roachpb.RangeFeedValue) {
Expand Down
11 changes: 5 additions & 6 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,11 @@ func NewProcessor(cfg Config) *Processor {
// engine has not been closed.
type IntentScannerConstructor func() IntentScanner

// CatchUpIteratorConstructor is used to construct an iterator that
// can be used for catchup-scans. It should be called from underneath
// a stopper task to ensure that the engine has not been closed.
//
// The constructed iterator must have an UpperBound set.
type CatchUpIteratorConstructor func() *CatchUpIterator
// CatchUpIteratorConstructor is used to construct an iterator that can be used
// for catchup-scans. Takes the key span and exclusive start time to run the
// catchup scan for. It should be called from underneath a stopper task to
// ensure that the engine has not been closed.
type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) *CatchUpIterator

// Start launches a goroutine to process rangefeed events and send them to
// registrations.
Expand Down
8 changes: 2 additions & 6 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -371,10 +370,7 @@ func (r *registration) maybeRunCatchUpScan() error {
r.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds())
}()

startKey := storage.MakeMVCCMetadataKey(r.span.Key)
endKey := storage.MakeMVCCMetadataKey(r.span.EndKey)

return catchUpIter.CatchUpScan(startKey, endKey, r.catchUpTimestamp, r.withDiff, r.stream.Send)
return catchUpIter.CatchUpScan(r.stream.Send, r.withDiff)
}

// ID implements interval.Interface.
Expand Down Expand Up @@ -560,7 +556,7 @@ func (r *registration) maybeConstructCatchUpIter() {
return
}

catchUpIter := r.catchUpIterConstructor()
catchUpIter := r.catchUpIterConstructor(r.span, r.catchUpTimestamp)
r.catchUpIterConstructor = nil

r.mu.Lock()
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,12 @@ func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIter
if iter == nil {
return nil
}
return func() *CatchUpIterator {
return &CatchUpIterator{simpleCatchupIter: simpleCatchupIterAdapter{iter}}
return func(span roachpb.Span, startTime hlc.Timestamp) *CatchUpIterator {
return &CatchUpIterator{
simpleCatchupIter: simpleCatchupIterAdapter{iter},
span: span,
startTime: startTime,
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,11 @@ func (r *Replica) rangeFeedWithRangeID(
// Register the stream with a catch-up iterator.
var catchUpIterFunc rangefeed.CatchUpIteratorConstructor
if usingCatchUpIter {
catchUpIterFunc = func() *rangefeed.CatchUpIterator {
catchUpIterFunc = func(span roachpb.Span, startTime hlc.Timestamp) *rangefeed.CatchUpIterator {
// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
r.raftMu.AssertHeld()
return rangefeed.NewCatchUpIterator(r.Engine(), args, iterSemRelease)
return rangefeed.NewCatchUpIterator(r.Engine(), span, startTime, iterSemRelease)
}
}
p := r.registerWithRangefeedRaftMuLocked(
Expand Down

0 comments on commit 03c63d5

Please sign in to comment.