Skip to content

Commit

Permalink
Merge #76825
Browse files Browse the repository at this point in the history
76825: rangefeed: fix panic due to rangefeed stopper race r=RaduBerinde a=erikgrinaker

This patch fixes a race condition that could cause an
`unexpected Stopped processor` panic if a rangefeed registration was
attempted while a store was stopping.

Registering a rangefeed panics if a newly created rangefeed processor is
unexpectedly stopped and the store's stopper is not quiescing. However,
the stopper has two distinct states that it transitions through:
stopping and quiescing. It's possible for the processor to fail to start
because the stopper is stopping, but before the stopper has transitioned
to quiescing, which would trigger this panic.

This patch propagates the processor startup error to the rangefeed
registration and through to the caller, returning before attempting
the registration at all and avoiding the panic. This was confirmed with
50000 stress runs of `TestPGTest/pgjdbc`, all of which succeeded.

Resolves #76811.
Resolves #76767.
Resolves #76724.
Resolves #76655.
Resolves #76649.
Resolves #75129.
Resolves #64262.

Release note (bug fix): Fixed a race condition that in rare
circumstances could cause a node to panic with
`unexpected Stopped processor` during shutdown.

---

For details, see #76649 (comment).

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Feb 20, 2022
2 parents ae6d033 + ffb21e4 commit 4f3e5e0
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,16 @@ type CatchUpIteratorConstructor func() *CatchUpIterator
// calling its Close method when it is finished. If the iterator is nil then
// no initialization scan will be performed and the resolved timestamp will
// immediately be considered initialized.
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) {
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) error {
ctx := p.AnnotateCtx(context.Background())
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
p.run(ctx, p.RangeID, rtsIterFunc, stopper)
}); err != nil {
pErr := roachpb.NewError(err)
p.reg.DisconnectWithErr(all, pErr)
p.reg.DisconnectWithErr(all, roachpb.NewError(err))
close(p.stoppedC)
return err
}
return nil
}

// run is called from Start and runs the rangefeed.
Expand Down
26 changes: 15 additions & 11 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *roachpb.RangeFeed
const testProcessorEventCCap = 16

func newTestProcessorWithTxnPusher(
rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
t *testing.T, rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
) (*Processor, *stop.Stopper) {
t.Helper()
stopper := stop.NewStopper()

var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
Expand All @@ -153,7 +154,7 @@ func newTestProcessorWithTxnPusher(
EventChanCap: testProcessorEventCCap,
CheckStreamsInterval: 10 * time.Millisecond,
})
p.Start(stopper, makeIntentScannerConstructor(rtsIter))
require.NoError(t, p.Start(stopper, makeIntentScannerConstructor(rtsIter)))
return p, stopper
}

Expand All @@ -164,13 +165,16 @@ func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScan
return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) }
}

func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) {
return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */)
func newTestProcessor(
t *testing.T, rtsIter storage.SimpleMVCCIterator,
) (*Processor, *stop.Stopper) {
t.Helper()
return newTestProcessorWithTxnPusher(t, rtsIter, nil /* pusher */)
}

func TestProcessorBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

// Test processor without registrations.
Expand Down Expand Up @@ -433,13 +437,13 @@ func TestNilProcessor(t *testing.T) {
// to call on a nil Processor.
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
require.Panics(t, func() { p.Start(stopper, nil) })
require.Panics(t, func() { _ = p.Start(stopper, nil) })
require.Panics(t, func() { p.Register(roachpb.RSpan{}, hlc.Timestamp{}, nil, false, nil, nil) })
}

func TestProcessorSlowConsumer(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

// Add a registration.
Expand Down Expand Up @@ -565,7 +569,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
}, nil)
rtsIter.block = make(chan struct{})

p, stopper := newTestProcessor(rtsIter)
p, stopper := newTestProcessor(t, rtsIter)
defer stopper.Stop(context.Background())

// The resolved timestamp should not be initialized.
Expand Down Expand Up @@ -727,7 +731,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
return nil
})

p, stopper := newTestProcessorWithTxnPusher(nil /* rtsIter */, &tp)
p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &tp)
defer stopper.Stop(context.Background())

// Add a few intents and move the closed timestamp forward.
Expand Down Expand Up @@ -817,7 +821,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
defer leaktest.AfterTest(t)()
const trials = 10
for i := 0; i < trials; i++ {
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)

var wg sync.WaitGroup
wg.Add(6)
Expand Down Expand Up @@ -863,7 +867,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
// observes only operations that are consumed after it has registered.
func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

firstC := make(chan int64)
Expand Down
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,15 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
return rangefeed.NewSeparatedIntentScanner(iter)
}

p.Start(r.store.Stopper(), rtsIter)
// NB: This only errors if the stopper is stopping, and we have to return here
// in that case. We do check ShouldQuiesce() below, but that's not sufficient
// because the stopper has two states: stopping and quiescing. If this errors
// due to stopping, but before it enters the quiescing state, then the select
// below will fall through to the panic.
if err := p.Start(r.store.Stopper(), rtsIter); err != nil {
errC <- roachpb.NewError(err)
return nil
}

// Register with the processor *before* we attach its reference to the
// Replica struct. This ensures that the registration is in place before
Expand Down

0 comments on commit 4f3e5e0

Please sign in to comment.