From e7428991c9e09e59b6d8b47f6e44af457ca49a9f Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 29 Jan 2024 12:56:18 +0000 Subject: [PATCH] rangefeed: add `TestProcessorContextCancellation` This tests that async tasks spawned by the rangefeed processor are cancelled when the processor shuts down, via context cancellation. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor_test.go | 73 +++++++++++++++++++-- pkg/kv/kvserver/rangefeed/task_test.go | 10 +-- 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 8cae2f52e28b..955ee49eb59b 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -67,6 +67,10 @@ func writeIntentOpWithDetails( }) } +func writeIntentOpFromMeta(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp { + return writeIntentOpWithDetails(txn.ID, txn.Key, txn.MinTimestamp, txn.WriteTimestamp) +} + func writeIntentOpWithKey(txnID uuid.UUID, key []byte, ts hlc.Timestamp) enginepb.MVCCLogicalOp { return writeIntentOpWithDetails(txnID, key, ts /* minTS */, ts) } @@ -798,7 +802,9 @@ func TestProcessorTxnPushAttempt(t *testing.T) { // Create a TxnPusher that performs assertions during the first 3 uses. var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + tp.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { // The txns are not in a sorted order. Enforce one. sort.Slice(txns, func(i, j int) bool { return bytes.Compare(txns[i].Key, txns[j].Key) < 0 @@ -861,9 +867,6 @@ func TestProcessorTxnPushAttempt(t *testing.T) { defer stopper.Stop(ctx) // Add a few intents and move the closed timestamp forward. - writeIntentOpFromMeta := func(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp { - return writeIntentOpWithDetails(txn.ID, txn.Key, txn.MinTimestamp, txn.WriteTimestamp) - } p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txn1Meta), writeIntentOpFromMeta(txn2Meta), @@ -1466,3 +1469,65 @@ func BenchmarkProcessorWithBudget(b *testing.B) { require.NoError(b, err.GoError()) } } + +// TestProcessorContextCancellation tests that the processor cancels the +// contexts of async tasks when stopped. It does not, however, cancel the +// process() context -- it probably should, but this should arguably also be +// handled by the scheduler. +func TestProcessorContextCancellation(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Try stopping both via the stopper and via Processor.Stop(). + testutils.RunTrueAndFalse(t, "stopper", func(t *testing.T, useStopper bool) { + + // Set up a transaction to push. + txnTS := hlc.Timestamp{WallTime: 10} // after resolved timestamp + txnMeta := enginepb.TxnMeta{ + ID: uuid.MakeV4(), Key: keyA, WriteTimestamp: txnTS, MinTimestamp: txnTS} + + // Set up a transaction pusher that will block until the context cancels. + pushReadyC := make(chan struct{}) + pushDoneC := make(chan struct{}) + + var pusher testTxnPusher + pusher.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { + pushReadyC <- struct{}{} + <-ctx.Done() + close(pushDoneC) + return nil, ctx.Err() + }) + pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + return nil + }) + + // Start a test processor. + p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &pusher) + ctx := context.Background() + defer stopper.Stop(ctx) + + // Add an intent and move the closed timestamp past it. This should trigger a + // txn push attempt, wait for that to happen. + p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txnMeta)) + p.ForwardClosedTS(ctx, txnTS.Add(1, 0)) + p.syncEventC() + select { + case <-pushReadyC: + case <-time.After(3 * time.Second): + t.Fatal("txn push did not arrive") + } + + // Now, stop the processor, and wait for the txn pusher to exit. + if useStopper { + stopper.Stop(ctx) + } else { + p.Stop() + } + select { + case <-pushDoneC: + case <-time.After(3 * time.Second): + t.Fatal("txn pusher did not exit") + } + }) +} diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index ce8079a163f4..39b2ee45b017 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -359,14 +359,14 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, ) ([]*roachpb.Transaction, error) { - return tp.pushTxnsFn(txns, ts) + return tp.pushTxnsFn(ctx, txns, ts) } func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error { @@ -374,7 +374,7 @@ func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.L } func (tp *testTxnPusher) mockPushTxns( - fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), + fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), ) { tp.pushTxnsFn = fn } @@ -431,7 +431,9 @@ func TestTxnPushAttempt(t *testing.T) { // Run a txnPushAttempt. var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + tp.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1])