From 2e708532bf0055eb816a5635366c2821ca97ef75 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 30 Aug 2021 17:17:20 -0400 Subject: [PATCH 1/2] kv/kvserver/rangefeed: fix catchupIter construction synchronization The catchupIter could have been constructed after the state of the underlying store has changed. In general this doesn't seem like a disaster unless the range has been removed or the gc threshold has changed and data is gone. So, maybe it is a disaster. Release justification: fixes for high-priority or high-severity bugs in existing functionality Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 5 ++ pkg/kv/kvserver/rangefeed/registry.go | 54 +++++++++++++++++++--- pkg/kv/kvserver/rangefeed/registry_test.go | 26 ++++++----- pkg/kv/kvserver/replica_rangefeed.go | 9 +++- 4 files changed, 74 insertions(+), 20 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index bc1f835116ed..dbc82d44c5c1 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -247,6 +247,11 @@ func (p *Processor) run( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } + // Construct the catchupIter before notifying the registration that it + // has been registered. Note that if the catchUpScan is never run, then + // the iterator constructed here will be closed in disconnect. + r.maybeConstructCatchUpIter() + // Add the new registration to the registry. p.reg.Register(&r) diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index db5c710f8c4e..9ecb362d3192 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -53,11 +53,19 @@ type Stream interface { // has finished. type registration struct { // Input. - span roachpb.Span - catchupTimestamp hlc.Timestamp + span roachpb.Span + catchupTimestamp hlc.Timestamp + withDiff bool + metrics *Metrics + + // catchupIterConstructor is used to construct the catchupIter if necessary. + // The reason this constructor is plumbed down is to make sure that the + // iterator does not get constructed too late in server shutdown. However, + // it must also be stored in the struct to ensure that it is not constructed + // too late, after the raftMu has been dropped. Thus, this function, if + // non-nil, will be used to populate mu.catchupIter while the registration + // is being registered by the processor. catchupIterConstructor func() storage.SimpleMVCCIterator - withDiff bool - metrics *Metrics // Output. stream Stream @@ -80,6 +88,11 @@ type registration struct { // Management of the output loop goroutine, used to ensure proper teardown. outputLoopCancelFn func() disconnected bool + + // catchupIter is populated on the Processor's goroutine while the + // Replica.raftMu is still held. If it is non-nil at the time that + // disconnect is called, it is closed by disconnect. + catchupIter storage.SimpleMVCCIterator } } @@ -210,6 +223,10 @@ func (r *registration) disconnect(pErr *roachpb.Error) { r.mu.Lock() defer r.mu.Unlock() if !r.mu.disconnected { + if r.mu.catchupIter != nil { + r.mu.catchupIter.Close() + r.mu.catchupIter = nil + } if r.mu.outputLoopCancelFn != nil { r.mu.outputLoopCancelFn() } @@ -285,11 +302,10 @@ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.Ran // If the registration does not have a catchUpIteratorConstructor, this method // is a no-op. func (r *registration) maybeRunCatchupScan() error { - if r.catchupIterConstructor == nil { + catchupIter := r.detachCatchUpIter() + if catchupIter == nil { return nil } - catchupIter := r.catchupIterConstructor() - r.catchupIterConstructor = nil start := timeutil.Now() defer func() { catchupIter.Close() @@ -587,6 +603,30 @@ func (r *registration) waitForCaughtUp() error { return errors.Errorf("registration %v failed to empty in time", r.Range()) } +// maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches +// the catchUpIter to be detached in the catchUpScan or closed on disconnect. +func (r *registration) maybeConstructCatchUpIter() { + if r.catchupIterConstructor == nil { + return + } + + ci := r.catchupIterConstructor() + r.catchupIterConstructor = nil + + r.mu.Lock() + defer r.mu.Unlock() + r.mu.catchupIter = ci +} + +// detachCatchUpIter detaches the catchupIter that was previously attached. +func (r *registration) detachCatchUpIter() storage.SimpleMVCCIterator { + r.mu.Lock() + defer r.mu.Unlock() + catchupIter := r.mu.catchupIter + r.mu.catchupIter = nil + return catchupIter +} + // waitForCaughtUp waits for all registrations overlapping the given span to // completely process their internal buffers. func (reg *registry) waitForCaughtUp(span roachpb.Span) error { diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index f6db57ec6213..dcb3d35b20c4 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -100,19 +100,21 @@ func newTestRegistration( ) *testRegistration { s := newTestStream() errC := make(chan *roachpb.Error, 1) + r := newRegistration( + span, + ts, + makeIteratorConstructor(catchup), + withDiff, + 5, + NewMetrics(), + s, + errC, + ) + r.maybeConstructCatchUpIter() return &testRegistration{ - registration: newRegistration( - span, - ts, - makeIteratorConstructor(catchup), - withDiff, - 5, - NewMetrics(), - s, - errC, - ), - stream: s, - errC: errC, + registration: r, + stream: s, + errC: errC, } } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 85b9f6ac4beb..6d6667b4e458 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -225,7 +225,9 @@ func (r *Replica) rangeFeedWithRangeID( var catchUpIterFunc rangefeed.IteratorConstructor if usingCatchupIter { catchUpIterFunc = func() storage.SimpleMVCCIterator { - + // Assert that we still hold the raftMu when this is called to ensure + // that the catchUpIter reads from the current snapshot. + r.raftMu.AssertHeld() innerIter := r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ UpperBound: args.Span.EndKey, // RangeFeed originally intended to use the time-bound iterator @@ -373,6 +375,11 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Start it with an iterator to initialize the resolved timestamp. rtsIter := func() storage.SimpleMVCCIterator { + // Assert that we still hold the raftMu when this is called to ensure + // that the rtsIter reads from the current snapshot. The replica + // synchronizes with the rangefeed Processor calling this function by + // waiting for the Register call below to return. + r.raftMu.AssertHeld() return r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ UpperBound: desc.EndKey.AsRawKey(), // TODO(nvanbenschoten): To facilitate fast restarts of rangefeed From 9ff249eec5f46c074aec72263ebc37faa9c16af6 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 30 Aug 2021 17:57:45 -0400 Subject: [PATCH 2/2] kv/kvserver: catchup -> catchUp Release justification: non-production code change Release note: None --- pkg/kv/kvserver/rangefeed/metrics.go | 6 +- pkg/kv/kvserver/rangefeed/processor.go | 6 +- pkg/kv/kvserver/rangefeed/registry.go | 84 +++++++++++----------- pkg/kv/kvserver/rangefeed/registry_test.go | 10 +-- pkg/kv/kvserver/replica_rangefeed.go | 20 +++--- 5 files changed, 63 insertions(+), 63 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/metrics.go b/pkg/kv/kvserver/rangefeed/metrics.go index 33574d7271ba..e299bb6bd45f 100644 --- a/pkg/kv/kvserver/rangefeed/metrics.go +++ b/pkg/kv/kvserver/rangefeed/metrics.go @@ -19,7 +19,7 @@ import ( ) var ( - metaRangeFeedCatchupScanNanos = metric.Metadata{ + metaRangeFeedCatchUpScanNanos = metric.Metadata{ Name: "kv.rangefeed.catchup_scan_nanos", Help: "Time spent in RangeFeed catchup scan", Measurement: "Nanoseconds", @@ -29,7 +29,7 @@ var ( // Metrics are for production monitoring of RangeFeeds. type Metrics struct { - RangeFeedCatchupScanNanos *metric.Counter + RangeFeedCatchUpScanNanos *metric.Counter RangeFeedSlowClosedTimestampLogN log.EveryN RangeFeedSlowClosedTimestampNudge singleflight.Group @@ -46,7 +46,7 @@ func (*Metrics) MetricStruct() {} // NewMetrics makes the metrics for RangeFeeds monitoring. func NewMetrics() *Metrics { return &Metrics{ - RangeFeedCatchupScanNanos: metric.NewCounter(metaRangeFeedCatchupScanNanos), + RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos), RangeFeedSlowClosedTimestampLogN: log.Every(5 * time.Second), RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024), } diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index dbc82d44c5c1..caf666ee0060 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -247,7 +247,7 @@ func (p *Processor) run( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } - // Construct the catchupIter before notifying the registration that it + // Construct the catchUpIter before notifying the registration that it // has been registered. Note that if the catchUpScan is never run, then // the iterator constructed here will be closed in disconnect. r.maybeConstructCatchUpIter() @@ -399,7 +399,7 @@ func (p *Processor) sendStop(pErr *roachpb.Error) { func (p *Processor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchupIterConstructor IteratorConstructor, + catchUpIterConstructor IteratorConstructor, withDiff bool, stream Stream, errC chan<- *roachpb.Error, @@ -410,7 +410,7 @@ func (p *Processor) Register( p.syncEventC() r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchupIterConstructor, withDiff, + span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff, p.Config.EventChanCap, p.Metrics, stream, errC, ) select { diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 9ecb362d3192..f70f7cc60c7c 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -54,18 +54,18 @@ type Stream interface { type registration struct { // Input. span roachpb.Span - catchupTimestamp hlc.Timestamp + catchUpTimestamp hlc.Timestamp withDiff bool metrics *Metrics - // catchupIterConstructor is used to construct the catchupIter if necessary. + // catchUpIterConstructor is used to construct the catchUpIter if necessary. // The reason this constructor is plumbed down is to make sure that the // iterator does not get constructed too late in server shutdown. However, // it must also be stored in the struct to ensure that it is not constructed // too late, after the raftMu has been dropped. Thus, this function, if - // non-nil, will be used to populate mu.catchupIter while the registration + // non-nil, will be used to populate mu.catchUpIter while the registration // is being registered by the processor. - catchupIterConstructor func() storage.SimpleMVCCIterator + catchUpIterConstructor func() storage.SimpleMVCCIterator // Output. stream Stream @@ -89,17 +89,17 @@ type registration struct { outputLoopCancelFn func() disconnected bool - // catchupIter is populated on the Processor's goroutine while the + // catchUpIter is populated on the Processor's goroutine while the // Replica.raftMu is still held. If it is non-nil at the time that // disconnect is called, it is closed by disconnect. - catchupIter storage.SimpleMVCCIterator + catchUpIter storage.SimpleMVCCIterator } } func newRegistration( span roachpb.Span, startTS hlc.Timestamp, - catchupIterConstructor func() storage.SimpleMVCCIterator, + catchUpIterConstructor func() storage.SimpleMVCCIterator, withDiff bool, bufferSz int, metrics *Metrics, @@ -108,8 +108,8 @@ func newRegistration( ) registration { r := registration{ span: span, - catchupTimestamp: startTS, - catchupIterConstructor: catchupIterConstructor, + catchUpTimestamp: startTS, + catchUpIterConstructor: catchUpIterConstructor, withDiff: withDiff, metrics: metrics, stream: stream, @@ -123,7 +123,7 @@ func newRegistration( // publish attempts to send a single event to the output buffer for this // registration. If the output buffer is full, the overflowed flag is set, -// indicating that live events were lost and a catchup scan should be initiated. +// indicating that live events were lost and a catch-up scan should be initiated. // If overflowed is already set, events are ignored and not written to the // buffer. func (r *registration) publish(event *roachpb.RangeFeedEvent) { @@ -223,9 +223,9 @@ func (r *registration) disconnect(pErr *roachpb.Error) { r.mu.Lock() defer r.mu.Unlock() if !r.mu.disconnected { - if r.mu.catchupIter != nil { - r.mu.catchupIter.Close() - r.mu.catchupIter = nil + if r.mu.catchUpIter != nil { + r.mu.catchUpIter.Close() + r.mu.catchUpIter = nil } if r.mu.outputLoopCancelFn != nil { r.mu.outputLoopCancelFn() @@ -249,7 +249,7 @@ func (r *registration) disconnect(pErr *roachpb.Error) { // have been emitted. func (r *registration) outputLoop(ctx context.Context) error { // If the registration has a catch-up scan, run it. - if err := r.maybeRunCatchupScan(); err != nil { + if err := r.maybeRunCatchUpScan(); err != nil { err = errors.Wrap(err, "catch-up scan failed") log.Errorf(ctx, "%v", err) return err @@ -294,22 +294,22 @@ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.Ran r.disconnect(roachpb.NewError(err)) } -// maybeRunCatchupScan starts a catchup scan which will output entries for all -// recorded changes in the replica that are newer than the catchupTimestamp. +// maybeRunCatchUpScan starts a catch-up scan which will output entries for all +// recorded changes in the replica that are newer than the catchUpTimestamp. // This uses the iterator provided when the registration was originally created; // after the scan completes, the iterator will be closed. // // If the registration does not have a catchUpIteratorConstructor, this method // is a no-op. -func (r *registration) maybeRunCatchupScan() error { - catchupIter := r.detachCatchUpIter() - if catchupIter == nil { +func (r *registration) maybeRunCatchUpScan() error { + catchUpIter := r.detachCatchUpIter() + if catchUpIter == nil { return nil } start := timeutil.Now() defer func() { - catchupIter.Close() - r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds()) + catchUpIter.Close() + r.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds()) }() var a bufalloc.ByteAllocator @@ -346,23 +346,23 @@ func (r *registration) maybeRunCatchupScan() error { // versions of each key that are after the registration's startTS, so we // can't use NextKey. var meta enginepb.MVCCMetadata - catchupIter.SeekGE(startKey) + catchUpIter.SeekGE(startKey) for { - if ok, err := catchupIter.Valid(); err != nil { + if ok, err := catchUpIter.Valid(); err != nil { return err - } else if !ok || !catchupIter.UnsafeKey().Less(endKey) { + } else if !ok || !catchUpIter.UnsafeKey().Less(endKey) { break } - unsafeKey := catchupIter.UnsafeKey() - unsafeVal := catchupIter.UnsafeValue() + unsafeKey := catchUpIter.UnsafeKey() + unsafeVal := catchUpIter.UnsafeValue() if !unsafeKey.IsValue() { // Found a metadata key. if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil { return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey) } if !meta.IsInline() { - // This is an MVCCMetadata key for an intent. The catchup scan + // This is an MVCCMetadata key for an intent. The catch-up scan // only cares about committed values, so ignore this and skip // past the corresponding provisional key-value. To do this, // scan to the timestamp immediately before (i.e. the key @@ -371,7 +371,7 @@ func (r *registration) maybeRunCatchupScan() error { // Make a copy since should not pass an unsafe key from the iterator // that provided it, when asking it to seek. a, unsafeKey.Key = a.Copy(unsafeKey.Key, 0) - catchupIter.SeekGE(storage.MVCCKey{ + catchUpIter.SeekGE(storage.MVCCKey{ Key: unsafeKey.Key, Timestamp: meta.Timestamp.ToTimestamp().Prev(), }) @@ -398,11 +398,11 @@ func (r *registration) maybeRunCatchupScan() error { // Ignore the version if it's not inline and its timestamp is at // or before the registration's (exclusive) starting timestamp. - ignore := !(ts.IsEmpty() || r.catchupTimestamp.Less(ts)) + ignore := !(ts.IsEmpty() || r.catchUpTimestamp.Less(ts)) if ignore && !r.withDiff { // Skip all the way to the next key. // NB: fast-path to avoid value copy when !r.withDiff. - catchupIter.NextKey() + catchUpIter.NextKey() continue } @@ -415,10 +415,10 @@ func (r *registration) maybeRunCatchupScan() error { if ignore { // Skip all the way to the next key. - catchupIter.NextKey() + catchUpIter.NextKey() } else { // Move to the next version of this key. - catchupIter.Next() + catchUpIter.Next() var event roachpb.RangeFeedEvent event.MustSetValue(&roachpb.RangeFeedValue{ @@ -447,7 +447,7 @@ func (r *registration) Range() interval.Range { } func (r registration) String() string { - return fmt.Sprintf("[%s @ %s+]", r.span, r.catchupTimestamp) + return fmt.Sprintf("[%s @ %s+]", r.span, r.catchUpTimestamp) } // registry holds a set of registrations and manages their lifecycle. @@ -512,7 +512,7 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang reg.forOverlappingRegs(span, func(r *registration) (bool, *roachpb.Error) { // Don't publish events if they are equal to or less // than the registration's starting timestamp. - if r.catchupTimestamp.Less(minTS) { + if r.catchUpTimestamp.Less(minTS) { r.publish(event) } return false, nil @@ -606,25 +606,25 @@ func (r *registration) waitForCaughtUp() error { // maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches // the catchUpIter to be detached in the catchUpScan or closed on disconnect. func (r *registration) maybeConstructCatchUpIter() { - if r.catchupIterConstructor == nil { + if r.catchUpIterConstructor == nil { return } - ci := r.catchupIterConstructor() - r.catchupIterConstructor = nil + catchUpIter := r.catchUpIterConstructor() + r.catchUpIterConstructor = nil r.mu.Lock() defer r.mu.Unlock() - r.mu.catchupIter = ci + r.mu.catchUpIter = catchUpIter } -// detachCatchUpIter detaches the catchupIter that was previously attached. +// detachCatchUpIter detaches the catchUpIter that was previously attached. func (r *registration) detachCatchUpIter() storage.SimpleMVCCIterator { r.mu.Lock() defer r.mu.Unlock() - catchupIter := r.mu.catchupIter - r.mu.catchupIter = nil - return catchupIter + catchUpIter := r.mu.catchUpIter + r.mu.catchUpIter = nil + return catchUpIter } // waitForCaughtUp waits for all registrations overlapping the given span to diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index dcb3d35b20c4..8f15ecfa272a 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -265,10 +265,10 @@ func TestRegistrationCatchUpScan(t *testing.T) { EndKey: roachpb.Key("w"), }, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */) - require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count()) - require.NoError(t, r.maybeRunCatchupScan()) + require.Zero(t, r.metrics.RangeFeedCatchUpScanNanos.Count()) + require.NoError(t, r.maybeRunCatchUpScan()) require.True(t, iter.closed) - require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count()) + require.NotZero(t, r.metrics.RangeFeedCatchUpScanNanos.Count()) // Compare the events sent on the registration's Stream to the expected events. expEvents := []*roachpb.RangeFeedEvent{ @@ -566,14 +566,14 @@ func TestRegistrationString(t *testing.T) { { r: registration{ span: roachpb.Span{Key: roachpb.Key("d")}, - catchupTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1}, + catchUpTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1}, }, exp: `[d @ 0.000000010,1+]`, }, { r: registration{span: roachpb.Span{ Key: roachpb.Key("d"), EndKey: roachpb.Key("z")}, - catchupTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9}, + catchUpTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9}, }, exp: `[{d-z} @ 0.000000040,9+]`, }, diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 6d6667b4e458..bba569bb959d 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -136,7 +136,7 @@ func (i iteratorWithCloser) Close() { // RangeFeed registers a rangefeed over the specified span. It sends updates to // the provided stream and returns with an optional error when the rangefeed is // complete. The provided ConcurrentRequestLimiter is used to limit the number -// of rangefeeds using catchup iterators at the same time. +// of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, ) *roachpb.Error { @@ -188,10 +188,10 @@ func (r *Replica) rangeFeedWithRangeID( // If we will be using a catch-up iterator, wait for the limiter here before // locking raftMu. - usingCatchupIter := false + usingCatchUpIter := false var iterSemRelease func() if !args.Timestamp.IsEmpty() { - usingCatchupIter = true + usingCatchUpIter = true alloc, err := r.store.limiters.ConcurrentRangefeedIters.Begin(ctx) if err != nil { return roachpb.NewError(err) @@ -199,10 +199,10 @@ func (r *Replica) rangeFeedWithRangeID( // Finish the iterator limit if we exit before the iterator finishes. // The release function will be hooked into the Close method on the // iterator below. The sync.Once prevents any races between exiting early - // from this call and finishing the catchup scan underneath the + // from this call and finishing the catch-up scan underneath the // rangefeed.Processor. We need to release here in case we fail to // register the processor, or, more perniciously, in the case where the - // processor gets registered by shut down before starting the catchup + // processor gets registered by shut down before starting the catch-up // scan. var iterSemReleaseOnce sync.Once iterSemRelease = func() { @@ -223,7 +223,7 @@ func (r *Replica) rangeFeedWithRangeID( // Register the stream with a catch-up iterator. var catchUpIterFunc rangefeed.IteratorConstructor - if usingCatchupIter { + if usingCatchUpIter { catchUpIterFunc = func() storage.SimpleMVCCIterator { // Assert that we still hold the raftMu when this is called to ensure // that the catchUpIter reads from the current snapshot. @@ -234,7 +234,7 @@ func (r *Replica) rangeFeedWithRangeID( // performance optimization. However, they've had correctness issues in // the past (#28358, #34819) and no-one has the time for the due-diligence // necessary to be confidant in their correctness going forward. Not using - // them causes the total time spent in RangeFeed catchup on changefeed + // them causes the total time spent in RangeFeed catch-up on changefeed // over tpcc-1000 to go from 40s -> 4853s, which is quite large but still // workable. See #35122 for details. // MinTimestampHint: args.Timestamp, @@ -328,7 +328,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( ctx context.Context, span roachpb.RSpan, startTS hlc.Timestamp, - catchupIter rangefeed.IteratorConstructor, + catchUpIter rangefeed.IteratorConstructor, withDiff bool, stream rangefeed.Stream, errC chan<- *roachpb.Error, @@ -339,7 +339,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( r.rangefeedMu.Lock() p := r.rangefeedMu.proc if p != nil { - reg, filter := p.Register(span, startTS, catchupIter, withDiff, stream, errC) + reg, filter := p.Register(span, startTS, catchUpIter, withDiff, stream, errC) if reg { // Registered successfully with an existing processor. // Update the rangefeed filter to avoid filtering ops @@ -398,7 +398,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // any other goroutines are able to stop the processor. In other words, // this ensures that the only time the registration fails is during // server shutdown. - reg, filter := p.Register(span, startTS, catchupIter, withDiff, stream, errC) + reg, filter := p.Register(span, startTS, catchUpIter, withDiff, stream, errC) if !reg { select { case <-r.store.Stopper().ShouldQuiesce():