From 302ef0ce8dc8ff08b007f6efd2e87533248d3448 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 17 Jan 2024 11:15:13 +0000 Subject: [PATCH 1/2] rangefeed: tweak `triggerTxnPushUntilPushed` test helper Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor_test.go | 27 ++++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 83a0cfb5f86e..fcc4d4058f39 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -191,20 +191,23 @@ func (h *processorTestHelper) syncEventAndRegistrationsSpan(span roachpb.Span) { h.sendSpanSync(&span) } -func (h *processorTestHelper) triggerTxnPushUntilPushed( - t *testing.T, ackC chan struct{}, timeout time.Duration, -) { - deadline := time.After(timeout) +// triggerTxnPushUntilPushed will schedule PushTxnQueued events until pushedC +// indicates that a transaction push attempt has started by posting an event. +// If a push does not happen in 10 seconds, the attempt fails. +func (h *processorTestHelper) triggerTxnPushUntilPushed(t *testing.T, pushedC <-chan struct{}) { + timeoutC := time.After(10 * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() for { h.scheduler.Enqueue(PushTxnQueued) select { - case <-deadline: - t.Fatal("failed to get txn push notification") - case ackC <- struct{}{}: + case <-pushedC: return - case <-time.After(100 * time.Millisecond): + case <-ticker.C: // We keep sending events to avoid the situation where event arrives // but flag indicating that push is still running is not reset. + case <-timeoutC: + t.Fatal("failed to get txn push notification") } } } @@ -1048,7 +1051,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { return nil } - <-pausePushAttemptsC + pausePushAttemptsC <- struct{}{} <-resumePushAttemptsC return nil }) @@ -1069,7 +1072,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, h.rts.Get()) // Wait for the first txn push attempt to complete. - h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second) + h.triggerTxnPushUntilPushed(t, pausePushAttemptsC) // The resolved timestamp hasn't moved. h.syncEventC() @@ -1083,7 +1086,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { // Unblock the second txn push attempt and wait for it to complete. resumePushAttemptsC <- struct{}{} - h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second) + h.triggerTxnPushUntilPushed(t, pausePushAttemptsC) // The resolved timestamp should have moved forwards to the closed // timestamp. @@ -1107,7 +1110,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { // Unblock the third txn push attempt and wait for it to complete. resumePushAttemptsC <- struct{}{} - h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second) + h.triggerTxnPushUntilPushed(t, pausePushAttemptsC) // The resolved timestamp should have moved forwards to the closed // timestamp. From 2520ea7c42838a8d88df5d8f13d933aba0116cec Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 17 Jan 2024 11:50:37 +0000 Subject: [PATCH 2/2] rangefeed: cancel async tasks on processor stop Previously, async tasks spawned by the rangefeed processor (typically txn pushes and resolved timestamp scans) were not cancelled when the processor was stopped or the stopper quiesced. If these operations stalled, this could lead to goroutine leaks and node shutdown stalls. 
However, this was mitigated to some extent by the intent resolver itself detecting stopper quiescence. This patch uses a separate task context for async tasks, which is cancelled either when the processor is stopped or the stopper quiesces. In general, the rangefeed scheduler shutdown logic could use some improvement, but this patch does not attempt a broader cleanup in the interest of backportability. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor_test.go | 62 ++++++++++++++++++- .../kvserver/rangefeed/scheduled_processor.go | 32 +++++++--- pkg/kv/kvserver/rangefeed/task.go | 8 ++- pkg/kv/kvserver/rangefeed/task_test.go | 10 +-- 4 files changed, 95 insertions(+), 17 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index fcc4d4058f39..e23d82f57d70 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -998,7 +998,9 @@ func TestProcessorTxnPushAttempt(t *testing.T) { // Create a TxnPusher that performs assertions during the first 3 uses. var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + tp.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { // The txns are not in a sorted order. Enforce one. sort.Slice(txns, func(i, j int) bool { return bytes.Compare(txns[i].Key, txns[j].Key) < 0 @@ -1640,3 +1642,61 @@ func TestProcessorBackpressure(t *testing.T) { }, }, events[len(events)-1]) } + +// TestProcessorContextCancellation tests that the processor cancels the +// contexts of async tasks when stopped. It does not, however, cancel the +// process() context -- it probably should, but this should arguably also be +// handled by the scheduler. +func TestProcessorContextCancellation(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Try stopping both via the stopper and via Processor.Stop(). + testutils.RunTrueAndFalse(t, "stopper", func(t *testing.T, useStopper bool) { + + // Set up a transaction to push. + txnTS := hlc.Timestamp{WallTime: 10} // after resolved timestamp + txnMeta := enginepb.TxnMeta{ + ID: uuid.MakeV4(), Key: keyA, WriteTimestamp: txnTS, MinTimestamp: txnTS} + + // Set up a transaction pusher that will block until the context cancels. + pushReadyC := make(chan struct{}) + pushDoneC := make(chan struct{}) + + var pusher testTxnPusher + pusher.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { + pushReadyC <- struct{}{} + <-ctx.Done() + close(pushDoneC) + return nil, ctx.Err() + }) + pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + return nil + }) + + // Start a test processor. + p, h, stopper := newTestProcessor(t, withPusher(&pusher)) + ctx := context.Background() + defer stopper.Stop(ctx) + + // Add an intent and move the closed timestamp past it. This should trigger a + // txn push attempt, wait for that to happen. + p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txnMeta)) + p.ForwardClosedTS(ctx, txnTS.Add(1, 0)) + h.syncEventC() + h.triggerTxnPushUntilPushed(t, pushReadyC) + + // Now, stop the processor, and wait for the txn pusher to exit. 
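+		// Both stop paths cancel the processor's task context: stopper.Stop()
+		// cancels it via WithCancelOnQuiesce, while p.Stop() cancels it via
+		// taskCancel() in cleanup(). Either way the blocked pusher observes
+		// ctx.Done() and closes pushDoneC.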
+ if useStopper { + stopper.Stop(ctx) + } else { + p.Stop() + } + select { + case <-pushDoneC: + case <-time.After(3 * time.Second): + t.Fatal("txn pusher did not exit") + } + }) +} diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index c0fa5aefe98c..cc6d6f81408e 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -47,7 +47,20 @@ type ScheduledProcessor struct { // processCtx is the annotated background context used for process(). It is // stored here to avoid reconstructing it on every call. - processCtx context.Context + processCtx context.Context + // taskCtx is the context used to spawn async tasks (e.g. the txn pusher), + // along with its cancel function which is called when the processor stops or + // the stopper quiesces. It is independent of processCtx, and constructed + // during Start(). + // + // TODO(erikgrinaker): the context handling here should be cleaned up. + // processCtx should be passed in from the scheduler and propagate stopper + // quiescence, and the async tasks should probably be run on scheduler + // threads or at least a separate bounded worker pool. But this will do for + // now. + taskCtx context.Context + taskCancel func() + requestQueue chan request eventC chan *event // If true, processor is not processing data anymore and waiting for registrations @@ -55,9 +68,6 @@ type ScheduledProcessor struct { stopping bool stoppedC chan struct{} - // Processor startup runs background tasks to scan intents. If processor is - // stopped early, this task needs to be terminated to avoid resource waste. - startupCancel func() // stopper passed by start that is used for firing up async work from scheduler. stopper *stop.Stopper txnPushActive bool @@ -94,9 +104,9 @@ func NewScheduledProcessor(cfg Config) *ScheduledProcessor { func (p *ScheduledProcessor) Start( stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor, ) error { - ctx := p.Config.AmbientContext.AnnotateCtx(context.Background()) - ctx, p.startupCancel = context.WithCancel(ctx) p.stopper = stopper + p.taskCtx, p.taskCancel = p.stopper.WithCancelOnQuiesce( + p.Config.AmbientContext.AnnotateCtx(context.Background())) // Note that callback registration must be performed before starting resolved // timestamp init because resolution posts resolvedTS event when it is done. @@ -112,13 +122,14 @@ func (p *ScheduledProcessor) Start( initScan := newInitResolvedTSScan(p.Span, p, rtsIter) // TODO(oleg): we need to cap number of tasks that we can fire up across // all feeds as they could potentially generate O(n) tasks during start. - if err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run); err != nil { + err := stopper.RunAsyncTask(p.taskCtx, "rangefeed: init resolved ts", initScan.Run) + if err != nil { initScan.Cancel() p.scheduler.StopProcessor() return err } } else { - p.initResolvedTS(ctx) + p.initResolvedTS(p.taskCtx) } p.Metrics.RangeFeedProcessorsScheduler.Inc(1) @@ -203,7 +214,7 @@ func (p *ScheduledProcessor) processPushTxn(ctx context.Context) { p.txnPushActive = true // TODO(oleg): we need to cap number of tasks that we can fire up across // all feeds as they could potentially generate O(n) tasks for push. 
- err := p.stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run) + err := p.stopper.RunAsyncTask(p.taskCtx, "rangefeed: pushing old txns", pushTxns.Run) if err != nil { pushTxns.Cancel() } @@ -231,7 +242,7 @@ func (p *ScheduledProcessor) cleanup() { // Unregister callback from scheduler p.scheduler.Unregister() - p.startupCancel() + p.taskCancel() close(p.stoppedC) p.MemBudget.Close(context.Background()) } @@ -343,6 +354,7 @@ func (p *ScheduledProcessor) Register( } } } + // NB: use ctx, not p.taskCtx, as the registry handles teardown itself. if err := p.Stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil { // If we can't schedule internally, processor is already stopped which // could only happen on shutdown. Disconnect stream and just remove diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index c8da819b16b3..d5605d477664 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -61,7 +61,9 @@ func (s *initResolvedTSScan) Run(ctx context.Context) { defer s.Cancel() if err := s.iterateAndConsume(ctx); err != nil { err = errors.Wrap(err, "initial resolved timestamp scan failed") - log.Errorf(ctx, "%v", err) + if ctx.Err() == nil { // cancellation probably caused the error + log.Errorf(ctx, "%v", err) + } s.p.StopWithErr(kvpb.NewError(err)) } else { // Inform the processor that its resolved timestamp can be initialized. @@ -238,7 +240,9 @@ func newTxnPushAttempt( func (a *txnPushAttempt) Run(ctx context.Context) { defer a.Cancel() if err := a.pushOldTxns(ctx); err != nil { - log.Errorf(ctx, "pushing old intents failed: %v", err) + if ctx.Err() == nil { // cancellation probably caused the error + log.Errorf(ctx, "pushing old intents failed: %v", err) + } } } diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 873198f71dd1..0e2380db8a36 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -360,14 +360,14 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, ) ([]*roachpb.Transaction, error) { - return tp.pushTxnsFn(txns, ts) + return tp.pushTxnsFn(ctx, txns, ts) } func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error { @@ -375,7 +375,7 @@ func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.L } func (tp *testTxnPusher) mockPushTxns( - fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), + fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), ) { tp.pushTxnsFn = fn } @@ -432,7 +432,9 @@ func TestTxnPushAttempt(t *testing.T) { // Run a txnPushAttempt. var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + tp.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1])
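
For readers skimming the series, the following is a minimal, dependency-free Go sketch of the lifecycle the second patch implements: async work inherits a dedicated task context that is cancelled both when the processor itself stops and when the surrounding stopper quiesces. The miniProcessor type, quiesceC channel, and helper names here are illustrative stand-ins for ScheduledProcessor and the stop.Stopper calls shown in the diff (WithCancelOnQuiesce, RunAsyncTask); they are not part of the patch or of CockroachDB's API.

package main

import (
	"context"
	"fmt"
	"time"
)

// miniProcessor is an illustrative stand-in for ScheduledProcessor: it owns a
// task context that async work inherits, and cancels it both when the
// processor stops and when the surrounding "stopper" quiesces.
type miniProcessor struct {
	taskCtx    context.Context
	taskCancel context.CancelFunc
	stoppedC   chan struct{}
}

// start derives the task context and ties it to quiesceC, mirroring
// stopper.WithCancelOnQuiesce in the patch.
func (p *miniProcessor) start(quiesceC <-chan struct{}) {
	p.taskCtx, p.taskCancel = context.WithCancel(context.Background())
	p.stoppedC = make(chan struct{})
	go func() {
		select {
		case <-quiesceC: // stopper quiesced
			p.taskCancel()
		case <-p.stoppedC: // processor stopped first; cleanup already cancelled
		}
	}()
}

// runAsyncTask launches async work (e.g. a txn push) on the task context,
// mirroring stopper.RunAsyncTask(p.taskCtx, ...).
func (p *miniProcessor) runAsyncTask(f func(ctx context.Context)) {
	go f(p.taskCtx)
}

// stop mirrors cleanup(): cancel outstanding tasks, then mark stopped.
func (p *miniProcessor) stop() {
	p.taskCancel()
	close(p.stoppedC)
}

func main() {
	quiesceC := make(chan struct{})
	p := &miniProcessor{}
	p.start(quiesceC)

	done := make(chan struct{})
	p.runAsyncTask(func(ctx context.Context) {
		<-ctx.Done() // a stalled txn push would unblock here
		close(done)
	})

	p.stop() // or: close(quiesceC) to simulate stopper quiescence

	select {
	case <-done:
		fmt.Println("async task observed cancellation")
	case <-time.After(time.Second):
		fmt.Println("task leaked")
	}
}

Calling close(quiesceC) instead of p.stop() exercises the other shutdown path, which corresponds to the stopper=true/false variants that TestProcessorContextCancellation runs via RunTrueAndFalse.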