From 32579048d671b12b6351ae75f0bb2a1e6ab06af2 Mon Sep 17 00:00:00 2001
From: Oleg Afanasyev
Date: Thu, 21 Sep 2023 15:25:29 +0100
Subject: [PATCH] rangefeed: create catchup iterators eagerly

Previously, catchup iterators were created in the main rangefeed
processor work loop. This negatively affected scheduler-based
processors, as iterator creation could be slow. This commit makes
iterator creation eager, simplifying error handling and reducing
rangefeed startup delays.

Epic: none

Release note: None
---
 pkg/kv/kvserver/rangefeed/bench_test.go       |  2 +-
 pkg/kv/kvserver/rangefeed/processor.go        | 47 ++++++++++--
 pkg/kv/kvserver/rangefeed/processor_test.go   | 75 +++++++++++++++----
 pkg/kv/kvserver/rangefeed/registry.go         | 13 +---
 pkg/kv/kvserver/rangefeed/registry_test.go    | 16 ++--
 .../kvserver/rangefeed/scheduled_processor.go |  5 +-
 pkg/kv/kvserver/replica_rangefeed.go          | 27 ++++---
 7 files changed, 125 insertions(+), 60 deletions(-)

diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go
index b12f4c2b3006..83b88d9321e5 100644
--- a/pkg/kv/kvserver/rangefeed/bench_test.go
+++ b/pkg/kv/kvserver/rangefeed/bench_test.go
@@ -107,7 +107,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
 		const withDiff = false
 		streams[i] = &noopStream{ctx: ctx}
 		futures[i] = &future.ErrorFuture{}
-		ok, _ := p.Register(span, hlc.MinTimestamp, nil, withDiff, streams[i], nil, futures[i])
+		ok, _ := p.Register(span, hlc.MinTimestamp, nilCatchUpIter, withDiff, streams[i], nil, futures[i])
 		require.True(b, ok)
 	}
 
diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go
index 42abbdae4463..1612131b7bb2 100644
--- a/pkg/kv/kvserver/rangefeed/processor.go
+++ b/pkg/kv/kvserver/rangefeed/processor.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -330,7 +331,41 @@ type IntentScannerConstructor func() IntentScanner
 // for catchup-scans. Takes the key span and exclusive start time to run the
 // catchup scan for. It should be called from underneath a stopper task to
 // ensure that the engine has not been closed.
-type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) (*CatchUpIterator, error)
+type CatchUpIteratorConstructor func() *CatchUpIterator
+
+// CatchUpIteratorContainer manages the catchup iterator lifecycle during
+// rangefeed registration creation. The iterator is created by the replica
+// under its lock to get a consistent state. The container is then passed to
+// the registration. If the registration is created successfully, iterator
+// ownership moves to the registration. If registration creation fails at any
+// stage before that, closing the container releases the iterator. Because
+// ownership moves out of the container on success, it is safe to close the
+// container regardless of startup success or failure.
+type CatchUpIteratorContainer struct {
+	iter atomic.Pointer[CatchUpIterator]
+}
+
+// NewCatchUpIteratorContainer constructs a new container. It is safe to pass
+// a nil iterator.
+func NewCatchUpIteratorContainer(iter *CatchUpIterator) *CatchUpIteratorContainer {
+	c := &CatchUpIteratorContainer{}
+	c.iter.Store(iter)
+	return c
+}
+
+// Detach moves the iterator out of the container. Calling Close afterward
+// won't close the iterator. Safe to call on an empty container.
+func (c *CatchUpIteratorContainer) Detach() (iter *CatchUpIterator) {
+	return c.iter.Swap(nil)
+}
+
+// Close closes the underlying iterator if it is still held and was not
+// moved out by Detach.
+func (c *CatchUpIteratorContainer) Close() {
+	if iter := c.Detach(); iter != nil {
+		iter.Close()
+	}
+}
 
 // Start implements Processor interface.
 //
@@ -405,12 +440,10 @@ func (p *LegacyProcessor) run(
 			}
 
 			// Construct the catchUpIter before notifying the registration that it
-			// has been registered. Note that if the catchUpScan is never run, then
-			// the iterator constructed here will be closed in disconnect.
-			if err := r.maybeConstructCatchUpIter(); err != nil {
-				r.disconnect(kvpb.NewError(err))
-				return
-			}
+			// has been registered. This moves ownership of the iterator to the
+			// registration. Note that if the catchUpScan is never run, then
+			// the iterator obtained here will be closed in disconnect.
+			r.maybeConstructCatchUpIter()
 
 			// Add the new registration to the registry.
 			p.reg.Register(&r)
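The container type above is essentially a single-use, thread-safe ownership cell: atomic.Pointer.Swap guarantees that whichever of Detach or Close swaps the pointer out first takes the iterator, and the loser sees nil, so the iterator is closed exactly once. A minimal standalone sketch of the same pattern (the resource and container names are illustrative, not part of the patch):

package main

import (
	"fmt"
	"sync/atomic"
)

// resource stands in for *CatchUpIterator: anything that must be closed
// exactly once.
type resource struct{ name string }

func (r *resource) Close() { fmt.Println("closed:", r.name) }

// container mirrors CatchUpIteratorContainer: Swap(nil) transfers ownership.
type container struct{ res atomic.Pointer[resource] }

func (c *container) Detach() *resource { return c.res.Swap(nil) }

func (c *container) Close() {
	// Only the first swap wins, so Close after Detach is a no-op.
	if r := c.Detach(); r != nil {
		r.Close()
	}
}

func main() {
	c := &container{}
	c.res.Store(&resource{name: "catch-up iter"})
	owned := c.Detach() // ownership leaves the container
	c.Close()           // no-op: the container is already empty
	owned.Close()       // prints "closed: catch-up iter" exactly once
}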
diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go
index 6baedd826254..f5adb8f7ca08 100644
--- a/pkg/kv/kvserver/rangefeed/processor_test.go
+++ b/pkg/kv/kvserver/rangefeed/processor_test.go
@@ -23,6 +23,7 @@ import (
 	"time"
 	"unsafe"
 
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -331,6 +332,10 @@ func waitErrorFuture(f *future.ErrorFuture) error {
 	return resultErr
 }
 
+var nilCatchUpIter = func() *CatchUpIterator {
+	return nil
+}
+
 func TestProcessorBasic(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	testutils.RunValues(t, "proc type", testTypes, func(t *testing.T, pt procType) {
@@ -363,7 +368,7 @@ func TestProcessorBasic(t *testing.T) {
 		r1OK, r1Filter := p.Register(
 			roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 			hlc.Timestamp{WallTime: 1},
-			nil, /* catchUpIter */
+			nilCatchUpIter,
 			false, /* withDiff */
 			r1Stream,
 			func() {},
@@ -496,7 +501,7 @@ func TestProcessorBasic(t *testing.T) {
 		r2OK, r1And2Filter := p.Register(
 			roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
 			hlc.Timestamp{WallTime: 1},
-			nil, /* catchUpIter */
+			nilCatchUpIter,
 			true, /* withDiff */
 			r2Stream,
 			func() {},
@@ -592,7 +597,7 @@ func TestProcessorBasic(t *testing.T) {
 		r3OK, _ := p.Register(
 			roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
 			hlc.Timestamp{WallTime: 1},
-			nil, /* catchUpIter */
+			nilCatchUpIter,
 			false, /* withDiff */
 			r3Stream,
 			func() {},
@@ -615,7 +620,7 @@ func TestProcessorSlowConsumer(t *testing.T) {
 	_, _ = p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil, /* catchUpIter */
+		nilCatchUpIter,
 		false, /* withDiff */
 		r1Stream,
 		func() {},
@@ -626,7 +631,7 @@ func TestProcessorSlowConsumer(t *testing.T) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
 		hlc.Timestamp{WallTime: 1},
-		nil, /* catchUpIter */
+		nilCatchUpIter,
 		false, /* withDiff */
 		r2Stream,
 		func() {},
@@ -722,7 +727,7 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) {
 	_, _ = p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil, /* catchUpIter */
+		nilCatchUpIter,
 		false, /* withDiff */
 		r1Stream,
 		func() {},
@@ -777,7 +782,7 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil, /* catchUpIter */
+		nilCatchUpIter,
 		false, /* withDiff */
 		r1Stream,
 		func() {},
@@ -854,7 +859,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil, /* catchUpIter */
+		nilCatchUpIter,
 		false, /* withDiff */
 		r1Stream,
 		func() {},
@@ -1110,7 +1115,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
 			runtime.Gosched()
 			s := newTestStream()
 			var done future.ErrorFuture
-			p.Register(h.span, hlc.Timestamp{}, nil, false, s,
+			p.Register(h.span, hlc.Timestamp{}, nilCatchUpIter, false, s,
 				func() {}, &done)
 		}()
 		go func() {
@@ -1183,7 +1188,7 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
 			s := newTestStream()
 			regs[s] = firstIdx
 			var done future.ErrorFuture
-			p.Register(h.span, hlc.Timestamp{}, nil, false,
+			p.Register(h.span, hlc.Timestamp{}, nilCatchUpIter, false,
 				s, func() {}, &done)
 			regDone <- struct{}{}
 		}
@@ -1243,7 +1248,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
 	_, _ = p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil, /* catchUpIter */
+		nilCatchUpIter,
 		false, /* withDiff */
 		rStream,
 		func() {},
@@ -1324,7 +1329,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
 	_, _ = p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil, /* catchUpIter */
+		nilCatchUpIter,
 		false, /* withDiff */
 		rStream,
 		func() {},
@@ -1395,7 +1400,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
 	_, _ = p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil, /* catchUpIter */
+		nilCatchUpIter,
 		false, /* withDiff */
 		r1Stream,
 		func() {},
@@ -1409,7 +1414,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil, /* catchUpIter */
+		nilCatchUpIter,
 		false, /* withDiff */
 		r2Stream,
 		func() {},
@@ -1561,7 +1566,7 @@ func TestProcessorBackpressure(t *testing.T) {
 	// Add a registration.
 	stream := newTestStream()
 	done := &future.ErrorFuture{}
-	ok, _ := p.Register(span, hlc.MinTimestamp, nil, false, stream, nil, done)
+	ok, _ := p.Register(span, hlc.MinTimestamp, nilCatchUpIter, false, stream, nil, done)
 	require.True(t, ok)
 
 	// Wait for the initial checkpoint.
@@ -1610,3 +1615,43 @@ func TestProcessorBackpressure(t *testing.T) {
 		},
 	}, events[len(events)-1])
 }
+
+func TestIteratorContainer(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	eng := storage.NewDefaultInMemForTesting(storage.If(smallEngineBlocks, storage.BlockSize(1)))
+	defer eng.Close()
+	span := roachpb.Span{
+		Key:    keys.LocalMax,
+		EndKey: roachpb.KeyMax,
+	}
+
+	t.Run("empty container", func(t *testing.T) {
+		container := NewCatchUpIteratorContainer(nil)
+		require.NotPanics(t, container.Close, "close empty container")
+	})
+
+	t.Run("close attached", func(t *testing.T) {
+		var closed bool
+		iter, err := NewCatchUpIterator(eng, span, hlc.Timestamp{WallTime: 1}, func() {
+			closed = true
+		}, nil)
+		require.NoError(t, err, "failed to create iterator for test")
+		container := NewCatchUpIteratorContainer(iter)
+		container.Close()
+		require.True(t, closed, "iterator in container was not closed")
+	})
+
+	t.Run("close detached", func(t *testing.T) {
+		var closed bool
+		iter, err := NewCatchUpIterator(eng, span, hlc.Timestamp{WallTime: 1}, func() {
+			closed = true
+		}, nil)
+		require.NoError(t, err, "failed to create iterator for test")
+		container := NewCatchUpIteratorContainer(iter)
+		detached := container.Detach()
+		defer detached.Close()
+		container.Close()
+		require.False(t, closed, "detached iterator must not be closed by container")
+	})
+}
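The nilCatchUpIter helper exists because maybeConstructCatchUpIter now invokes the constructor unconditionally (see the registry.go hunk below): tests can no longer pass a nil CatchUpIteratorConstructor, since calling a nil func value panics, while calling a non-nil constructor that returns a nil iterator is safe. A tiny sketch of that distinction (hypothetical names, not patch code):

package main

import "fmt"

type ctor func() *int

func main() {
	ok := func() *int { return nil } // non-nil func returning a nil pointer
	fmt.Println(ok() == nil)         // prints "true": calling it is safe

	defer func() { fmt.Println("recovered:", recover()) }()
	var c ctor // a nil func value, like the old `nil /* catchUpIter */` argument
	_ = c()    // panics: calling a nil func
}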
diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go
index 6dc9487b4692..239588a448d9 100644
--- a/pkg/kv/kvserver/rangefeed/registry.go
+++ b/pkg/kv/kvserver/rangefeed/registry.go
@@ -589,21 +589,12 @@ func (r *registration) waitForCaughtUp() error {
 
 // maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches
 // the catchUpIter to be detached in the catchUpScan or closed on disconnect.
-func (r *registration) maybeConstructCatchUpIter() error {
-	if r.catchUpIterConstructor == nil {
-		return nil
-	}
-
-	catchUpIter, err := r.catchUpIterConstructor(r.span, r.catchUpTimestamp)
-	if err != nil {
-		return err
-	}
-	r.catchUpIterConstructor = nil
+func (r *registration) maybeConstructCatchUpIter() {
+	catchUpIter := r.catchUpIterConstructor()
 
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	r.mu.catchUpIter = catchUpIter
-	return nil
 }
 
 // detachCatchUpIter detaches the catchUpIter that was previously attached.
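After this hunk, a registration's iterator has exactly three possible fates: detached by the catch-up scan (which then owns and closes it), closed when the registration disconnects, or nil throughout if no catch-up scan was requested. A condensed sketch of that flow, using simplified stand-ins for the registration and iterator types:

package main

import (
	"fmt"
	"sync"
)

type iter struct{ closed bool }

func (i *iter) Close() { i.closed = true }

// reg mirrors the fields the patch touches: the constructor is infallible
// and always non-nil, so attaching can no longer fail.
type reg struct {
	ctor func() *iter
	mu   struct {
		sync.Mutex
		catchUpIter *iter
	}
}

func (r *reg) maybeConstructCatchUpIter() {
	it := r.ctor() // may return nil when no catch-up scan was requested
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.catchUpIter = it
}

func (r *reg) detachCatchUpIter() *iter {
	r.mu.Lock()
	defer r.mu.Unlock()
	it := r.mu.catchUpIter
	r.mu.catchUpIter = nil
	return it
}

func main() {
	r := &reg{ctor: func() *iter { return &iter{} }}
	r.maybeConstructCatchUpIter()
	it := r.detachCatchUpIter() // catch-up scan path: the scan now owns the iterator
	it.Close()
	fmt.Println("second detach returns nil:", r.detachCatchUpIter() == nil)
}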
diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go
index cf618d16ef5e..3b38ede661b7 100644
--- a/pkg/kv/kvserver/rangefeed/registry_test.go
+++ b/pkg/kv/kvserver/rangefeed/registry_test.go
@@ -100,16 +100,18 @@ type testRegistration struct {
 	stream *testStream
 }
 
-func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIteratorConstructor {
+func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator, span roachpb.Span, startTime hlc.Timestamp) CatchUpIteratorConstructor {
 	if iter == nil {
-		return nil
+		return func() *CatchUpIterator {
+			return nil
+		}
 	}
-	return func(span roachpb.Span, startTime hlc.Timestamp) (*CatchUpIterator, error) {
+	return func() *CatchUpIterator {
 		return &CatchUpIterator{
 			simpleCatchupIter: simpleCatchupIterAdapter{iter},
 			span:              span,
 			startTime:         startTime,
-		}, nil
+		}
 	}
 }
 
@@ -120,7 +122,7 @@ func newTestRegistration(
 	r := newRegistration(
 		span,
 		ts,
-		makeCatchUpIteratorConstructor(catchup),
+		makeCatchUpIteratorConstructor(catchup, span, ts),
 		withDiff,
 		5,
 		false, /* blockWhenFull */
@@ -129,9 +131,7 @@ func newTestRegistration(
 		func() {},
 		&future.ErrorFuture{},
 	)
-	if err := r.maybeConstructCatchUpIter(); err != nil {
-		panic(err)
-	}
+	r.maybeConstructCatchUpIter()
 	return &testRegistration{
 		registration: r,
 		stream:       s,
diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go
index 96469c74ca7d..a1e3545b9517 100644
--- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go
+++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -322,10 +322,7 @@ func (p *ScheduledProcessor) Register(
 		// Construct the catchUpIter before notifying the registration that it
 		// has been registered. Note that if the catchUpScan is never run, then
 		// the iterator constructed here will be closed in disconnect.
-		if err := r.maybeConstructCatchUpIter(); err != nil {
-			r.disconnect(kvpb.NewError(err))
-			return nil
-		}
+		r.maybeConstructCatchUpIter()
 
 		// Add the new registration to the registry.
 		p.reg.Register(&r)
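With construction hoisted out of both processors, Register no longer needs a failure branch: the legacy and scheduled paths both just move ownership into the registration. A before/after sketch of the two shapes (hypothetical helpers, not the actual Processor API):

package main

import (
	"errors"
	"fmt"
)

type it struct{}

// registerBefore: the constructor ran inside the processor and could fail,
// forcing the processor to disconnect the half-registered registration.
func registerBefore(ctor func() (*it, error)) error {
	iter, err := ctor()
	if err != nil {
		return fmt.Errorf("disconnecting registration: %w", err)
	}
	_ = iter // attach to the registration
	return nil
}

// registerAfter: the caller already created the iterator; the processor only
// takes ownership and has nothing left that can fail here.
func registerAfter(detach func() *it) {
	iter := detach() // may be nil, never an error
	_ = iter         // attach to the registration
}

func main() {
	err := registerBefore(func() (*it, error) { return nil, errors.New("engine closed") })
	fmt.Println(err)
	registerAfter(func() *it { return &it{} })
}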
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index e671ea47940d..b07f9c71231f 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -271,25 +271,24 @@ func (r *Replica) RangeFeed(
 	}
 
 	// Register the stream with a catch-up iterator.
-	var catchUpIterFunc rangefeed.CatchUpIteratorConstructor
+	var catchUpIter *rangefeed.CatchUpIterator
 	if usingCatchUpIter {
-		catchUpIterFunc = func(span roachpb.Span, startTime hlc.Timestamp) (*rangefeed.CatchUpIterator, error) {
-			// Assert that we still hold the raftMu when this is called to ensure
-			// that the catchUpIter reads from the current snapshot.
-			r.raftMu.AssertHeld()
-			i, err := rangefeed.NewCatchUpIterator(r.store.TODOEngine(), span, startTime, iterSemRelease, pacer)
-			if err != nil {
-				return nil, err
-			}
-			if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil {
-				i.OnEmit = f
-			}
-			return i, nil
+		catchUpIter, err = rangefeed.NewCatchUpIterator(r.store.TODOEngine(), rSpan.AsRawSpanWithNoLocals(),
+			args.Timestamp, iterSemRelease, pacer)
+		if err != nil {
+			r.raftMu.Unlock()
+			iterSemRelease()
+			return future.MakeCompletedErrorFuture(err)
+		}
+		if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil {
+			catchUpIter.OnEmit = f
 		}
 	}
+	catchUpIterContainer := rangefeed.NewCatchUpIteratorContainer(catchUpIter)
+	defer catchUpIterContainer.Close()
 	var done future.ErrorFuture
 	p := r.registerWithRangefeedRaftMuLocked(
-		ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, &done,
+		ctx, rSpan, args.Timestamp, catchUpIterContainer.Detach, args.WithDiff, lockedStream, &done,
 	)
 	r.raftMu.Unlock()
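The replica side now follows the common "defer cleanup unless ownership transfers" idiom: Close is deferred as soon as the container is built, a successful registration empties the container via Detach, and the deferred Close only does real work on paths where registration never happened. A generic sketch of the idiom (names are illustrative):

package main

import "fmt"

type closer struct{ name string }

func (c *closer) Close() { fmt.Println("deferred cleanup closed:", c.name) }

// setup mimics the shape of Replica.RangeFeed above: the resource is created
// eagerly, cleanup is deferred immediately, and a successful registration
// takes ownership away from the cleanup path.
func setup(failBeforeRegister bool) {
	held := &closer{name: "iterator"}
	detach := func() *closer { r := held; held = nil; return r }
	// Mirrors `defer catchUpIterContainer.Close()` in the patch.
	defer func() {
		if held != nil {
			held.Close()
		}
	}()
	if failBeforeRegister {
		return // early error path: the deferred cleanup closes the resource
	}
	r := detach() // registration takes ownership; deferred cleanup becomes a no-op
	fmt.Println("registration owns:", r.name)
}

func main() {
	setup(true)  // prints "deferred cleanup closed: iterator"
	setup(false) // prints "registration owns: iterator"
}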