From 2d2b928acc8f74a7f127ecae755a0919e7782f57 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 20 Feb 2022 12:07:09 +0000 Subject: [PATCH] rangefeed: fix panic due to rangefeed stopper race This patch fixes a race condition that could cause an `unexpected Stopped processor` panic if a rangefeed registration was attempted while a store was stopping. Registering a rangefeed panics if a newly created rangefeed processor is unexpectedly stopped and the store's stopper is not quiescing. However, the stopper has two distinct states that it transitions through: stopping and quiescing. It's possible for the processor to fail to start because the stopper is stopping, but before the stopper has transitioned to quiescing, which would trigger this panic. This patch propagates the processor startup error to the rangefeed registration and through to the caller, returning before attempting the registration at all and avoiding the panic. This was confirmed with 50000 stress runs of `TestPGTest/pgjdbc`, all of which succeeded. Release note (bug fix): Fixed a race condition that in rare circumstances could cause a node to panic with `unexpected Stopped processor` during shutdown. --- pkg/kv/kvserver/rangefeed/processor.go | 7 +++--- pkg/kv/kvserver/rangefeed/processor_test.go | 26 ++++++++++++--------- pkg/kv/kvserver/replica_rangefeed.go | 11 ++++++++- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index caf666ee0060..d7d75a452382 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -191,15 +191,16 @@ type IteratorConstructor func() storage.SimpleMVCCIterator // calling its Close method when it is finished. If the iterator is nil then // no initialization scan will be performed and the resolved timestamp will // immediately be considered initialized. -func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) { +func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) error { ctx := p.AnnotateCtx(context.Background()) if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) { p.run(ctx, p.RangeID, rtsIterFunc, stopper) }); err != nil { - pErr := roachpb.NewError(err) - p.reg.DisconnectWithErr(all, pErr) + p.reg.DisconnectWithErr(all, roachpb.NewError(err)) close(p.stoppedC) + return err } + return nil } // run is called from Start and runs the rangefeed. diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 4d0b7cc1e950..890d855baf18 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -134,8 +134,9 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *roachpb.RangeFeed const testProcessorEventCCap = 16 func newTestProcessorWithTxnPusher( - rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher, + t *testing.T, rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher, ) (*Processor, *stop.Stopper) { + t.Helper() stopper := stop.NewStopper() var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable @@ -154,7 +155,7 @@ func newTestProcessorWithTxnPusher( EventChanCap: testProcessorEventCCap, CheckStreamsInterval: 10 * time.Millisecond, }) - p.Start(stopper, makeIteratorConstructor(rtsIter)) + require.NoError(t, p.Start(stopper, makeIteratorConstructor(rtsIter))) return p, stopper } @@ -165,13 +166,16 @@ func makeIteratorConstructor(rtsIter storage.SimpleMVCCIterator) IteratorConstru return func() storage.SimpleMVCCIterator { return rtsIter } } -func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) { - return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */) +func newTestProcessor( + t *testing.T, rtsIter storage.SimpleMVCCIterator, +) (*Processor, *stop.Stopper) { + t.Helper() + return newTestProcessorWithTxnPusher(t, rtsIter, nil /* pusher */) } func TestProcessorBasic(t *testing.T) { defer leaktest.AfterTest(t)() - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) defer stopper.Stop(context.Background()) // Test processor without registrations. @@ -434,13 +438,13 @@ func TestNilProcessor(t *testing.T) { // to call on a nil Processor. stopper := stop.NewStopper() defer stopper.Stop(context.Background()) - require.Panics(t, func() { p.Start(stopper, nil) }) + require.Panics(t, func() { _ = p.Start(stopper, nil) }) require.Panics(t, func() { p.Register(roachpb.RSpan{}, hlc.Timestamp{}, nil, false, nil, nil) }) } func TestProcessorSlowConsumer(t *testing.T) { defer leaktest.AfterTest(t)() - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) defer stopper.Stop(context.Background()) // Add a registration. @@ -566,7 +570,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { }) rtsIter.block = make(chan struct{}) - p, stopper := newTestProcessor(rtsIter) + p, stopper := newTestProcessor(t, rtsIter) defer stopper.Stop(context.Background()) // The resolved timestamp should not be initialized. @@ -728,7 +732,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { return nil }) - p, stopper := newTestProcessorWithTxnPusher(nil /* rtsIter */, &tp) + p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &tp) defer stopper.Stop(context.Background()) // Add a few intents and move the closed timestamp forward. @@ -818,7 +822,7 @@ func TestProcessorConcurrentStop(t *testing.T) { defer leaktest.AfterTest(t)() const trials = 10 for i := 0; i < trials; i++ { - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) var wg sync.WaitGroup wg.Add(6) @@ -864,7 +868,7 @@ func TestProcessorConcurrentStop(t *testing.T) { // observes only operations that are consumed after it has registered. func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { defer leaktest.AfterTest(t)() - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) defer stopper.Stop(context.Background()) firstC := make(chan int64) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index bbc6ffbd23a1..732945fd236f 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -392,7 +392,16 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // MinTimestampHint: r.ResolvedTimestamp, }) } - p.Start(r.store.Stopper(), rtsIter) + + // NB: This only errors if the stopper is stopping, and we have to return here + // in that case. We do check ShouldQuiesce() below, but that's not sufficient + // because the stopper has two states: stopping and quiescing. If this errors + // due to stopping, but before it enters the quiescing state, then the select + // below will fall through to the panic. + if err := p.Start(r.store.Stopper(), rtsIter); err != nil { + errC <- roachpb.NewError(err) + return nil + } // Register with the processor *before* we attach its reference to the // Replica struct. This ensures that the registration is in place before