From 81d1fe8b09c95cfc105928cb4a78f976d484e999 Mon Sep 17 00:00:00 2001
From: Andy Yang
Date: Thu, 21 Mar 2024 01:30:27 -0400
Subject: [PATCH] changefeedccl: deflake TestAlterChangefeedAddTargetsDuringBackfill

This patch deflakes `TestAlterChangefeedAddTargetsDuringBackfill`
by increasing the max batch size used for changefeed initial scans.
Previously, if we were unlucky, the batch sizes could be too small,
leading to a timeout while waiting for the initial scan to complete.

Release note: None
---
 .../changefeedccl/alter_changefeed_test.go | 26 ++++++++++++++-----
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go
index feaa5298d90e..2d07ff901d97 100644
--- a/pkg/ccl/changefeedccl/alter_changefeed_test.go
+++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -12,6 +12,7 @@ import (
 	"context"
 	gosql "database/sql"
 	"fmt"
+	"math/rand"
 	"net/url"
 	"sync/atomic"
 	"testing"
@@ -1318,8 +1319,11 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	rnd, _ := randutil.NewTestRand()
-	var rndMu syncutil.Mutex
+	var rndMu struct {
+		syncutil.Mutex
+		rnd *rand.Rand
+	}
+	rndMu.rnd, _ = randutil.NewTestRand()
 
 	const maxCheckpointSize = 1 << 20
 	const numRowsPerTable = 1000
@@ -1340,11 +1344,19 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
 		Changefeed.(*TestingKnobs)
 
 	// Ensure Scan Requests are always small enough that we receive multiple
-	// resolvedFoo events during a backfill
+	// resolvedFoo events during a backfill.
+	const maxBatchSize = numRowsPerTable / 5
 	knobs.FeedKnobs.BeforeScanRequest = func(b *kv.Batch) error {
 		rndMu.Lock()
 		defer rndMu.Unlock()
-		b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(100)
+		// We don't want batch sizes that are too small because they could cause
+		// the initial scan to take too long, leading to the waitForHighwater
+		// call below to time out. The formula below is completely arbitrary and
+		// was chosen to ensure that the batch sizes aren't too small (i.e. in
+		// the 1-2 digit range) but are still small enough that at least a few
+		// batches will be necessary.
+		b.Header.MaxSpanRequestKeys = maxBatchSize/2 + rndMu.rnd.Int63n(maxBatchSize/2)
+		t.Logf("set max span request keys: %d", b.Header.MaxSpanRequestKeys)
 		return nil
 	}
 
@@ -1365,7 +1377,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
 			return false, nil
 		}
 		if haveGaps {
-			return rnd.Intn(10) > 7, nil
+			return rndMu.rnd.Intn(10) > 7, nil
 		}
 		haveGaps = true
 		return true, nil
@@ -1415,13 +1427,12 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
 	var checkpoint roachpb.SpanGroup
 	checkpoint.Add(jobCheckpoint.Spans...)
 
-	waitForJobStatus(sqlDB, t, jobFeed.JobID(), `paused`)
-
 	sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar WITH initial_scan`, jobFeed.JobID()))
 
 	// Collect spans we attempt to resolve after when we resume.
 	var resolvedFoo []roachpb.Span
 	knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
+		t.Logf("resolved span: %#v", r)
 		if !r.Span.Equal(fooTableSpan) {
 			resolvedFoo = append(resolvedFoo, r.Span)
 		}
@@ -1430,6 +1441,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
 
 	require.NoError(t, jobFeed.Resume())
 
+	// Wait for highwater to be set, which signifies that the initial scan is complete.
 	waitForHighwater(t, jobFeed, registry)
 
 	// At this point, highwater mark should be set, and previous checkpoint should be gone.
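
For reference, below is a minimal standalone Go sketch (not part of the patch) of the batch-sizing
idea the change relies on. The constants mirror the test's numRowsPerTable and maxBatchSize; the
seed and loop are illustrative only. Drawing MaxSpanRequestKeys uniformly from
[maxBatchSize/2, maxBatchSize) keeps every initial-scan batch reasonably large while still forcing
a 1000-row table to be scanned in several batches.

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// Mirrors the test constants: a 1000-row table and a cap of
	// numRowsPerTable / 5 = 200 keys per scan batch.
	const numRowsPerTable = 1000
	const maxBatchSize = numRowsPerTable / 5

	// Fixed seed so this demo is reproducible; the test itself uses randutil.NewTestRand.
	rnd := rand.New(rand.NewSource(1))

	// Each simulated scan request gets a limit in [maxBatchSize/2, maxBatchSize), i.e.
	// [100, 199]: never tiny (so the initial scan finishes well within the test timeout),
	// yet small enough that covering 1000 rows still takes at least six batches.
	for i := 0; i < 5; i++ {
		maxSpanRequestKeys := maxBatchSize/2 + rnd.Int63n(maxBatchSize/2)
		fmt.Printf("scan request %d: MaxSpanRequestKeys = %d\n", i, maxSpanRequestKeys)
	}
}

By contrast, the old formula 1 + rnd.Int63n(100) could legitimately pick single-digit limits,
which is what occasionally pushed the initial scan past the timeout and flaked the test.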