From cf35678fa4aa1fc5b8f6ee8890db3f1dbcc94ed5 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Thu, 21 Sep 2023 15:25:29 +0100 Subject: [PATCH] rangefeed: create catchup iterators eagerly Previously, catchup iterators were created in the main rangefeed processor work loop. This negatively affects scheduler-based processors, as this operation can be slow. This commit makes iterator creation eager, simplifying error handling and reducing rangefeed delays. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 20 ++--------- pkg/kv/kvserver/rangefeed/registry.go | 32 ++--------------- pkg/kv/kvserver/rangefeed/registry_test.go | 19 +++++----- .../kvserver/rangefeed/scheduled_processor.go | 12 ++----- pkg/kv/kvserver/replica_rangefeed.go | 35 +++++++++++-------- 5 files changed, 35 insertions(+), 83 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 42abbdae4463..78f7d8c24f77 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -168,7 +168,7 @@ type Processor interface { Register( span roachpb.RSpan, startTS hlc.Timestamp, // exclusive - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, stream Stream, disconnectFn func(), @@ -326,12 +326,6 @@ func NewLegacyProcessor(cfg Config) *LegacyProcessor { // engine has not been closed. type IntentScannerConstructor func() IntentScanner -// CatchUpIteratorConstructor is used to construct an iterator that can be used -// for catchup-scans. Takes the key span and exclusive start time to run the -// catchup scan for. It should be called from underneath a stopper task to -// ensure that the engine has not been closed. -type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) (*CatchUpIterator, error) - // Start implements Processor interface. 
// // LegacyProcessor launches a goroutine to process rangefeed events and send @@ -404,14 +398,6 @@ func (p *LegacyProcessor) run( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } - // Construct the catchUpIter before notifying the registration that it - // has been registered. Note that if the catchUpScan is never run, then - // the iterator constructed here will be closed in disconnect. - if err := r.maybeConstructCatchUpIter(); err != nil { - r.disconnect(kvpb.NewError(err)) - return - } - // Add the new registration to the registry. p.reg.Register(&r) @@ -559,7 +545,7 @@ func (p *LegacyProcessor) sendStop(pErr *kvpb.Error) { func (p *LegacyProcessor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, stream Stream, disconnectFn func(), @@ -572,7 +558,7 @@ func (p *LegacyProcessor) Register( blockWhenFull := p.Config.EventChanTimeout == 0 // for testing r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff, + span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done, ) select { diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 6dc9487b4692..5b22e3d45554 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -81,15 +81,6 @@ type registration struct { withDiff bool metrics *Metrics - // catchUpIterConstructor is used to construct the catchUpIter if necessary. - // The reason this constructor is plumbed down is to make sure that the - // iterator does not get constructed too late in server shutdown. However, - // it must also be stored in the struct to ensure that it is not constructed - // too late, after the raftMu has been dropped. 
Thus, this function, if - // non-nil, will be used to populate mu.catchUpIter while the registration - // is being registered by the processor. - catchUpIterConstructor CatchUpIteratorConstructor - // Output. stream Stream done *future.ErrorFuture @@ -123,7 +114,7 @@ type registration struct { func newRegistration( span roachpb.Span, startTS hlc.Timestamp, - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, bufferSz int, blockWhenFull bool, @@ -135,7 +126,6 @@ func newRegistration( r := registration{ span: span, catchUpTimestamp: startTS, - catchUpIterConstructor: catchUpIterConstructor, withDiff: withDiff, metrics: metrics, stream: stream, @@ -146,6 +136,7 @@ func newRegistration( } r.mu.Locker = &syncutil.Mutex{} r.mu.caughtUp = true + r.mu.catchUpIter = catchUpIter return r } @@ -587,25 +578,6 @@ func (r *registration) waitForCaughtUp() error { return errors.Errorf("registration %v failed to empty in time", r.Range()) } -// maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches -// the catchUpIter to be detached in the catchUpScan or closed on disconnect. -func (r *registration) maybeConstructCatchUpIter() error { - if r.catchUpIterConstructor == nil { - return nil - } - - catchUpIter, err := r.catchUpIterConstructor(r.span, r.catchUpTimestamp) - if err != nil { - return err - } - r.catchUpIterConstructor = nil - - r.mu.Lock() - defer r.mu.Unlock() - r.mu.catchUpIter = catchUpIter - return nil -} - // detachCatchUpIter detaches the catchUpIter that was previously attached. 
func (r *registration) detachCatchUpIter() *CatchUpIterator { r.mu.Lock() diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index cf618d16ef5e..c5d552ace1e4 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -100,16 +100,16 @@ type testRegistration struct { stream *testStream } -func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIteratorConstructor { +func makeCatchUpIteratorConstructor( + iter storage.SimpleMVCCIterator, span roachpb.Span, startTime hlc.Timestamp, +) *CatchUpIterator { if iter == nil { return nil } - return func(span roachpb.Span, startTime hlc.Timestamp) (*CatchUpIterator, error) { - return &CatchUpIterator{ - simpleCatchupIter: simpleCatchupIterAdapter{iter}, - span: span, - startTime: startTime, - }, nil + return &CatchUpIterator{ + simpleCatchupIter: simpleCatchupIterAdapter{iter}, + span: span, + startTime: startTime, } } @@ -120,7 +120,7 @@ func newTestRegistration( r := newRegistration( span, ts, - makeCatchUpIteratorConstructor(catchup), + makeCatchUpIteratorConstructor(catchup, span, ts), withDiff, 5, false, /* blockWhenFull */ @@ -129,9 +129,6 @@ func newTestRegistration( func() {}, &future.ErrorFuture{}, ) - if err := r.maybeConstructCatchUpIter(); err != nil { - panic(err) - } return &testRegistration{ registration: r, stream: s, diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 96469c74ca7d..fa9779ed1807 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -294,7 +294,7 @@ func (p *ScheduledProcessor) sendStop(pErr *kvpb.Error) { func (p *ScheduledProcessor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, stream Stream, disconnectFn func(), @@ -307,7 +307,7 @@ func (p 
*ScheduledProcessor) Register( blockWhenFull := p.Config.EventChanTimeout == 0 // for testing r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff, + span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done, ) @@ -319,14 +319,6 @@ func (p *ScheduledProcessor) Register( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } - // Construct the catchUpIter before notifying the registration that it - // has been registered. Note that if the catchUpScan is never run, then - // the iterator constructed here will be closed in disconnect. - if err := r.maybeConstructCatchUpIter(); err != nil { - r.disconnect(kvpb.NewError(err)) - return nil - } - // Add the new registration to the registry. p.reg.Register(&r) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e671ea47940d..349295cdf059 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -271,25 +271,22 @@ func (r *Replica) RangeFeed( } // Register the stream with a catch-up iterator. - var catchUpIterFunc rangefeed.CatchUpIteratorConstructor + var catchUpIter *rangefeed.CatchUpIterator if usingCatchUpIter { - catchUpIterFunc = func(span roachpb.Span, startTime hlc.Timestamp) (*rangefeed.CatchUpIterator, error) { - // Assert that we still hold the raftMu when this is called to ensure - // that the catchUpIter reads from the current snapshot. 
- r.raftMu.AssertHeld() - i, err := rangefeed.NewCatchUpIterator(r.store.TODOEngine(), span, startTime, iterSemRelease, pacer) - if err != nil { - return nil, err - } - if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil { - i.OnEmit = f - } - return i, nil + catchUpIter, err = rangefeed.NewCatchUpIterator(r.store.TODOEngine(), rSpan.AsRawSpanWithNoLocals(), + args.Timestamp, iterSemRelease, pacer) + if err != nil { + r.raftMu.Unlock() + iterSemRelease() + return future.MakeCompletedErrorFuture(err) + } + if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil { + catchUpIter.OnEmit = f } } var done future.ErrorFuture p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, &done, + ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, lockedStream, &done, ) r.raftMu.Unlock() @@ -378,13 +375,19 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( ctx context.Context, span roachpb.RSpan, startTS hlc.Timestamp, // exclusive - catchUpIter rangefeed.CatchUpIteratorConstructor, + catchUpIter *rangefeed.CatchUpIterator, withDiff bool, stream rangefeed.Stream, done *future.ErrorFuture, ) rangefeed.Processor { defer logSlowRangefeedRegistration(ctx)() + cleanupCatchUpIter := func() { + if catchUpIter != nil { + catchUpIter.Close() + } + } + // Attempt to register with an existing Rangefeed processor, if one exists. // The locking here is a little tricky because we need to handle the case // of concurrent processor shutdowns (see maybeDisconnectEmptyRangefeed). @@ -459,6 +462,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // below will fall through to the panic. if err := p.Start(r.store.Stopper(), rtsIter); err != nil { done.Set(err) + cleanupCatchUpIter() return nil } @@ -469,6 +473,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // server shutdown. 
reg, filter := p.Register(span, startTS, catchUpIter, withDiff, stream, func() { r.maybeDisconnectEmptyRangefeed(p) }, done) if !reg { + cleanupCatchUpIter() select { case <-r.store.Stopper().ShouldQuiesce(): done.Set(&kvpb.NodeUnavailableError{})