From e9e213658859f5420da160cafdcf7f53dae4a6a2 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 17 Jan 2024 11:15:13 +0000 Subject: [PATCH 1/2] rangefeed: tweak `triggerTxnPushUntilPushed` test helper Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor_test.go | 43 +++++++++++---------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 9e78496d73d1..9238d0f6a371 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -191,25 +191,26 @@ func (h *processorTestHelper) syncEventAndRegistrationsSpan(span roachpb.Span) { h.sendSpanSync(&span) } -func (h *processorTestHelper) triggerTxnPushUntilPushed( - t *testing.T, ackC chan struct{}, timeout time.Duration, -) { - if h.scheduler != nil { - deadline := time.After(timeout) - for { +// triggerTxnPushUntilPushed will schedule PushTxnQueued events until pushedC +// indicates that a transaction push attempt has started by posting an event. +// If a push does not happen in 10 seconds, the attempt fails. +func (h *processorTestHelper) triggerTxnPushUntilPushed(t *testing.T, pushedC <-chan struct{}) { + timeoutC := time.After(10 * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + if h.scheduler != nil { h.scheduler.Enqueue(PushTxnQueued) - select { - case <-deadline: - t.Fatal("failed to get txn push notification") - case ackC <- struct{}{}: - return - case <-time.After(100 * time.Millisecond): - // We keep sending events to avoid the situation where event arrives - // but flag indicating that push is still running is not reset. - } } - } else { - ackC <- struct{}{} + select { + case <-pushedC: + return + case <-ticker.C: + // We keep sending events to avoid the situation where event arrives + // but flag indicating that push is still running is not reset. + case <-timeoutC: + t.Fatal("failed to get txn push notification") + } } } @@ -1134,7 +1135,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { return nil } - <-pausePushAttemptsC + pausePushAttemptsC <- struct{}{} <-resumePushAttemptsC return nil }) @@ -1155,7 +1156,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, h.rts.Get()) // Wait for the first txn push attempt to complete. - h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second) + h.triggerTxnPushUntilPushed(t, pausePushAttemptsC) // The resolved timestamp hasn't moved. h.syncEventC() @@ -1169,7 +1170,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { // Unblock the second txn push attempt and wait for it to complete. resumePushAttemptsC <- struct{}{} - h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second) + h.triggerTxnPushUntilPushed(t, pausePushAttemptsC) // The resolved timestamp should have moved forwards to the closed // timestamp. @@ -1193,7 +1194,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { // Unblock the third txn push attempt and wait for it to complete. resumePushAttemptsC <- struct{}{} - h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second) + h.triggerTxnPushUntilPushed(t, pausePushAttemptsC) // The resolved timestamp should have moved forwards to the closed // timestamp. 
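For readers skimming the first patch: the reworked helper follows a common test-synchronization pattern, namely re-enqueue the work on a short ticker until the worker signals on a channel, with an overall timeout as a backstop. Below is a minimal, standalone sketch of that pattern only; the names (waitForSignal, enqueue, signalC) are illustrative stand-ins, not identifiers from the patch.

package main

import (
	"fmt"
	"time"
)

// waitForSignal repeatedly calls enqueue until signalC fires, retrying on a
// short ticker so a missed or coalesced wakeup is re-attempted, and giving up
// after timeout.
func waitForSignal(enqueue func(), signalC <-chan struct{}, timeout time.Duration) error {
	timeoutC := time.After(timeout)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		enqueue()
		select {
		case <-signalC:
			return nil
		case <-ticker.C:
			// Keep re-enqueueing: the previous attempt may not have been observed.
		case <-timeoutC:
			return fmt.Errorf("no signal after %s", timeout)
		}
	}
}

func main() {
	signalC := make(chan struct{}, 1)
	// Simulate a worker that only signals after a few enqueue attempts.
	n := 0
	enqueue := func() {
		n++
		if n == 3 {
			signalC <- struct{}{}
		}
	}
	if err := waitForSignal(enqueue, signalC, 10*time.Second); err != nil {
		panic(err)
	}
	fmt.Println("signaled after", n, "enqueue attempts")
}

The helper in the patch does the same thing with h.scheduler.Enqueue(PushTxnQueued) as the enqueue step and pushedC as the signal channel.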
From de2a001dc3347609eac0a19a40fcd208bc14ebc6 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 17 Jan 2024 11:50:37 +0000 Subject: [PATCH 2/2] rangefeed: cancel async tasks on processor stop Previously, async tasks spawned by the rangefeed processor (typically txn pushes and resolved timestamp scans) were not cancelled when the processor was stopped or the stopper quiesced. If these operations stalled, this could lead to goroutine leaks and node shutdown stalls. However, this was mitigated to some extent by the intent resolver itself detecting stopper quiescence. This patch uses a separate task context for async tasks, which is cancelled either when the processor is stopped or the stopper quiesces. In general, the rangefeed scheduler shutdown logic could use some improvement, but this patch does not attempt a broader cleanup in the interest of backportability. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor_test.go | 68 ++++++++++++++++++- .../kvserver/rangefeed/scheduled_processor.go | 32 ++++++--- pkg/kv/kvserver/rangefeed/task.go | 8 ++- pkg/kv/kvserver/rangefeed/task_test.go | 10 +-- 4 files changed, 100 insertions(+), 18 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 9238d0f6a371..27932651be93 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -1082,7 +1082,9 @@ func TestProcessorTxnPushAttempt(t *testing.T) { // Create a TxnPusher that performs assertions during the first 3 uses. var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + tp.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { // The txns are not in a sorted order. Enforce one. sort.Slice(txns, func(i, j int) bool { return bytes.Compare(txns[i].Key, txns[j].Key) < 0 @@ -1247,7 +1249,9 @@ func TestProcessorTxnPushDisabled(t *testing.T) { // a new test when the legacy processor is removed and the scheduled processor // is used by default. var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + tp.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { err := errors.Errorf("unexpected txn push for txns=%v ts=%s", txns, ts) t.Errorf("%v", err) return nil, err @@ -1798,3 +1802,63 @@ func TestProcessorBackpressure(t *testing.T) { }, }, events[len(events)-1]) } + +// TestProcessorContextCancellation tests that the processor cancels the +// contexts of async tasks when stopped. It does not, however, cancel the +// process() context -- it probably should, but this should arguably also be +// handled by the scheduler. +func TestProcessorContextCancellation(t *testing.T) { + defer leaktest.AfterTest(t)() + + testutils.RunValues(t, "proc type", testTypes, func(t *testing.T, pt procType) { + // Try stopping both via the stopper and via Processor.Stop(). + testutils.RunTrueAndFalse(t, "stopper", func(t *testing.T, useStopper bool) { + + // Set up a transaction to push. + txnTS := hlc.Timestamp{WallTime: 10} // after resolved timestamp + txnMeta := enginepb.TxnMeta{ + ID: uuid.MakeV4(), Key: keyA, WriteTimestamp: txnTS, MinTimestamp: txnTS} + + // Set up a transaction pusher that will block until the context cancels. 
+ pushReadyC := make(chan struct{}) + pushDoneC := make(chan struct{}) + + var pusher testTxnPusher + pusher.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { + pushReadyC <- struct{}{} + <-ctx.Done() + close(pushDoneC) + return nil, ctx.Err() + }) + pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + return nil + }) + + // Start a test processor. + p, h, stopper := newTestProcessor(t, withPusher(&pusher), withProcType(pt)) + ctx := context.Background() + defer stopper.Stop(ctx) + + // Add an intent and move the closed timestamp past it. This should trigger a + // txn push attempt, wait for that to happen. + p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txnMeta)) + p.ForwardClosedTS(ctx, txnTS.Add(1, 0)) + h.syncEventC() + h.triggerTxnPushUntilPushed(t, pushReadyC) + + // Now, stop the processor, and wait for the txn pusher to exit. + if useStopper { + stopper.Stop(ctx) + } else { + p.Stop() + } + select { + case <-pushDoneC: + case <-time.After(3 * time.Second): + t.Fatal("txn pusher did not exit") + } + }) + }) +} diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index c0fa5aefe98c..cc6d6f81408e 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -47,7 +47,20 @@ type ScheduledProcessor struct { // processCtx is the annotated background context used for process(). It is // stored here to avoid reconstructing it on every call. - processCtx context.Context + processCtx context.Context + // taskCtx is the context used to spawn async tasks (e.g. the txn pusher), + // along with its cancel function which is called when the processor stops or + // the stopper quiesces. It is independent of processCtx, and constructed + // during Start(). + // + // TODO(erikgrinaker): the context handling here should be cleaned up. + // processCtx should be passed in from the scheduler and propagate stopper + // quiescence, and the async tasks should probably be run on scheduler + // threads or at least a separate bounded worker pool. But this will do for + // now. + taskCtx context.Context + taskCancel func() + requestQueue chan request eventC chan *event // If true, processor is not processing data anymore and waiting for registrations @@ -55,9 +68,6 @@ type ScheduledProcessor struct { stopping bool stoppedC chan struct{} - // Processor startup runs background tasks to scan intents. If processor is - // stopped early, this task needs to be terminated to avoid resource waste. - startupCancel func() // stopper passed by start that is used for firing up async work from scheduler. stopper *stop.Stopper txnPushActive bool @@ -94,9 +104,9 @@ func NewScheduledProcessor(cfg Config) *ScheduledProcessor { func (p *ScheduledProcessor) Start( stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor, ) error { - ctx := p.Config.AmbientContext.AnnotateCtx(context.Background()) - ctx, p.startupCancel = context.WithCancel(ctx) p.stopper = stopper + p.taskCtx, p.taskCancel = p.stopper.WithCancelOnQuiesce( + p.Config.AmbientContext.AnnotateCtx(context.Background())) // Note that callback registration must be performed before starting resolved // timestamp init because resolution posts resolvedTS event when it is done. 
@@ -112,13 +122,14 @@ func (p *ScheduledProcessor) Start( initScan := newInitResolvedTSScan(p.Span, p, rtsIter) // TODO(oleg): we need to cap number of tasks that we can fire up across // all feeds as they could potentially generate O(n) tasks during start. - if err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run); err != nil { + err := stopper.RunAsyncTask(p.taskCtx, "rangefeed: init resolved ts", initScan.Run) + if err != nil { initScan.Cancel() p.scheduler.StopProcessor() return err } } else { - p.initResolvedTS(ctx) + p.initResolvedTS(p.taskCtx) } p.Metrics.RangeFeedProcessorsScheduler.Inc(1) @@ -203,7 +214,7 @@ func (p *ScheduledProcessor) processPushTxn(ctx context.Context) { p.txnPushActive = true // TODO(oleg): we need to cap number of tasks that we can fire up across // all feeds as they could potentially generate O(n) tasks for push. - err := p.stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run) + err := p.stopper.RunAsyncTask(p.taskCtx, "rangefeed: pushing old txns", pushTxns.Run) if err != nil { pushTxns.Cancel() } @@ -231,7 +242,7 @@ func (p *ScheduledProcessor) cleanup() { // Unregister callback from scheduler p.scheduler.Unregister() - p.startupCancel() + p.taskCancel() close(p.stoppedC) p.MemBudget.Close(context.Background()) } @@ -343,6 +354,7 @@ func (p *ScheduledProcessor) Register( } } } + // NB: use ctx, not p.taskCtx, as the registry handles teardown itself. if err := p.Stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil { // If we can't schedule internally, processor is already stopped which // could only happen on shutdown. Disconnect stream and just remove diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index b40254540fb2..9da26eb3ca32 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -61,7 +61,9 @@ func (s *initResolvedTSScan) Run(ctx context.Context) { defer s.Cancel() if err := s.iterateAndConsume(ctx); err != nil { err = errors.Wrap(err, "initial resolved timestamp scan failed") - log.Errorf(ctx, "%v", err) + if ctx.Err() == nil { // cancellation probably caused the error + log.Errorf(ctx, "%v", err) + } s.p.StopWithErr(kvpb.NewError(err)) } else { // Inform the processor that its resolved timestamp can be initialized. 
@@ -227,7 +229,9 @@ func newTxnPushAttempt( func (a *txnPushAttempt) Run(ctx context.Context) { defer a.Cancel() if err := a.pushOldTxns(ctx); err != nil { - log.Errorf(ctx, "pushing old intents failed: %v", err) + if ctx.Err() == nil { // cancellation probably caused the error + log.Errorf(ctx, "pushing old intents failed: %v", err) + } } } diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 9a80bb010fe9..13eeecd1575a 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -358,14 +358,14 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, ) ([]*roachpb.Transaction, error) { - return tp.pushTxnsFn(txns, ts) + return tp.pushTxnsFn(ctx, txns, ts) } func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error { @@ -373,7 +373,7 @@ func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.L } func (tp *testTxnPusher) mockPushTxns( - fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), + fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), ) { tp.pushTxnsFn = fn } @@ -430,7 +430,9 @@ func TestTxnPushAttempt(t *testing.T) { // Run a txnPushAttempt. var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + tp.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1])
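To make the second patch's shape easier to see in isolation, here is a small self-contained sketch of the "separate task context" idea: async tasks run under a context that is cancelled both when the owner is explicitly stopped and when a global quiesce signal fires. This is an illustration under assumed names only; the processor and quiesce channel below are stand-ins, not CockroachDB's ScheduledProcessor or stop.Stopper API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// processor owns a task context that outlives individual events but not the
// processor itself.
type processor struct {
	taskCtx    context.Context
	taskCancel context.CancelFunc
	wg         sync.WaitGroup
}

// newProcessor derives the task context and, mimicking cancel-on-quiesce,
// cancels it when quiesceC is closed.
func newProcessor(quiesceC <-chan struct{}) *processor {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-quiesceC
		cancel()
	}()
	return &processor{taskCtx: ctx, taskCancel: cancel}
}

// runAsyncTask spawns work tied to the task context rather than the caller's,
// so the caller's request context does not pin the task's lifetime.
func (p *processor) runAsyncTask(work func(ctx context.Context)) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		work(p.taskCtx)
	}()
}

// stop cancels outstanding tasks and waits for them to exit.
func (p *processor) stop() {
	p.taskCancel()
	p.wg.Wait()
}

func main() {
	quiesceC := make(chan struct{}) // never closed in this run; stop() cancels instead
	p := newProcessor(quiesceC)
	p.runAsyncTask(func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println("task cancelled:", ctx.Err())
		case <-time.After(time.Minute): // a stalled txn push would block here
			fmt.Println("task finished")
		}
	})
	p.stop() // cancels the task context; the task exits promptly instead of leaking
}

The patch applies the same idea with stopper.WithCancelOnQuiesce providing the cancellation on quiescence and cleanup() providing it on processor stop, which is why the blocked txn push in TestProcessorContextCancellation now observes ctx.Done() and exits.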