109420: backupccl: replace TestBackupRestoreSystemJobsProgress r=dt a=dt

This replaces the complex TestBackupRestoreSystemJobsProgress test with a much simpler check: during the simple single-node backup test, TestBackupRestoreSingleNodeLocal, the number of chunk events sent to the progress logger must be non-zero.

Release note: none.
Epic: none.

Fixes cockroachdb#68571.

Co-authored-by: David Taylor <[email protected]>
craig[bot] and dt committed Aug 24, 2023
2 parents 2274ab3 + e36dba3 commit da1dc5e
Showing 3 changed files with 23 additions and 206 deletions.
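
Condensed, for reference: the new check boils down to counting calls to a testing knob during an ordinary backup. The sketch below is assembled from the diff that follows and is not the verbatim patch; it assumes it lives in the backupccl test package, where helpers such as backupRestoreTestSetupWithParams, backupAndRestore, singleNode, and localFoo are already defined.

func TestBackupRestoreSingleNodeLocal(t *testing.T) {
	const numAccounts = 1000
	ctx := context.Background()

	// Count each chunk reported to the progress logger via the new
	// AfterBackupChunk testing knob.
	var chunks int
	params := base.TestClusterArgs{}
	params.ServerArgs.Knobs = base.TestingKnobs{
		SQLExecutor: &sql.ExecutorTestingKnobs{
			AfterBackupChunk: func() { chunks++ },
		},
	}

	tc, _, _, cleanupFn := backupRestoreTestSetupWithParams(
		t, singleNode, numAccounts, InitManualReplication, params)
	defer cleanupFn()

	// Run a plain backup+restore cycle; the knob fires once per completed chunk.
	backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts, nil)

	// The backup must have reported at least one chunk of progress.
	require.Greater(t, chunks, 0)
}
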
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/backup_job.go
@@ -281,6 +281,9 @@ func backup(
// Signal that an ExportRequest finished to update job progress.
for i := int32(0); i < progDetails.CompletedSpans; i++ {
requestFinishedCh <- struct{}{}
if execCtx.ExecCfg().TestingKnobs.AfterBackupChunk != nil {
execCtx.ExecCfg().TestingKnobs.AfterBackupChunk()
}
}

// Update the per-component progress maintained by the job profiler.
223 changes: 17 additions & 206 deletions pkg/ccl/backupccl/backup_test.go
@@ -182,10 +182,26 @@ func TestBackupRestoreSingleNodeLocal(t *testing.T) {

const numAccounts = 1000
ctx := context.Background()
tc, _, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication)

var chunks int
// Set the testing knob so we count each time we write a checkpoint.
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
AfterBackupChunk: func() {
chunks++
},
},
}
params.ServerArgs.Knobs = knobs

tc, _, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts, nil)

// Verify that we sent >0 chunk events to the chunk progress logger.
require.Greater(t, chunks, 0)
}

func TestBackupRestoreMultiNodeLocal(t *testing.T) {
@@ -1427,179 +1443,6 @@ into_db='restoredb', %s)`, encryptionOption), backupLoc1)
}
}

type inProgressChecker func(context context.Context, ip inProgressState) error

// inProgressState holds state about an in-progress backup or restore
// for use in inProgressCheckers.
type inProgressState struct {
*gosql.DB
backupTableID uint32
dir, name string
}

func (ip inProgressState) latestJobID() (jobspb.JobID, error) {
var id jobspb.JobID
if err := ip.QueryRow(
`SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC LIMIT 1`,
).Scan(&id); err != nil {
return 0, err
}
return id, nil
}

// checkInProgressBackupRestore will run a backup and restore, pausing each
// approximately halfway through to run either `checkBackup` or `checkRestore`.
func checkInProgressBackupRestore(
t testing.TB, checkBackup inProgressChecker, checkRestore inProgressChecker,
) {
var allowResponse chan struct{}
var exportSpanCompleteCh chan struct{}
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterExportingSpanEntry: func(_ context.Context, res *kvpb.ExportResponse) {
<-allowResponse
// If ResumeSpan is set to nil, it means that we have completed
// exporting a span and the job will update its fraction progressed.
if res.ResumeSpan == nil {
<-exportSpanCompleteCh
}
},
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) {
<-allowResponse
},
},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
params.ServerArgs.Knobs = knobs

const numAccounts = 100

ctx := context.Background()
_, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, multiNode, numAccounts,
InitManualReplication, params)
conn := sqlDB.DB.(*gosql.DB)
defer cleanup()

sqlDB.Exec(t, `CREATE DATABASE restoredb`)
// the small test-case will get entirely buffered/merged by small-file merging
// and not report any progress in the meantime unless it is disabled.
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1'`)

// Ensure that each node has at least one leaseholder. (These splits were
// made in backupRestoreTestSetup.) These are wrapped with SucceedsSoon()
// because EXPERIMENTAL_RELOCATE can fail if there are other replication
// changes happening.
for _, stmt := range []string{
`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 0)`,
`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 30)`,
`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[3], 80)`,
} {
testutils.SucceedsSoon(t, func() error {
_, err := sqlDB.DB.ExecContext(ctx, stmt)
return err
})
}

var totalExpectedBackupRequests int
// mergedRangeQuery calculates the number of spans we expect PartitionSpans to
// produce. It merges contiguous ranges on the same node.
// It sorts ranges by start_key and counts the number of times the
// lease_holder changes by comparing against the previous row's lease_holder.
mergedRangeQuery := `
WITH
ranges
AS (
SELECT
start_key,
lag(lease_holder) OVER (ORDER BY start_key)
AS prev_lease_holder,
lease_holder
FROM
[SHOW RANGES FROM TABLE data.bank WITH DETAILS]
)
SELECT
count(*)
FROM
ranges
WHERE
lease_holder != prev_lease_holder
OR prev_lease_holder IS NULL;
`

sqlDB.QueryRow(t, mergedRangeQuery).Scan(&totalExpectedBackupRequests)

backupTableID := sqlutils.QueryTableID(t, conn, "data", "public", "bank")

do := func(query string, check inProgressChecker) {
t.Logf("checking query %q", query)

var totalExpectedResponses int
if strings.Contains(query, "BACKUP") {
exportSpanCompleteCh = make(chan struct{})
// totalExpectedBackupRequests takes into account the merging that backup
// does of co-located ranges. It is the expected number of ExportRequests
// backup issues. DistSender will still split those requests to different
// ranges on the same node. Each range will write a file, so the number of
// SST files this backup will write is `backupRestoreDefaultRanges` .
totalExpectedResponses = totalExpectedBackupRequests
} else if strings.Contains(query, "RESTORE") {
// We expect restore to process each file in the backup individually.
// SST files are written per-range in the backup. So we expect the
// restore to process #(ranges) that made up the original table.
totalExpectedResponses = backupRestoreDefaultRanges
} else {
t.Fatal("expected query to be either a backup or restore")
}
jobDone := make(chan error)
allowResponse = make(chan struct{}, totalExpectedResponses)

go func() {
_, err := conn.Exec(query, localFoo)
jobDone <- err
}()

// Allow half the total expected responses to proceed.
for i := 0; i < totalExpectedResponses/2; i++ {
allowResponse <- struct{}{}
}

// Due to ExportRequest pagination, in the case of backup, we want to wait
// until an entire span has been exported before checking job progress.
if strings.Contains(query, "BACKUP") {
exportSpanCompleteCh <- struct{}{}
close(exportSpanCompleteCh)
}
err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
return check(ctx, inProgressState{
DB: conn,
backupTableID: backupTableID,
dir: dir,
name: "foo",
})
})

// Close the channel to allow all remaining responses to proceed. We do this
// even if the above retry.ForDuration failed, otherwise the test will hang
// forever.
close(allowResponse)

if err := <-jobDone; err != nil {
t.Fatalf("%q: %+v", query, err)
}

if err != nil {
t.Fatal(err)
}
}

do(`BACKUP DATABASE data INTO $1`, checkBackup)
do(`RESTORE data.* FROM LATEST IN $1 WITH OPTIONS (into_db='restoredb')`, checkRestore)
}

// TestRestoreCheckpointing checks that progress persists to the job record
// using the new span frontier. The test takes the following approach:
//
@@ -1717,38 +1560,6 @@ func TestRestoreCheckpointing(t *testing.T) {
require.Equal(t, totalEntries-entriesBeforePause, postResumeCount)
}

func TestBackupRestoreSystemJobsProgress(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 68571, "flaky test")
defer log.Scope(t).Close(t)
defer jobs.TestingSetProgressThresholds()()

skip.UnderStressRace(t, "test takes too long to run under stressrace")

checkFraction := func(ctx context.Context, ip inProgressState) error {
jobID, err := ip.latestJobID()
if err != nil {
return err
}
var fractionCompleted float32
if err := ip.QueryRow(
`SELECT fraction_completed FROM crdb_internal.jobs WHERE job_id = $1`,
jobID,
).Scan(&fractionCompleted); err != nil {
return err
}
if fractionCompleted < 0.01 || fractionCompleted > 0.99 {
return errors.Errorf(
"expected progress to be in range [0.01, 0.99] but got %f",
fractionCompleted,
)
}
return nil
}

checkInProgressBackupRestore(t, checkFraction, checkFraction)
}

func createAndWaitForJob(
t *testing.T,
db *sqlutils.SQLRunner,
3 changes: 3 additions & 0 deletions pkg/sql/exec_util.go
@@ -1620,6 +1620,9 @@ type ExecutorTestingKnobs struct {
txnFingerprintID appstatspb.TransactionFingerprintID,
)

// AfterBackupChunk is called after each chunk of a backup is completed.
AfterBackupChunk func()

// AfterBackupCheckpoint if set will be called after a BACKUP-CHECKPOINT
// is written.
AfterBackupCheckpoint func()
