Skip to content

Commit

Permalink
backuppcl: pass finishesSpec field to resume span
Browse files Browse the repository at this point in the history
PR cockroachdb#114268 broke the plumbing for the CompletedSpans metric which allows the
backup coordinator to update the FractionCompleted metric. This patch fixes
this bug by passing the finishesSpec field to the resume span.

Informs cockroachdb#120161

Release note: none
  • Loading branch information
msbutler committed Mar 11, 2024
1 parent f84d66b commit e1e195b
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 10 deletions.
13 changes: 7 additions & 6 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,12 +636,13 @@ func runBackupProcessor(
resumeTS = resp.Files[fileCount-1].EndKeyTS
}
resumeSpan = spanAndTime{
span: *resp.ResumeSpan,
firstKeyTS: resumeTS,
start: span.start,
end: span.end,
attempts: span.attempts,
lastTried: span.lastTried,
span: *resp.ResumeSpan,
firstKeyTS: resumeTS,
start: span.start,
end: span.end,
attempts: span.attempts,
lastTried: span.lastTried,
finishesSpec: span.finishesSpec,
}
}

Expand Down
14 changes: 11 additions & 3 deletions pkg/cmd/roachtest/tests/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func registerBackup(r registry.Registry) {
Suites: registry.Suites(registry.Nightly),
EncryptionSupport: registry.EncryptionAlwaysDisabled,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
rows := rows2TiB
rows := rows100GiB
if c.IsLocal() {
rows = 100
}
Expand All @@ -339,8 +339,16 @@ func registerBackup(r registry.Registry) {
// total elapsed time. This is used by roachperf to compute and display
// the average MB/sec per node.
tick()
c.Run(ctx, option.WithNodes(c.Node(1)), `./cockroach sql --url={pgurl:1} -e "
BACKUP bank.bank TO 'gs://`+backupTestingBucket+`/`+dest+`?AUTH=implicit'"`)
conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()
var jobID jobspb.JobID
uri := `gs://` + backupTestingBucket + `/` + dest + `?AUTH=implicit`
if err := conn.QueryRowContext(ctx, fmt.Sprintf("BACKUP bank.bank INTO '%s' WITH detached", uri)).Scan(&jobID); err != nil {
return err
}
if err := AssertReasonableFractionCompleted(ctx, t.L(), c, jobID, 2); err != nil {
return err
}
tick()

// Upload the perf artifacts to any one of the nodes so that the test
Expand Down
57 changes: 56 additions & 1 deletion pkg/cmd/roachtest/tests/jobs_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func executeNodeShutdown(
for {
select {
case <-ticker.C:
err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS] WHERE job_id=$1`, jobID).Scan(&status)
err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS $1]`, jobID).Scan(&status)
if err != nil {
return errors.Wrap(err, "getting the job status")
}
Expand Down Expand Up @@ -197,3 +197,58 @@ func getJobProgress(t test.Test, db *sqlutils.SQLRunner, jobID jobspb.JobID) *jo
}
return ret
}

func AssertReasonableFractionCompleted(
ctx context.Context, l *logger.Logger, c cluster.Cluster, jobID jobspb.JobID, nodeToQuery int,
) error {
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
fractionsRecorded := make([]float64, 0)

for {
select {
case <-ticker.C:
fractionCompleted, err := getFractionProgressed(ctx, l, c, jobID, nodeToQuery)
if err != nil {
return err
}
fractionsRecorded = append(fractionsRecorded, fractionCompleted)
if fractionCompleted == 1 {
count := len(fractionsRecorded)
if count > 5 && fractionsRecorded[count/2] < 0.2 && fractionsRecorded[count/2] > 0.8 {
return errors.Newf("the median fraction completed was %.2f, which is outside (0.2,0.8)", fractionsRecorded[count/2])
}
l.Printf("not enought fractions recorded to assert progress looks sane")
return nil
}
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context canceled while waiting for job to finish")
}
}
}

func getFractionProgressed(
ctx context.Context, l *logger.Logger, c cluster.Cluster, jobID jobspb.JobID, nodeToQuery int,
) (float64, error) {
var status string
var fractionCompleted float64
conn := c.Conn(ctx, l, nodeToQuery)
defer conn.Close()
err := conn.QueryRowContext(ctx, `SELECT status, fraction_completed FROM [SHOW JOB $1]`, jobID).Scan(&status, &fractionCompleted)
if err != nil {
return 0, errors.Wrap(err, "getting the job status and fraction completed")
}
jobStatus := jobs.Status(status)
switch jobStatus {
case jobs.StatusSucceeded:
if fractionCompleted != 1 {
return 0, errors.Newf("job completed but fraction completed is %.2f", fractionCompleted)
}
return fractionCompleted, nil
case jobs.StatusRunning:
l.Printf("job %d still running, %.2f completed, waiting to succeed", jobID, fractionCompleted)
return fractionCompleted, nil
default:
return 0, errors.Newf("unexpectedly found job %s in state %s", jobID, status)
}
}

0 comments on commit e1e195b

Please sign in to comment.