Skip to content

Commit

Permalink
backupccl: deflake TestBackupRestoreSystemJobProgress
Browse files Browse the repository at this point in the history
In the case of backup we only update the jobs fraction
progressed if we have exported a complete span. This
change adds some logic to wait until atleast one complete
span has been exported, before checking for a progress update.

Release note: None
  • Loading branch information
adityamaru committed Aug 16, 2021
1 parent ee3efd6 commit 4dcd54c
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func runBackupProcessor(

if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
if backupKnobs.RunAfterExportingSpanEntry != nil {
backupKnobs.RunAfterExportingSpanEntry(ctx)
backupKnobs.RunAfterExportingSpanEntry(ctx, res)
}
}

Expand Down
36 changes: 33 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,23 +1482,31 @@ func checkInProgressBackupRestore(
t testing.TB, checkBackup inProgressChecker, checkRestore inProgressChecker,
) {
var allowResponse chan struct{}
var exportSpanCompleteCh chan struct{}
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterExportingSpanEntry: func(_ context.Context) {
RunAfterExportingSpanEntry: func(_ context.Context, res *roachpb.ExportResponse) {
<-allowResponse
// If ResumeSpan is set to nil, it means that we have completed
// exporting a span and the job will update its fraction progressed.
if res.ResumeSpan == nil {
<-exportSpanCompleteCh
}
},
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
<-allowResponse
},
}},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
params.ServerArgs.Knobs = knobs

const numAccounts = 1000
const numAccounts = 100

ctx, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts, InitManualReplication, params)
ctx, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts,
InitManualReplication, params)
conn := sqlDB.DB.(*gosql.DB)
defer cleanup()

Expand All @@ -1508,6 +1516,21 @@ func checkInProgressBackupRestore(
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1'`)

// Ensure that each node has at least one leaseholder. (These splits were
// made in BackupRestoreTestSetup.) These are wrapped with SucceedsSoon()
// because EXPERIMENTAL_RELOCATE can fail if there are other replication
// changes happening.
for _, stmt := range []string{
`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 0)`,
`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 30)`,
`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[3], 80)`,
} {
testutils.SucceedsSoon(t, func() error {
_, err := sqlDB.DB.ExecContext(ctx, stmt)
return err
})
}

var totalExpectedBackupRequests int
// mergedRangeQuery calculates the number of spans we expect PartitionSpans to
// produce. It merges contiguous ranges on the same node.
Expand Down Expand Up @@ -1543,6 +1566,7 @@ WHERE

var totalExpectedResponses int
if strings.Contains(query, "BACKUP") {
exportSpanCompleteCh = make(chan struct{})
// totalExpectedBackupRequests takes into account the merging that backup
// does of co-located ranges. It is the expected number of ExportRequests
// backup issues. DistSender will still split those requests to different
Expand Down Expand Up @@ -1570,6 +1594,12 @@ WHERE
allowResponse <- struct{}{}
}

// Due to ExportRequest pagination, in the case of backup, we want to wait
// until an entire span has been exported before checking job progress.
if strings.Contains(query, "BACKUP") {
exportSpanCompleteCh <- struct{}{}
close(exportSpanCompleteCh)
}
err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
return check(ctx, inProgressState{
DB: conn,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,7 @@ type BackupRestoreTestingKnobs struct {

// RunAfterExportingSpanEntry allows blocking the BACKUP job after a single
// span has been exported.
RunAfterExportingSpanEntry func(ctx context.Context)
RunAfterExportingSpanEntry func(ctx context.Context, response *roachpb.ExportResponse)
}

var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}
Expand Down

0 comments on commit 4dcd54c

Please sign in to comment.