diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index bc1f835116ed..4d67c0e2436f 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -247,6 +247,13 @@ func (p *Processor) run( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } + // Construct the catchupIter before notifying the registration that it + // has been registered. + if r.catchupIterConstructor != nil { + r.catchupIter = r.catchupIterConstructor() + r.catchupIterConstructor = nil + } + // Add the new registration to the registry. p.reg.Register(&r) diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index db5c710f8c4e..7f9b73a1c31c 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -53,11 +53,22 @@ type Stream interface { // has finished. type registration struct { // Input. - span roachpb.Span - catchupTimestamp hlc.Timestamp + span roachpb.Span + catchupTimestamp hlc.Timestamp + withDiff bool + metrics *Metrics + + // catchupIterConstructor is used to construct the catchupIter if necessary. + // The reason this constructor is plumbed down is to make sure that the + // iterator does not get constructed too late in server shutdown. However, + // it must also be stored in the struct to ensure that it is not constructed + // too late, after the raftMu has been dropped. Thus, this function, if + // non-nil, will be used to populated catchupIter while the registration + // is being registered by the processor. catchupIterConstructor func() storage.SimpleMVCCIterator - withDiff bool - metrics *Metrics + // catchupIter is populated on the Processor's goroutine while the + // Replica.raftMu is still held. + catchupIter storage.SimpleMVCCIterator // Output. stream Stream @@ -285,11 +296,10 @@ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.Ran // If the registration does not have a catchUpIteratorConstructor, this method // is a no-op. func (r *registration) maybeRunCatchupScan() error { - if r.catchupIterConstructor == nil { + if r.catchupIter == nil { return nil } - catchupIter := r.catchupIterConstructor() - r.catchupIterConstructor = nil + catchupIter := r.catchupIter start := timeutil.Now() defer func() { catchupIter.Close() diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 85b9f6ac4beb..107346c096a8 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -225,7 +225,9 @@ func (r *Replica) rangeFeedWithRangeID( var catchUpIterFunc rangefeed.IteratorConstructor if usingCatchupIter { catchUpIterFunc = func() storage.SimpleMVCCIterator { - + // Assert that we still hold the raftMu when this is called to ensure + // that the catchUpIter reads from the current snapshot. + r.raftMu.AssertHeld() innerIter := r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ UpperBound: args.Span.EndKey, // RangeFeed originally intended to use the time-bound iterator @@ -373,6 +375,11 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Start it with an iterator to initialize the resolved timestamp. rtsIter := func() storage.SimpleMVCCIterator { + // Assert that we still hold the raftMu when this is called to ensure + // that the catchUpIter reads from the current snapshot. The replica + // synchronizes with the rangefeed Processor calling this function by + // waiting for the Register call below to return. + r.raftMu.AssertHeld() return r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ UpperBound: desc.EndKey.AsRawKey(), // TODO(nvanbenschoten): To facilitate fast restarts of rangefeed