Skip to content

Commit

Permalink
rangefeed: fix panic due to rangefeed stopper race
Browse files Browse the repository at this point in the history
This patch fixes a race condition that could cause an `unexpected
Stopped processor` panic if a rangefeed registration was attempted while
a store was stopping.

Registering a rangefeed panics if a newly created rangefeed processor is
unexpectedly stopped and the store's stopper is not quiescing. However,
the stopper has two distinct states that it transitions through:
stopping and quiescing. The processor can fail to start because the
stopper is stopping but has not yet transitioned to quiescing, and in
that window the panic would be triggered.

This patch propagates the processor startup error to the rangefeed
registration and through to the caller, returning before attempting
the registration at all and avoiding the panic. This was confirmed with
50000 stress runs of `TestPGTest/pgjdbc`, all of which succeeded.

Release note (bug fix): Fixed a race condition that in rare
circumstances could cause a node to panic with `unexpected Stopped
processor` during shutdown.
  • Loading branch information
erikgrinaker committed Feb 20, 2022
1 parent 0bb1218 commit 3d1281a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,16 @@ type CatchUpIteratorConstructor func() *CatchUpIterator
// calling its Close method when it is finished. If the iterator is nil then
// no initialization scan will be performed and the resolved timestamp will
// immediately be considered initialized.
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) {
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) error {
ctx := p.AnnotateCtx(context.Background())
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
p.run(ctx, p.RangeID, rtsIterFunc, stopper)
}); err != nil {
pErr := roachpb.NewError(err)
p.reg.DisconnectWithErr(all, pErr)
p.reg.DisconnectWithErr(all, roachpb.NewError(err))
close(p.stoppedC)
return err
}
return nil
}

// run is called from Start and runs the rangefeed.
Expand Down
26 changes: 15 additions & 11 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *roachpb.RangeFeed
const testProcessorEventCCap = 16

func newTestProcessorWithTxnPusher(
rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
t *testing.T, rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
) (*Processor, *stop.Stopper) {
t.Helper()
stopper := stop.NewStopper()

var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
Expand All @@ -154,7 +155,7 @@ func newTestProcessorWithTxnPusher(
EventChanCap: testProcessorEventCCap,
CheckStreamsInterval: 10 * time.Millisecond,
})
p.Start(stopper, makeIntentScannerConstructor(rtsIter))
require.NoError(t, p.Start(stopper, makeIntentScannerConstructor(rtsIter)))
return p, stopper
}

Expand All @@ -165,13 +166,16 @@ func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScan
return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) }
}

func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) {
return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */)
func newTestProcessor(
t *testing.T, rtsIter storage.SimpleMVCCIterator,
) (*Processor, *stop.Stopper) {
t.Helper()
return newTestProcessorWithTxnPusher(t, rtsIter, nil /* pusher */)
}

func TestProcessorBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

// Test processor without registrations.
Expand Down Expand Up @@ -434,13 +438,13 @@ func TestNilProcessor(t *testing.T) {
// to call on a nil Processor.
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
require.Panics(t, func() { p.Start(stopper, nil) })
require.Panics(t, func() { _ = p.Start(stopper, nil) })
require.Panics(t, func() { p.Register(roachpb.RSpan{}, hlc.Timestamp{}, nil, false, nil, nil) })
}

func TestProcessorSlowConsumer(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

// Add a registration.
Expand Down Expand Up @@ -566,7 +570,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
}, nil)
rtsIter.block = make(chan struct{})

p, stopper := newTestProcessor(rtsIter)
p, stopper := newTestProcessor(t, rtsIter)
defer stopper.Stop(context.Background())

// The resolved timestamp should not be initialized.
Expand Down Expand Up @@ -728,7 +732,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
return nil
})

p, stopper := newTestProcessorWithTxnPusher(nil /* rtsIter */, &tp)
p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &tp)
defer stopper.Stop(context.Background())

// Add a few intents and move the closed timestamp forward.
Expand Down Expand Up @@ -818,7 +822,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
defer leaktest.AfterTest(t)()
const trials = 10
for i := 0; i < trials; i++ {
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)

var wg sync.WaitGroup
wg.Add(6)
Expand Down Expand Up @@ -864,7 +868,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
// observes only operations that are consumed after it has registered.
func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

firstC := make(chan int64)
Expand Down
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,15 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
return rangefeed.NewLegacyIntentScanner(iter)
}

p.Start(r.store.Stopper(), rtsIter)
// NB: Start only errors if the stopper is stopping, and we have to return
// here in that case. We do check ShouldQuiesce() below, but that's not
// sufficient because the stopper has two states: stopping and quiescing. If
// Start fails because the stopper is stopping but has not yet entered the
// quiescing state, then the select below will fall through to the panic.
if err := p.Start(r.store.Stopper(), rtsIter); err != nil {
errC <- roachpb.NewError(err)
return nil
}

// Register with the processor *before* we attach its reference to the
// Replica struct. This ensures that the registration is in place before
Expand Down

0 comments on commit 3d1281a

Please sign in to comment.