diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 0338a24cad72..3fca1c99118d 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -135,18 +135,23 @@ type Processor interface { // It is ok to start registering streams before background initialization // completes. // - // The provided iterator is used to initialize the rangefeed's resolved - // timestamp. It must obey the contract of an iterator used for an - // initResolvedTSScan. The Processor promises to clean up the iterator by - // calling its Close method when it is finished. + // The provided IntentScannerConstructor is used to construct a lock table + // iterator which will be used to initialize rangefeed's resolved timestamp. + // It must be called under the same raftMu lock as first registration to + // ensure that there would be no missing events. For LegacyProcessor, this is + // currently achieved by Register function synchronizing with the work loop + // before the lock is released. For ScheduledProcessor, this is achieved by + // calling IntentScannerConstructor synchronously during Start. // - // Note that newRtsIter must be called under the same lock as first - // registration to ensure that all there would be no missing events. - // This is currently achieved by Register function synchronizing with - // the work loop before the lock is released. + // If IntentScannerConstructor returns a non-nil error, the processor will be + // stopped. Otherwise, the intent scanner is successfully initialized. It must + // obey the contract of an iterator used for an initResolvedTSScan. The + // processor promises to clean up the iterator by calling its Close method when + // it's finished. // - // If the iterator is nil then no initialization scan will be performed and - // the resolved timestamp will immediately be considered initialized. 
+ // If the provided IntentScannerConstructor itself is nil then no + // initialization scan will be performed and the resolved timestamp will + // immediately be considered initialized. This is only possible in tests. Start(stopper *stop.Stopper, newRtsIter IntentScannerConstructor) error // Stop processor and close all registrations. // @@ -302,4 +307,4 @@ type logicalOpMetadata struct { // IntentScannerConstructor is used to construct an IntentScanner. It // should be called from underneath a stopper task to ensure that the // engine has not been closed. -type IntentScannerConstructor func() IntentScanner +type IntentScannerConstructor func() (IntentScanner, error) diff --git a/pkg/kv/kvserver/rangefeed/processor_helpers_test.go b/pkg/kv/kvserver/rangefeed/processor_helpers_test.go index b6e146cb14b2..75b9ee1494db 100644 --- a/pkg/kv/kvserver/rangefeed/processor_helpers_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_helpers_test.go @@ -283,8 +283,8 @@ func withMetrics(m *Metrics) option { func withRtsScanner(scanner IntentScanner) option { return func(config *testConfig) { if scanner != nil { - config.isc = func() IntentScanner { - return scanner + config.isc = func() (IntentScanner, error) { + return scanner, nil } } } diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 442a20c46d7c..1181207688ca 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -634,6 +635,45 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) { }) } +// TestIntentScannerOnError tests that when a processor is given an intent +// 
scanner constructor that fails to create a scanner, the processor will stop +// gracefully. Regression for #127204. +func TestIntentScannerOnError(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + // Stop on every exit path so a failed assertion cannot leak goroutines. + defer stopper.Stop(ctx) + cfg := testConfig{ + Config: Config{ + RangeID: 2, + Stopper: stopper, + Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}, + Metrics: NewMetrics(), + }, + } + sch := NewScheduler(SchedulerConfig{ + Workers: 1, + PriorityWorkers: 1, + Metrics: NewSchedulerMetrics(time.Second), + }) + require.NoError(t, sch.Start(ctx, stopper)) + cfg.Scheduler = sch + s := NewProcessor(cfg.Config) + require.ErrorContains(t, s.Start(stopper, func() (IntentScanner, error) { + return nil, errors.New("scanner error") + }), "scanner error") + // Poll without blocking; SucceedsSoon owns the retry loop and the timeout. + testutils.SucceedsSoon(t, func() error { + select { + case <-s.(*ScheduledProcessor).stoppedC: + return nil + default: + return errors.New("processor has not stopped yet") + } + }) +} + // TestProcessorInitializeResolvedTimestamp tests that when a Processor is given // a resolved timestamp iterator, it doesn't initialize its resolved timestamp // until it has consumed all intents in the iterator. diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 6bca9a20f38c..1c3c7f302f23 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -86,14 +86,14 @@ func NewScheduledProcessor(cfg Config) *ScheduledProcessor { return p } -// Start performs processor one-time initialization e.g registers with -// scheduler and fires up background tasks to populate processor state. -// The provided iterator is used to initialize the rangefeed's resolved -// timestamp. It must obey the contract of an iterator used for an -// initResolvedTSScan. 
The Processor promises to clean up the iterator by -// calling its Close method when it is finished. If the iterator is nil then -// no initialization scan will be performed and the resolved timestamp will -// immediately be considered initialized. +// Start performs processor one-time initialization, e.g. registers with the +// scheduler and fires up background tasks to populate processor state. The +// provided IntentScannerConstructor builds the iterator used to initialize the +// rangefeed's resolved timestamp; if it fails, the processor is stopped and the +// error is returned. The iterator must obey the contract of an iterator used +// for an initResolvedTSScan; the Processor promises to Close it when finished. +// If IntentScannerConstructor is nil, no initialization scan is performed and +// the resolved timestamp is immediately considered initialized. func (p *ScheduledProcessor) Start( stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor, ) error { @@ -111,17 +111,22 @@ func (p *ScheduledProcessor) Start( // Launch an async task to scan over the resolved timestamp iterator and // initialize the unresolvedIntentQueue. if rtsIterFunc != nil { - rtsIter := rtsIterFunc() + rtsIter, err := rtsIterFunc() + if err != nil { + // No need to close rtsIter if error is non-nil. + p.scheduler.StopProcessor() + return err + } initScan := newInitResolvedTSScan(p.Span, p, rtsIter) // TODO(oleg): we need to cap number of tasks that we can fire up across // all feeds as they could potentially generate O(n) tasks during start. - err := stopper.RunAsyncTask(p.taskCtx, "rangefeed: init resolved ts", initScan.Run) - if err != nil { + if err := stopper.RunAsyncTask(p.taskCtx, "rangefeed: init resolved ts", initScan.Run); err != nil { initScan.Cancel() p.scheduler.StopProcessor() return err } } else { + // Only possible in tests. 
p.initResolvedTS(p.taskCtx, nil) } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 646492ee6a3e..562d8b84880e 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -499,19 +499,13 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( p = rangefeed.NewProcessor(cfg) // Start it with an iterator to initialize the resolved timestamp. - rtsIter := func() rangefeed.IntentScanner { + rtsIter := func() (rangefeed.IntentScanner, error) { // Assert that we still hold the raftMu when this is called to ensure // that the rtsIter reads from the current snapshot. The replica // synchronizes with the rangefeed Processor calling this function by // waiting for the Register call below to return. r.raftMu.AssertHeld() - - scanner, err := rangefeed.NewSeparatedIntentScanner(streamCtx, r.store.TODOEngine(), desc.RSpan()) - if err != nil { - stream.Disconnect(kvpb.NewError(err)) - return nil - } - return scanner + return rangefeed.NewSeparatedIntentScanner(streamCtx, r.store.TODOEngine(), desc.RSpan()) } // NB: This only errors if the stopper is stopping, and we have to return here