diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index ce74da48f84d..2390fe4e52aa 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -636,12 +636,13 @@ func runBackupProcessor(
 								resumeTS = resp.Files[fileCount-1].EndKeyTS
 							}
 							resumeSpan = spanAndTime{
-								span:       *resp.ResumeSpan,
-								firstKeyTS: resumeTS,
-								start:      span.start,
-								end:        span.end,
-								attempts:   span.attempts,
-								lastTried:  span.lastTried,
+								span:         *resp.ResumeSpan,
+								firstKeyTS:   resumeTS,
+								start:        span.start,
+								end:          span.end,
+								attempts:     span.attempts,
+								lastTried:    span.lastTried,
+								finishesSpec: span.finishesSpec,
 							}
 						}
 
diff --git a/pkg/cmd/roachtest/tests/backup.go b/pkg/cmd/roachtest/tests/backup.go
index 55720bbe84ba..1916ca5ecaf0 100644
--- a/pkg/cmd/roachtest/tests/backup.go
+++ b/pkg/cmd/roachtest/tests/backup.go
@@ -325,7 +325,7 @@ func registerBackup(r registry.Registry) {
 		Suites:            registry.Suites(registry.Nightly),
 		EncryptionSupport: registry.EncryptionAlwaysDisabled,
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			rows := rows2TiB
+			rows := rows100GiB
 			if c.IsLocal() {
 				rows = 100
 			}
@@ -339,8 +339,16 @@ func registerBackup(r registry.Registry) {
 				// total elapsed time. This is used by roachperf to compute and display
 				// the average MB/sec per node.
 				tick()
-				c.Run(ctx, option.WithNodes(c.Node(1)), `./cockroach sql --url={pgurl:1} -e "
-				BACKUP bank.bank TO 'gs://`+backupTestingBucket+`/`+dest+`?AUTH=implicit'"`)
+				conn := c.Conn(ctx, t.L(), 1)
+				defer conn.Close()
+				var jobID jobspb.JobID
+				uri := `gs://` + backupTestingBucket + `/` + dest + `?AUTH=implicit`
+				if err := conn.QueryRowContext(ctx, fmt.Sprintf("BACKUP bank.bank INTO '%s' WITH detached", uri)).Scan(&jobID); err != nil {
+					return err
+				}
+				if err := AssertReasonableFractionCompleted(ctx, t.L(), c, jobID, 2); err != nil {
+					return err
+				}
 				tick()
 
 				// Upload the perf artifacts to any one of the nodes so that the test
diff --git a/pkg/cmd/roachtest/tests/jobs_util.go b/pkg/cmd/roachtest/tests/jobs_util.go
index 6aeab961b26f..8e8f035c7ef8 100644
--- a/pkg/cmd/roachtest/tests/jobs_util.go
+++ b/pkg/cmd/roachtest/tests/jobs_util.go
@@ -115,7 +115,7 @@ func executeNodeShutdown(
 	for {
 		select {
 		case <-ticker.C:
-			err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS] WHERE job_id=$1`, jobID).Scan(&status)
+			err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOB $1]`, jobID).Scan(&status)
 			if err != nil {
 				return errors.Wrap(err, "getting the job status")
 			}
@@ -197,3 +197,74 @@ func getJobProgress(t test.Test, db *sqlutils.SQLRunner, jobID jobspb.JobID) *jo
 	}
 	return ret
 }
+
+// AssertReasonableFractionCompleted polls the fraction_completed of jobID
+// every 15 seconds, recording each sample, until the job reports a fraction
+// of 1. If more than five samples were recorded, it returns an error when the
+// middle sample is below 0.2 or above 0.8, which indicates the job reported
+// little progress, or nearly complete progress, for most of its run. It also
+// returns any error from querying the job and an error if ctx is done first.
+func AssertReasonableFractionCompleted(
+	ctx context.Context, l *logger.Logger, c cluster.Cluster, jobID jobspb.JobID, nodeToQuery int,
+) error {
+	ticker := time.NewTicker(15 * time.Second)
+	defer ticker.Stop()
+	var fractionsRecorded []float64
+
+	for {
+		select {
+		case <-ticker.C:
+			fractionCompleted, err := getFractionProgressed(ctx, l, c, jobID, nodeToQuery)
+			if err != nil {
+				return err
+			}
+			fractionsRecorded = append(fractionsRecorded, fractionCompleted)
+			if fractionCompleted != 1 {
+				continue
+			}
+			count := len(fractionsRecorded)
+			if count <= 5 {
+				l.Printf("not enough fractions recorded to assert progress looks sane")
+				return nil
+			}
+			// The samples are appended in polling order, so the middle sample is
+			// the progress reported halfway through the polling period.
+			median := fractionsRecorded[count/2]
+			if median < 0.2 || median > 0.8 {
+				return errors.Newf("the median fraction completed was %.2f, which is outside (0.2,0.8)", median)
+			}
+			return nil
+		case <-ctx.Done():
+			return errors.Wrap(ctx.Err(), "context canceled while waiting for job to finish")
+		}
+	}
+}
+
+// getFractionProgressed connects to nodeToQuery and returns the
+// fraction_completed that SHOW JOB reports for jobID. It returns an error if
+// the query fails, if the job has succeeded with a fraction other than 1, or
+// if the job is in any state other than running or succeeded.
+func getFractionProgressed(
+	ctx context.Context, l *logger.Logger, c cluster.Cluster, jobID jobspb.JobID, nodeToQuery int,
+) (float64, error) {
+	var status string
+	var fractionCompleted float64
+	conn := c.Conn(ctx, l, nodeToQuery)
+	defer conn.Close()
+	err := conn.QueryRowContext(ctx, `SELECT status, fraction_completed FROM [SHOW JOB $1]`, jobID).Scan(&status, &fractionCompleted)
+	if err != nil {
+		return 0, errors.Wrap(err, "getting the job status and fraction completed")
+	}
+	switch jobs.Status(status) {
+	case jobs.StatusSucceeded:
+		if fractionCompleted != 1 {
+			return 0, errors.Newf("job completed but fraction completed is %.2f", fractionCompleted)
+		}
+		return fractionCompleted, nil
+	case jobs.StatusRunning:
+		l.Printf("job %d still running, %.2f completed, waiting to succeed", jobID, fractionCompleted)
+		return fractionCompleted, nil
+	default:
+		return 0, errors.Newf("unexpectedly found job %d in state %s", jobID, status)
+	}
+}