From e0ee2403885fef8355db0f2d9775c01fc2594b1d Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Sun, 10 Mar 2024 09:18:04 -0600 Subject: [PATCH] backuppcl: pass finishesSpec field to resume span PR #114268 broke the plumbing for the CompletedSpans metric which allows the backup coordinator to update the FractionCompleted metric. This patch fixes this bug by passing the finishesSpec field to the resume span. Informs #120161 Release note: none --- pkg/ccl/backupccl/backup_processor.go | 13 ++++--- pkg/cmd/roachtest/tests/backup.go | 12 +++++- pkg/cmd/roachtest/tests/jobs.go | 55 +++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index aff3cfd939f7..7acea4f47a1f 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -623,12 +623,13 @@ func runBackupProcessor( resumeTS = resp.Files[fileCount-1].EndKeyTS } resumeSpan = spanAndTime{ - span: *resp.ResumeSpan, - firstKeyTS: resumeTS, - start: span.start, - end: span.end, - attempts: span.attempts, - lastTried: span.lastTried, + span: *resp.ResumeSpan, + firstKeyTS: resumeTS, + start: span.start, + end: span.end, + attempts: span.attempts, + lastTried: span.lastTried, + finishesSpec: span.finishesSpec, } } diff --git a/pkg/cmd/roachtest/tests/backup.go b/pkg/cmd/roachtest/tests/backup.go index 35bef6fb95e8..571eefc16f7c 100644 --- a/pkg/cmd/roachtest/tests/backup.go +++ b/pkg/cmd/roachtest/tests/backup.go @@ -339,8 +339,16 @@ func registerBackup(r registry.Registry) { // total elapsed time. This is used by roachperf to compute and display // the average MB/sec per node. tick() - c.Run(ctx, c.Node(1), `./cockroach sql --url={pgurl:1} -e " - BACKUP bank.bank TO 'gs://`+backupTestingBucket+`/`+dest+`?AUTH=implicit'"`) + conn := c.Conn(ctx, t.L(), 1) + defer conn.Close() + var jobID jobspb.JobID + uri := `gs://` + backupTestingBucket + `/` + dest + `?AUTH=implicit` + if err := conn.QueryRowContext(ctx, fmt.Sprintf("BACKUP bank.bank INTO '%s' WITH detached", uri)).Scan(&jobID); err != nil { + return err + } + if err := AssertReasonableFractionCompleted(ctx, t.L(), c, jobID, 2); err != nil { + return err + } tick() // Upload the perf artifacts to any one of the nodes so that the test diff --git a/pkg/cmd/roachtest/tests/jobs.go b/pkg/cmd/roachtest/tests/jobs.go index 38cd80001b2d..e4c997be6386 100644 --- a/pkg/cmd/roachtest/tests/jobs.go +++ b/pkg/cmd/roachtest/tests/jobs.go @@ -197,3 +197,58 @@ func getJobProgress(t test.Test, db *sqlutils.SQLRunner, jobID jobspb.JobID) *jo } return ret } + +func AssertReasonableFractionCompleted( + ctx context.Context, l *logger.Logger, c cluster.Cluster, jobID jobspb.JobID, nodeToQuery int, +) error { + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + fractionsRecorded := make([]float64, 0) + + for { + select { + case <-ticker.C: + fractionCompleted, err := getFractionProgressed(ctx, l, c, jobID, nodeToQuery) + if err != nil { + return err + } + fractionsRecorded = append(fractionsRecorded, fractionCompleted) + if fractionCompleted == 1 { + count := len(fractionsRecorded) + if count > 5 && fractionsRecorded[count/2] < 0.2 && fractionsRecorded[count/2] > 0.8 { + return errors.Newf("the median fraction completed was %.2f, which is outside (0.2,0.8)", fractionsRecorded[count/2]) + } + l.Printf("not enough 'fractionCompleted' recorded to assert progress looks sane") + return nil + } + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "context canceled while waiting for job to finish") + } + } +} + +func getFractionProgressed( + ctx context.Context, l *logger.Logger, c cluster.Cluster, jobID jobspb.JobID, nodeToQuery int, +) (float64, error) { + var status string + var fractionCompleted float64 + conn := c.Conn(ctx, l, nodeToQuery) + defer conn.Close() + err := conn.QueryRowContext(ctx, `SELECT status, fraction_completed FROM [SHOW JOB $1]`, jobID).Scan(&status, &fractionCompleted) + if err != nil { + return 0, errors.Wrap(err, "getting the job status and fraction completed") + } + jobStatus := jobs.Status(status) + switch jobStatus { + case jobs.StatusSucceeded: + if fractionCompleted != 1 { + return 0, errors.Newf("job completed but fraction completed is %.2f", fractionCompleted) + } + return fractionCompleted, nil + case jobs.StatusRunning: + l.Printf("job %d still running, %.2f completed, waiting to succeed", jobID, fractionCompleted) + return fractionCompleted, nil + default: + return 0, errors.Newf("unexpectedly found job %s in state %s", jobID, status) + } +}