rangefeed: cancel async tasks on processor stop
Previously, async tasks spawned by the rangefeed processor (typically
txn pushes and resolved timestamp scans) were not cancelled when the
processor was stopped or the stopper quiesced. If these operations
stalled, this could lead to goroutine leaks and node shutdown stalls.
However, this was mitigated to some extent by the intent resolver itself
detecting stopper quiescence.

This patch uses a separate task context for async tasks, which is
cancelled either when the processor is stopped or the stopper quiesces.

In general, the rangefeed scheduler shutdown logic could use some
improvement, but this patch does not attempt a broader cleanup in the
interest of backportability.

Epic: none
Release note: None
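
For illustration, below is a minimal sketch of the task-context pattern this patch introduces, assuming CockroachDB's pkg/util/stop API (WithCancelOnQuiesce, RunAsyncTask, Stop) exactly as it appears in the diffs further down; the processor type, its start/runAsync/stop methods, and main are simplified stand-ins for this sketch, not the real ScheduledProcessor.

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// processor is a simplified stand-in for the rangefeed processor; only the
// task-context plumbing is shown.
type processor struct {
	stopper    *stop.Stopper
	taskCtx    context.Context // context for async tasks (txn pushes, rts scans)
	taskCancel func()          // cancels taskCtx on stop() or stopper quiescence
}

func (p *processor) start(stopper *stop.Stopper) {
	p.stopper = stopper
	// Derive a task context that is cancelled when the stopper quiesces ...
	p.taskCtx, p.taskCancel = stopper.WithCancelOnQuiesce(context.Background())
}

// runAsync spawns an async task with taskCtx rather than the per-call
// process() context, so the task observes cancellation on shutdown.
func (p *processor) runAsync(name string, task func(context.Context)) error {
	return p.stopper.RunAsyncTask(p.taskCtx, name, task)
}

func (p *processor) stop() {
	// ... and explicitly when the processor itself is stopped.
	p.taskCancel()
}

func main() {
	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	p := &processor{}
	p.start(stopper)

	done := make(chan struct{})
	if err := p.runAsync("example: blocked task", func(ctx context.Context) {
		<-ctx.Done() // a stalled task unblocks once the processor stops
		close(done)
	}); err != nil {
		panic(err)
	}

	p.stop() // cancels taskCtx; the blocked task exits instead of leaking
	<-done
	fmt.Println("async task exited cleanly")
}

The key point is that async tasks inherit taskCtx rather than the per-call process() context, so both Processor.Stop() and stopper quiescence unblock a stalled push or scan instead of leaking its goroutine.
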
erikgrinaker committed Jan 20, 2024
1 parent 302ef0c commit 2520ea7
Showing 4 changed files with 95 additions and 17 deletions.
62 changes: 61 additions & 1 deletion pkg/kv/kvserver/rangefeed/processor_test.go
@@ -998,7 +998,9 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Create a TxnPusher that performs assertions during the first 3 uses.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
tp.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
// The txns are not in a sorted order. Enforce one.
sort.Slice(txns, func(i, j int) bool {
return bytes.Compare(txns[i].Key, txns[j].Key) < 0
@@ -1640,3 +1642,61 @@ func TestProcessorBackpressure(t *testing.T) {
},
}, events[len(events)-1])
}

// TestProcessorContextCancellation tests that the processor cancels the
// contexts of async tasks when stopped. It does not, however, cancel the
// process() context -- it probably should, but this should arguably also be
// handled by the scheduler.
func TestProcessorContextCancellation(t *testing.T) {
defer leaktest.AfterTest(t)()

// Try stopping both via the stopper and via Processor.Stop().
testutils.RunTrueAndFalse(t, "stopper", func(t *testing.T, useStopper bool) {

// Set up a transaction to push.
txnTS := hlc.Timestamp{WallTime: 10} // after resolved timestamp
txnMeta := enginepb.TxnMeta{
ID: uuid.MakeV4(), Key: keyA, WriteTimestamp: txnTS, MinTimestamp: txnTS}

// Set up a transaction pusher that will block until the context cancels.
pushReadyC := make(chan struct{})
pushDoneC := make(chan struct{})

var pusher testTxnPusher
pusher.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
pushReadyC <- struct{}{}
<-ctx.Done()
close(pushDoneC)
return nil, ctx.Err()
})
pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error {
return nil
})

// Start a test processor.
p, h, stopper := newTestProcessor(t, withPusher(&pusher))
ctx := context.Background()
defer stopper.Stop(ctx)

// Add an intent and move the closed timestamp past it. This should trigger a
// txn push attempt; wait for that to happen.
p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txnMeta))
p.ForwardClosedTS(ctx, txnTS.Add(1, 0))
h.syncEventC()
h.triggerTxnPushUntilPushed(t, pushReadyC)

// Now, stop the processor, and wait for the txn pusher to exit.
if useStopper {
stopper.Stop(ctx)
} else {
p.Stop()
}
select {
case <-pushDoneC:
case <-time.After(3 * time.Second):
t.Fatal("txn pusher did not exit")
}
})
}
32 changes: 22 additions & 10 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -47,17 +47,27 @@ type ScheduledProcessor struct {

// processCtx is the annotated background context used for process(). It is
// stored here to avoid reconstructing it on every call.
processCtx context.Context
processCtx context.Context
// taskCtx is the context used to spawn async tasks (e.g. the txn pusher),
// along with its cancel function which is called when the processor stops or
// the stopper quiesces. It is independent of processCtx, and constructed
// during Start().
//
// TODO(erikgrinaker): the context handling here should be cleaned up.
// processCtx should be passed in from the scheduler and propagate stopper
// quiescence, and the async tasks should probably be run on scheduler
// threads or at least a separate bounded worker pool. But this will do for
// now.
taskCtx context.Context
taskCancel func()

requestQueue chan request
eventC chan *event
// If true, processor is not processing data anymore and waiting for registrations
// to be complete.
stopping bool
stoppedC chan struct{}

// Processor startup runs background tasks to scan intents. If processor is
// stopped early, this task needs to be terminated to avoid resource waste.
startupCancel func()
// stopper passed by start that is used for firing up async work from scheduler.
stopper *stop.Stopper
txnPushActive bool
@@ -94,9 +104,9 @@ func NewScheduledProcessor(cfg Config) *ScheduledProcessor {
func (p *ScheduledProcessor) Start(
stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor,
) error {
ctx := p.Config.AmbientContext.AnnotateCtx(context.Background())
ctx, p.startupCancel = context.WithCancel(ctx)
p.stopper = stopper
p.taskCtx, p.taskCancel = p.stopper.WithCancelOnQuiesce(
p.Config.AmbientContext.AnnotateCtx(context.Background()))

// Note that callback registration must be performed before starting resolved
// timestamp init because resolution posts resolvedTS event when it is done.
@@ -112,13 +122,14 @@ func (p *ScheduledProcessor) Start(
initScan := newInitResolvedTSScan(p.Span, p, rtsIter)
// TODO(oleg): we need to cap number of tasks that we can fire up across
// all feeds as they could potentially generate O(n) tasks during start.
if err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run); err != nil {
err := stopper.RunAsyncTask(p.taskCtx, "rangefeed: init resolved ts", initScan.Run)
if err != nil {
initScan.Cancel()
p.scheduler.StopProcessor()
return err
}
} else {
p.initResolvedTS(ctx)
p.initResolvedTS(p.taskCtx)
}

p.Metrics.RangeFeedProcessorsScheduler.Inc(1)
@@ -203,7 +214,7 @@ func (p *ScheduledProcessor) processPushTxn(ctx context.Context) {
p.txnPushActive = true
// TODO(oleg): we need to cap number of tasks that we can fire up across
// all feeds as they could potentially generate O(n) tasks for push.
err := p.stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run)
err := p.stopper.RunAsyncTask(p.taskCtx, "rangefeed: pushing old txns", pushTxns.Run)
if err != nil {
pushTxns.Cancel()
}
@@ -231,7 +242,7 @@ func (p *ScheduledProcessor) cleanup() {
// Unregister callback from scheduler
p.scheduler.Unregister()

p.startupCancel()
p.taskCancel()
close(p.stoppedC)
p.MemBudget.Close(context.Background())
}
@@ -343,6 +354,7 @@ func (p *ScheduledProcessor) Register(
}
}
}
// NB: use ctx, not p.taskCtx, as the registry handles teardown itself.
if err := p.Stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
// If we can't schedule internally, processor is already stopped which
// could only happen on shutdown. Disconnect stream and just remove
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/rangefeed/task.go
@@ -61,7 +61,9 @@ func (s *initResolvedTSScan) Run(ctx context.Context) {
defer s.Cancel()
if err := s.iterateAndConsume(ctx); err != nil {
err = errors.Wrap(err, "initial resolved timestamp scan failed")
log.Errorf(ctx, "%v", err)
if ctx.Err() == nil { // cancellation probably caused the error
log.Errorf(ctx, "%v", err)
}
s.p.StopWithErr(kvpb.NewError(err))
} else {
// Inform the processor that its resolved timestamp can be initialized.
@@ -238,7 +240,9 @@ func newTxnPushAttempt(
func (a *txnPushAttempt) Run(ctx context.Context) {
defer a.Cancel()
if err := a.pushOldTxns(ctx); err != nil {
log.Errorf(ctx, "pushing old intents failed: %v", err)
if ctx.Err() == nil { // cancellation probably caused the error
log.Errorf(ctx, "pushing old intents failed: %v", err)
}
}
}

10 changes: 6 additions & 4 deletions pkg/kv/kvserver/rangefeed/task_test.go
@@ -360,22 +360,22 @@ func TestInitResolvedTSScan(t *testing.T) {
}

type testTxnPusher struct {
pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error
}

func (tp *testTxnPusher) PushTxns(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
return tp.pushTxnsFn(txns, ts)
return tp.pushTxnsFn(ctx, txns, ts)
}

func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error {
return tp.resolveIntentsFn(ctx, intents)
}

func (tp *testTxnPusher) mockPushTxns(
fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error),
fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error),
) {
tp.pushTxnsFn = fn
}
@@ -432,7 +432,9 @@ func TestTxnPushAttempt(t *testing.T) {

// Run a txnPushAttempt.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
tp.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
require.Equal(t, 4, len(txns))
require.Equal(t, txn1Meta, txns[0])
require.Equal(t, txn2Meta, txns[1])