diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index ba6526bf2e44..12c553928786 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -198,15 +198,16 @@ type CatchUpIteratorConstructor func() *CatchUpIterator // calling its Close method when it is finished. If the iterator is nil then // no initialization scan will be performed and the resolved timestamp will // immediately be considered initialized. -func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) { +func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) error { ctx := p.AnnotateCtx(context.Background()) if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) { p.run(ctx, p.RangeID, rtsIterFunc, stopper) }); err != nil { - pErr := roachpb.NewError(err) - p.reg.DisconnectWithErr(all, pErr) + p.reg.DisconnectWithErr(all, roachpb.NewError(err)) close(p.stoppedC) + return err } + return nil } // run is called from Start and runs the rangefeed. diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 327a7b2d6219..ebc77919fdab 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -134,8 +134,9 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *roachpb.RangeFeed const testProcessorEventCCap = 16 func newTestProcessorWithTxnPusher( - rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher, + t *testing.T, rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher, ) (*Processor, *stop.Stopper) { + t.Helper() stopper := stop.NewStopper() var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable @@ -154,7 +155,7 @@ func newTestProcessorWithTxnPusher( EventChanCap: testProcessorEventCCap, CheckStreamsInterval: 10 * time.Millisecond, }) - p.Start(stopper, makeIntentScannerConstructor(rtsIter)) + require.NoError(t, p.Start(stopper, makeIntentScannerConstructor(rtsIter))) return p, stopper } @@ -165,13 +166,16 @@ func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScan return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) } } -func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) { - return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */) +func newTestProcessor( + t *testing.T, rtsIter storage.SimpleMVCCIterator, +) (*Processor, *stop.Stopper) { + t.Helper() + return newTestProcessorWithTxnPusher(t, rtsIter, nil /* pusher */) } func TestProcessorBasic(t *testing.T) { defer leaktest.AfterTest(t)() - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) defer stopper.Stop(context.Background()) // Test processor without registrations. @@ -434,13 +438,13 @@ func TestNilProcessor(t *testing.T) { // to call on a nil Processor. stopper := stop.NewStopper() defer stopper.Stop(context.Background()) - require.Panics(t, func() { p.Start(stopper, nil) }) + require.Panics(t, func() { _ = p.Start(stopper, nil) }) require.Panics(t, func() { p.Register(roachpb.RSpan{}, hlc.Timestamp{}, nil, false, nil, nil) }) } func TestProcessorSlowConsumer(t *testing.T) { defer leaktest.AfterTest(t)() - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) defer stopper.Stop(context.Background()) // Add a registration. @@ -566,7 +570,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { }, nil) rtsIter.block = make(chan struct{}) - p, stopper := newTestProcessor(rtsIter) + p, stopper := newTestProcessor(t, rtsIter) defer stopper.Stop(context.Background()) // The resolved timestamp should not be initialized. @@ -728,7 +732,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { return nil }) - p, stopper := newTestProcessorWithTxnPusher(nil /* rtsIter */, &tp) + p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &tp) defer stopper.Stop(context.Background()) // Add a few intents and move the closed timestamp forward. @@ -818,7 +822,7 @@ func TestProcessorConcurrentStop(t *testing.T) { defer leaktest.AfterTest(t)() const trials = 10 for i := 0; i < trials; i++ { - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) var wg sync.WaitGroup wg.Add(6) @@ -864,7 +868,7 @@ func TestProcessorConcurrentStop(t *testing.T) { // observes only operations that are consumed after it has registered. func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { defer leaktest.AfterTest(t)() - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) defer stopper.Stop(context.Background()) firstC := make(chan int64) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index ce4cac40bdc5..32574d9bac5f 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -411,7 +411,15 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( return rangefeed.NewLegacyIntentScanner(iter) } - p.Start(r.store.Stopper(), rtsIter) + // NB: This only errors if the stopper is stopping, and we have to return here + // in that case. We do check ShouldQuiesce() below, but that's not sufficient + // because the stopper has two states: stopping and quiescing. If this errors + // due to stopping, but before it enters the quiescing state, then the select + // below will fall through to the panic. + if err := p.Start(r.store.Stopper(), rtsIter); err != nil { + errC <- roachpb.NewError(err) + return nil + } // Register with the processor *before* we attach its reference to the // Replica struct. This ensures that the registration is in place before