Skip to content

Commit

Permalink
changefeedccl: deflake TestAlterChangefeedAddTargetsDuringBackfill
Browse files Browse the repository at this point in the history
This patch deflakes `TestAlterChangefeedAddTargetsDuringBackfill`
by increasing the max batch size used for changefeed initial scans.
Previously, if we were unlucky, the batch sizes could be too small
leading to a timeout while waiting for the initial scan to complete.

Release note: None
  • Loading branch information
andyyang890 committed Mar 25, 2024
1 parent db5b8a0 commit 81d1fe8
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"net/url"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1318,8 +1319,11 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

rnd, _ := randutil.NewTestRand()
var rndMu syncutil.Mutex
var rndMu struct {
syncutil.Mutex
rnd *rand.Rand
}
rndMu.rnd, _ = randutil.NewTestRand()
const maxCheckpointSize = 1 << 20
const numRowsPerTable = 1000

Expand All @@ -1340,11 +1344,19 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
Changefeed.(*TestingKnobs)

// Ensure Scan Requests are always small enough that we receive multiple
// resolvedFoo events during a backfill
// resolvedFoo events during a backfill.
const maxBatchSize = numRowsPerTable / 5
knobs.FeedKnobs.BeforeScanRequest = func(b *kv.Batch) error {
rndMu.Lock()
defer rndMu.Unlock()
b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(100)
// We don't want batch sizes that are too small because they could cause
// the initial scan to take too long, leading to the waitForHighwater
// call below to time out. The formula below is completely arbitrary and
// was chosen to ensure that the batch sizes aren't too small (i.e. in
// the 1-2 digit range) but are still small enough that at least a few
// batches will be necessary.
b.Header.MaxSpanRequestKeys = maxBatchSize/2 + rndMu.rnd.Int63n(maxBatchSize/2)
t.Logf("set max span request keys: %d", b.Header.MaxSpanRequestKeys)
return nil
}

Expand All @@ -1365,7 +1377,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
return false, nil
}
if haveGaps {
return rnd.Intn(10) > 7, nil
return rndMu.rnd.Intn(10) > 7, nil
}
haveGaps = true
return true, nil
Expand Down Expand Up @@ -1415,13 +1427,12 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
var checkpoint roachpb.SpanGroup
checkpoint.Add(jobCheckpoint.Spans...)

waitForJobStatus(sqlDB, t, jobFeed.JobID(), `paused`)

sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar WITH initial_scan`, jobFeed.JobID()))

// Collect spans we attempt to resolve after when we resume.
var resolvedFoo []roachpb.Span
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
t.Logf("resolved span: %#v", r)
if !r.Span.Equal(fooTableSpan) {
resolvedFoo = append(resolvedFoo, r.Span)
}
Expand All @@ -1430,6 +1441,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {

require.NoError(t, jobFeed.Resume())

// Wait for highwater to be set, which signifies that the initial scan is complete.
waitForHighwater(t, jobFeed, registry)

// At this point, highwater mark should be set, and previous checkpoint should be gone.
Expand Down

0 comments on commit 81d1fe8

Please sign in to comment.