diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go index b12f4c2b3006..83b88d9321e5 100644 --- a/pkg/kv/kvserver/rangefeed/bench_test.go +++ b/pkg/kv/kvserver/rangefeed/bench_test.go @@ -107,7 +107,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { const withDiff = false streams[i] = &noopStream{ctx: ctx} futures[i] = &future.ErrorFuture{} - ok, _ := p.Register(span, hlc.MinTimestamp, nil, withDiff, streams[i], nil, futures[i]) + ok, _ := p.Register(span, hlc.MinTimestamp, nilCatchUpIter, withDiff, streams[i], nil, futures[i]) require.True(b, ok) } diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 42abbdae4463..1612131b7bb2 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -330,7 +331,41 @@ type IntentScannerConstructor func() IntentScanner // for catchup-scans. Takes the key span and exclusive start time to run the // catchup scan for. It should be called from underneath a stopper task to // ensure that the engine has not been closed. -type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) (*CatchUpIterator, error) +type CatchUpIteratorConstructor func() *CatchUpIterator + +// CatchUpIteratorContainer manages catchup iterator lifecycle during +// rangefeed registration creation. Iterator is created by replica under the +// lock to get a consistent state. Container is then passed to registration. +// If registration is created successfully, iterator ownership is moved to +// registration. If registration creation fails on any stage before that, +// container could be closed to release iterator. Since ownership is moved +// out of container, it is safe to close it regardless of startup success or +// failure. 
+type CatchUpIteratorContainer struct { + iter atomic.Pointer[CatchUpIterator] +} + +// NewCatchUpIteratorContainer constructs new container. It is safe to pass nil +// iterator. +func NewCatchUpIteratorContainer(iter *CatchUpIterator) *CatchUpIteratorContainer { + c := &CatchUpIteratorContainer{} + c.iter.Store(iter) + return c +} + +// Detach moves iterator out of container. Calling Close on container won't close +// the iterator after that. Safe to call on empty container. +func (c *CatchUpIteratorContainer) Detach() (iter *CatchUpIterator) { + return c.iter.Swap(nil) +} + +// Close closes underlying iterator if it was contained and was not moved out +// by Detach. +func (c *CatchUpIteratorContainer) Close() { + if iter := c.Detach(); iter != nil { + iter.Close() + } +} // Start implements Processor interface. // @@ -405,12 +440,10 @@ func (p *LegacyProcessor) run( } // Construct the catchUpIter before notifying the registration that it - // has been registered. Note that if the catchUpScan is never run, then - // the iterator constructed here will be closed in disconnect. - if err := r.maybeConstructCatchUpIter(); err != nil { - r.disconnect(kvpb.NewError(err)) - return - } + // has been registered. This will move ownership of iterator to + // registration. Note that if the catchUpScan is never run, then + // the iterator obtained here will be closed in disconnect. + r.maybeConstructCatchUpIter() // Add the new registration to the registry. 
p.reg.Register(&r) diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 6baedd826254..f5adb8f7ca08 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -23,6 +23,7 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -331,6 +332,10 @@ func waitErrorFuture(f *future.ErrorFuture) error { return resultErr } +var nilCatchUpIter = func() *CatchUpIterator { + return nil +} + func TestProcessorBasic(t *testing.T) { defer leaktest.AfterTest(t)() testutils.RunValues(t, "proc type", testTypes, func(t *testing.T, pt procType) { @@ -363,7 +368,7 @@ func TestProcessorBasic(t *testing.T) { r1OK, r1Filter := p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ r1Stream, func() {}, @@ -496,7 +501,7 @@ func TestProcessorBasic(t *testing.T) { r2OK, r1And2Filter := p.Register( roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, true, /* withDiff */ r2Stream, func() {}, @@ -592,7 +597,7 @@ func TestProcessorBasic(t *testing.T) { r3OK, _ := p.Register( roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ r3Stream, func() {}, @@ -615,7 +620,7 @@ func TestProcessorSlowConsumer(t *testing.T) { _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ r1Stream, func() {}, @@ -626,7 +631,7 @@ func TestProcessorSlowConsumer(t *testing.T) { p.Register( roachpb.RSpan{Key: 
roachpb.RKey("a"), EndKey: roachpb.RKey("z")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ r2Stream, func() {}, @@ -722,7 +727,7 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) { _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ r1Stream, func() {}, @@ -777,7 +782,7 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) { p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ r1Stream, func() {}, @@ -854,7 +859,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ r1Stream, func() {}, @@ -1110,7 +1115,7 @@ func TestProcessorConcurrentStop(t *testing.T) { runtime.Gosched() s := newTestStream() var done future.ErrorFuture - p.Register(h.span, hlc.Timestamp{}, nil, false, s, + p.Register(h.span, hlc.Timestamp{}, nilCatchUpIter, false, s, func() {}, &done) }() go func() { @@ -1183,7 +1188,7 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { s := newTestStream() regs[s] = firstIdx var done future.ErrorFuture - p.Register(h.span, hlc.Timestamp{}, nil, false, + p.Register(h.span, hlc.Timestamp{}, nilCatchUpIter, false, s, func() {}, &done) regDone <- struct{}{} } @@ -1243,7 +1248,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) { _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ rStream, func() {}, @@ -1324,7 +1329,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: 
roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ rStream, func() {}, @@ -1395,7 +1400,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ r1Stream, func() {}, @@ -1409,7 +1414,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ + nilCatchUpIter, false, /* withDiff */ r2Stream, func() {}, @@ -1561,7 +1566,7 @@ func TestProcessorBackpressure(t *testing.T) { // Add a registration. stream := newTestStream() done := &future.ErrorFuture{} - ok, _ := p.Register(span, hlc.MinTimestamp, nil, false, stream, nil, done) + ok, _ := p.Register(span, hlc.MinTimestamp, nilCatchUpIter, false, stream, nil, done) require.True(t, ok) // Wait for the initial checkpoint. 
@@ -1610,3 +1615,43 @@ func TestProcessorBackpressure(t *testing.T) { }, }, events[len(events)-1]) } + +func TestIteratorContainer(t *testing.T) { + defer leaktest.AfterTest(t)() + + eng := storage.NewDefaultInMemForTesting(storage.If(smallEngineBlocks, storage.BlockSize(1))) + defer eng.Close() + span := roachpb.Span{ + Key: keys.LocalMax, + EndKey: roachpb.KeyMax, + } + + t.Run("empty container", func(t *testing.T) { + container := NewCatchUpIteratorContainer(nil) + require.NotPanics(t, container.Close, "close empty container") + }) + + t.Run("close attached", func(t *testing.T) { + var closed bool + iter, err := NewCatchUpIterator(eng, span, hlc.Timestamp{WallTime: 1}, func() { + closed = true + }, nil) + require.NoError(t, err, "failed to create iterator for test") + container := NewCatchUpIteratorContainer(iter) + container.Close() + require.True(t, closed, "iterator in container is not closed") + }) + + t.Run("closed detached", func(t *testing.T) { + var closed bool + iter, err := NewCatchUpIterator(eng, span, hlc.Timestamp{WallTime: 1}, func() { + closed = true + }, nil) + require.NoError(t, err, "failed to create iterator for test") + container := NewCatchUpIteratorContainer(iter) + detached := container.Detach() + defer detached.Close() + container.Close() + require.False(t, closed, "iterator in container is not closed") + }) +} diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 6dc9487b4692..239588a448d9 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -589,21 +589,12 @@ func (r *registration) waitForCaughtUp() error { // maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches // the catchUpIter to be detached in the catchUpScan or closed on disconnect. 
-func (r *registration) maybeConstructCatchUpIter() error { - if r.catchUpIterConstructor == nil { - return nil - } - - catchUpIter, err := r.catchUpIterConstructor(r.span, r.catchUpTimestamp) - if err != nil { - return err - } - r.catchUpIterConstructor = nil +func (r *registration) maybeConstructCatchUpIter() { + catchUpIter := r.catchUpIterConstructor() r.mu.Lock() defer r.mu.Unlock() r.mu.catchUpIter = catchUpIter - return nil } // detachCatchUpIter detaches the catchUpIter that was previously attached. diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index cf618d16ef5e..3b38ede661b7 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -100,16 +100,18 @@ type testRegistration struct { stream *testStream } -func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIteratorConstructor { +func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator, span roachpb.Span, startTime hlc.Timestamp) CatchUpIteratorConstructor { if iter == nil { - return nil + return func() *CatchUpIterator { + return nil + } } - return func(span roachpb.Span, startTime hlc.Timestamp) (*CatchUpIterator, error) { + return func() *CatchUpIterator { return &CatchUpIterator{ simpleCatchupIter: simpleCatchupIterAdapter{iter}, span: span, startTime: startTime, - }, nil + } } } @@ -120,7 +122,7 @@ func newTestRegistration( r := newRegistration( span, ts, - makeCatchUpIteratorConstructor(catchup), + makeCatchUpIteratorConstructor(catchup, span, ts), withDiff, 5, false, /* blockWhenFull */ @@ -129,9 +131,7 @@ func newTestRegistration( func() {}, &future.ErrorFuture{}, ) - if err := r.maybeConstructCatchUpIter(); err != nil { - panic(err) - } + r.maybeConstructCatchUpIter() return &testRegistration{ registration: r, stream: s, diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 
96469c74ca7d..a1e3545b9517 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -322,10 +322,7 @@ func (p *ScheduledProcessor) Register( // Construct the catchUpIter before notifying the registration that it // has been registered. Note that if the catchUpScan is never run, then // the iterator constructed here will be closed in disconnect. - if err := r.maybeConstructCatchUpIter(); err != nil { - r.disconnect(kvpb.NewError(err)) - return nil - } + r.maybeConstructCatchUpIter() // Add the new registration to the registry. p.reg.Register(&r) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e671ea47940d..b07f9c71231f 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -271,25 +271,24 @@ func (r *Replica) RangeFeed( } // Register the stream with a catch-up iterator. - var catchUpIterFunc rangefeed.CatchUpIteratorConstructor + var catchUpIter *rangefeed.CatchUpIterator if usingCatchUpIter { - catchUpIterFunc = func(span roachpb.Span, startTime hlc.Timestamp) (*rangefeed.CatchUpIterator, error) { - // Assert that we still hold the raftMu when this is called to ensure - // that the catchUpIter reads from the current snapshot. 
- r.raftMu.AssertHeld() - i, err := rangefeed.NewCatchUpIterator(r.store.TODOEngine(), span, startTime, iterSemRelease, pacer) - if err != nil { - return nil, err - } - if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil { - i.OnEmit = f - } - return i, nil + catchUpIter, err = rangefeed.NewCatchUpIterator(r.store.TODOEngine(), rSpan.AsRawSpanWithNoLocals(), + args.Timestamp, iterSemRelease, pacer) + if err != nil { + r.raftMu.Unlock() + iterSemRelease() + return future.MakeCompletedErrorFuture(err) + } + if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil { + catchUpIter.OnEmit = f } } + catchUpIterContainer := rangefeed.NewCatchUpIteratorContainer(catchUpIter) + defer catchUpIterContainer.Close() var done future.ErrorFuture p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, &done, + ctx, rSpan, args.Timestamp, catchUpIterContainer.Detach, args.WithDiff, lockedStream, &done, ) r.raftMu.Unlock()