diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 42abbdae4463..78f7d8c24f77 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -168,7 +168,7 @@ type Processor interface { Register( span roachpb.RSpan, startTS hlc.Timestamp, // exclusive - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, stream Stream, disconnectFn func(), @@ -326,12 +326,6 @@ func NewLegacyProcessor(cfg Config) *LegacyProcessor { // engine has not been closed. type IntentScannerConstructor func() IntentScanner -// CatchUpIteratorConstructor is used to construct an iterator that can be used -// for catchup-scans. Takes the key span and exclusive start time to run the -// catchup scan for. It should be called from underneath a stopper task to -// ensure that the engine has not been closed. -type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) (*CatchUpIterator, error) - // Start implements Processor interface. // // LegacyProcessor launches a goroutine to process rangefeed events and send @@ -404,14 +398,6 @@ func (p *LegacyProcessor) run( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } - // Construct the catchUpIter before notifying the registration that it - // has been registered. Note that if the catchUpScan is never run, then - // the iterator constructed here will be closed in disconnect. - if err := r.maybeConstructCatchUpIter(); err != nil { - r.disconnect(kvpb.NewError(err)) - return - } - // Add the new registration to the registry. p.reg.Register(&r) @@ -559,7 +545,7 @@ func (p *LegacyProcessor) sendStop(pErr *kvpb.Error) { func (p *LegacyProcessor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, stream Stream, disconnectFn func(), @@ -572,7 +558,7 @@ func (p *LegacyProcessor) Register( blockWhenFull := p.Config.EventChanTimeout == 0 // for testing r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff, + span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done, ) select { diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 6dc9487b4692..5b22e3d45554 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -81,15 +81,6 @@ type registration struct { withDiff bool metrics *Metrics - // catchUpIterConstructor is used to construct the catchUpIter if necessary. - // The reason this constructor is plumbed down is to make sure that the - // iterator does not get constructed too late in server shutdown. However, - // it must also be stored in the struct to ensure that it is not constructed - // too late, after the raftMu has been dropped. Thus, this function, if - // non-nil, will be used to populate mu.catchUpIter while the registration - // is being registered by the processor. - catchUpIterConstructor CatchUpIteratorConstructor - // Output. stream Stream done *future.ErrorFuture @@ -123,7 +114,7 @@ type registration struct { func newRegistration( span roachpb.Span, startTS hlc.Timestamp, - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, bufferSz int, blockWhenFull bool, @@ -135,7 +126,6 @@ func newRegistration( r := registration{ span: span, catchUpTimestamp: startTS, - catchUpIterConstructor: catchUpIterConstructor, withDiff: withDiff, metrics: metrics, stream: stream, @@ -146,6 +136,7 @@ func newRegistration( } r.mu.Locker = &syncutil.Mutex{} r.mu.caughtUp = true + r.mu.catchUpIter = catchUpIter return r } @@ -587,25 +578,6 @@ func (r *registration) waitForCaughtUp() error { return errors.Errorf("registration %v failed to empty in time", r.Range()) } -// maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches -// the catchUpIter to be detached in the catchUpScan or closed on disconnect. -func (r *registration) maybeConstructCatchUpIter() error { - if r.catchUpIterConstructor == nil { - return nil - } - - catchUpIter, err := r.catchUpIterConstructor(r.span, r.catchUpTimestamp) - if err != nil { - return err - } - r.catchUpIterConstructor = nil - - r.mu.Lock() - defer r.mu.Unlock() - r.mu.catchUpIter = catchUpIter - return nil -} - // detachCatchUpIter detaches the catchUpIter that was previously attached. func (r *registration) detachCatchUpIter() *CatchUpIterator { r.mu.Lock() diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index cf618d16ef5e..c5d552ace1e4 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -100,16 +100,16 @@ type testRegistration struct { stream *testStream } -func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIteratorConstructor { +func makeCatchUpIteratorConstructor( + iter storage.SimpleMVCCIterator, span roachpb.Span, startTime hlc.Timestamp, +) *CatchUpIterator { if iter == nil { return nil } - return func(span roachpb.Span, startTime hlc.Timestamp) (*CatchUpIterator, error) { - return &CatchUpIterator{ - simpleCatchupIter: simpleCatchupIterAdapter{iter}, - span: span, - startTime: startTime, - }, nil + return &CatchUpIterator{ + simpleCatchupIter: simpleCatchupIterAdapter{iter}, + span: span, + startTime: startTime, } } @@ -120,7 +120,7 @@ func newTestRegistration( r := newRegistration( span, ts, - makeCatchUpIteratorConstructor(catchup), + makeCatchUpIteratorConstructor(catchup, span, ts), withDiff, 5, false, /* blockWhenFull */ @@ -129,9 +129,6 @@ func newTestRegistration( func() {}, &future.ErrorFuture{}, ) - if err := r.maybeConstructCatchUpIter(); err != nil { - panic(err) - } return &testRegistration{ registration: r, stream: s, diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 96469c74ca7d..fa9779ed1807 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -294,7 +294,7 @@ func (p *ScheduledProcessor) sendStop(pErr *kvpb.Error) { func (p *ScheduledProcessor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, stream Stream, disconnectFn func(), @@ -307,7 +307,7 @@ func (p *ScheduledProcessor) Register( blockWhenFull := p.Config.EventChanTimeout == 0 // for testing r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff, + span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done, ) @@ -319,14 +319,6 @@ func (p *ScheduledProcessor) Register( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } - // Construct the catchUpIter before notifying the registration that it - // has been registered. Note that if the catchUpScan is never run, then - // the iterator constructed here will be closed in disconnect. - if err := r.maybeConstructCatchUpIter(); err != nil { - r.disconnect(kvpb.NewError(err)) - return nil - } - // Add the new registration to the registry. p.reg.Register(&r) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e671ea47940d..349295cdf059 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -271,25 +271,22 @@ func (r *Replica) RangeFeed( } // Register the stream with a catch-up iterator. - var catchUpIterFunc rangefeed.CatchUpIteratorConstructor + var catchUpIter *rangefeed.CatchUpIterator if usingCatchUpIter { - catchUpIterFunc = func(span roachpb.Span, startTime hlc.Timestamp) (*rangefeed.CatchUpIterator, error) { - // Assert that we still hold the raftMu when this is called to ensure - // that the catchUpIter reads from the current snapshot. - r.raftMu.AssertHeld() - i, err := rangefeed.NewCatchUpIterator(r.store.TODOEngine(), span, startTime, iterSemRelease, pacer) - if err != nil { - return nil, err - } - if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil { - i.OnEmit = f - } - return i, nil + catchUpIter, err = rangefeed.NewCatchUpIterator(r.store.TODOEngine(), rSpan.AsRawSpanWithNoLocals(), + args.Timestamp, iterSemRelease, pacer) + if err != nil { + r.raftMu.Unlock() + iterSemRelease() + return future.MakeCompletedErrorFuture(err) + } + if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil { + catchUpIter.OnEmit = f } } var done future.ErrorFuture p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, &done, + ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, lockedStream, &done, ) r.raftMu.Unlock() @@ -378,13 +375,19 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( ctx context.Context, span roachpb.RSpan, startTS hlc.Timestamp, // exclusive - catchUpIter rangefeed.CatchUpIteratorConstructor, + catchUpIter *rangefeed.CatchUpIterator, withDiff bool, stream rangefeed.Stream, done *future.ErrorFuture, ) rangefeed.Processor { defer logSlowRangefeedRegistration(ctx)() + cleanupCatchUpIter := func() { + if catchUpIter != nil { + catchUpIter.Close() + } + } + // Attempt to register with an existing Rangefeed processor, if one exists. // The locking here is a little tricky because we need to handle the case // of concurrent processor shutdowns (see maybeDisconnectEmptyRangefeed). @@ -459,6 +462,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // below will fall through to the panic. if err := p.Start(r.store.Stopper(), rtsIter); err != nil { done.Set(err) + cleanupCatchUpIter() return nil } @@ -469,6 +473,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // server shutdown. reg, filter := p.Register(span, startTS, catchUpIter, withDiff, stream, func() { r.maybeDisconnectEmptyRangefeed(p) }, done) if !reg { + cleanupCatchUpIter() select { case <-r.store.Stopper().ShouldQuiesce(): done.Set(&kvpb.NodeUnavailableError{})