Skip to content

Commit

Permalink
Merge pull request #76827 from erikgrinaker/backport21.2-76825
Browse files Browse the repository at this point in the history
release-21.2: rangefeed: fix panic due to rangefeed stopper race
  • Loading branch information
erikgrinaker authored Feb 21, 2022
2 parents 0bb1218 + 3d1281a commit 3a389da
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,16 @@ type CatchUpIteratorConstructor func() *CatchUpIterator
// calling its Close method when it is finished. If the iterator is nil then
// no initialization scan will be performed and the resolved timestamp will
// immediately be considered initialized.
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) {
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) error {
ctx := p.AnnotateCtx(context.Background())
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
p.run(ctx, p.RangeID, rtsIterFunc, stopper)
}); err != nil {
pErr := roachpb.NewError(err)
p.reg.DisconnectWithErr(all, pErr)
p.reg.DisconnectWithErr(all, roachpb.NewError(err))
close(p.stoppedC)
return err
}
return nil
}

// run is called from Start and runs the rangefeed.
Expand Down
26 changes: 15 additions & 11 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *roachpb.RangeFeed
const testProcessorEventCCap = 16

func newTestProcessorWithTxnPusher(
rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
t *testing.T, rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
) (*Processor, *stop.Stopper) {
t.Helper()
stopper := stop.NewStopper()

var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
Expand All @@ -154,7 +155,7 @@ func newTestProcessorWithTxnPusher(
EventChanCap: testProcessorEventCCap,
CheckStreamsInterval: 10 * time.Millisecond,
})
p.Start(stopper, makeIntentScannerConstructor(rtsIter))
require.NoError(t, p.Start(stopper, makeIntentScannerConstructor(rtsIter)))
return p, stopper
}

Expand All @@ -165,13 +166,16 @@ func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScan
return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) }
}

func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) {
return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */)
func newTestProcessor(
t *testing.T, rtsIter storage.SimpleMVCCIterator,
) (*Processor, *stop.Stopper) {
t.Helper()
return newTestProcessorWithTxnPusher(t, rtsIter, nil /* pusher */)
}

func TestProcessorBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

// Test processor without registrations.
Expand Down Expand Up @@ -434,13 +438,13 @@ func TestNilProcessor(t *testing.T) {
// to call on a nil Processor.
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
require.Panics(t, func() { p.Start(stopper, nil) })
require.Panics(t, func() { _ = p.Start(stopper, nil) })
require.Panics(t, func() { p.Register(roachpb.RSpan{}, hlc.Timestamp{}, nil, false, nil, nil) })
}

func TestProcessorSlowConsumer(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

// Add a registration.
Expand Down Expand Up @@ -566,7 +570,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
}, nil)
rtsIter.block = make(chan struct{})

p, stopper := newTestProcessor(rtsIter)
p, stopper := newTestProcessor(t, rtsIter)
defer stopper.Stop(context.Background())

// The resolved timestamp should not be initialized.
Expand Down Expand Up @@ -728,7 +732,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
return nil
})

p, stopper := newTestProcessorWithTxnPusher(nil /* rtsIter */, &tp)
p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &tp)
defer stopper.Stop(context.Background())

// Add a few intents and move the closed timestamp forward.
Expand Down Expand Up @@ -818,7 +822,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
defer leaktest.AfterTest(t)()
const trials = 10
for i := 0; i < trials; i++ {
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)

var wg sync.WaitGroup
wg.Add(6)
Expand Down Expand Up @@ -864,7 +868,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
// observes only operations that are consumed after it has registered.
func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

firstC := make(chan int64)
Expand Down
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,15 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
return rangefeed.NewLegacyIntentScanner(iter)
}

p.Start(r.store.Stopper(), rtsIter)
// NB: This only errors if the stopper is stopping, and we have to return here
// in that case. We do check ShouldQuiesce() below, but that's not sufficient
// because the stopper has two states: stopping and quiescing. If this errors
// due to stopping, but before it enters the quiescing state, then the select
// below will fall through to the panic.
if err := p.Start(r.store.Stopper(), rtsIter); err != nil {
errC <- roachpb.NewError(err)
return nil
}

// Register with the processor *before* we attach its reference to the
// Replica struct. This ensures that the registration is in place before
Expand Down

0 comments on commit 3a389da

Please sign in to comment.