Skip to content

Commit

Permalink
Merge pull request #120261 from msbutler/backport23.2-120204
Browse files Browse the repository at this point in the history
release-23.2: backuppcl: pass finishesSpec field to resume span
  • Loading branch information
msbutler authored Mar 12, 2024
2 parents fb70684 + e0ee240 commit 0c3f5d4
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 8 deletions.
13 changes: 7 additions & 6 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,12 +623,13 @@ func runBackupProcessor(
resumeTS = resp.Files[fileCount-1].EndKeyTS
}
resumeSpan = spanAndTime{
span: *resp.ResumeSpan,
firstKeyTS: resumeTS,
start: span.start,
end: span.end,
attempts: span.attempts,
lastTried: span.lastTried,
span: *resp.ResumeSpan,
firstKeyTS: resumeTS,
start: span.start,
end: span.end,
attempts: span.attempts,
lastTried: span.lastTried,
finishesSpec: span.finishesSpec,
}
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/cmd/roachtest/tests/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,16 @@ func registerBackup(r registry.Registry) {
// total elapsed time. This is used by roachperf to compute and display
// the average MB/sec per node.
tick()
c.Run(ctx, c.Node(1), `./cockroach sql --url={pgurl:1} -e "
BACKUP bank.bank TO 'gs://`+backupTestingBucket+`/`+dest+`?AUTH=implicit'"`)
conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()
var jobID jobspb.JobID
uri := `gs://` + backupTestingBucket + `/` + dest + `?AUTH=implicit`
if err := conn.QueryRowContext(ctx, fmt.Sprintf("BACKUP bank.bank INTO '%s' WITH detached", uri)).Scan(&jobID); err != nil {
return err
}
if err := AssertReasonableFractionCompleted(ctx, t.L(), c, jobID, 2); err != nil {
return err
}
tick()

// Upload the perf artifacts to any one of the nodes so that the test
Expand Down
55 changes: 55 additions & 0 deletions pkg/cmd/roachtest/tests/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,58 @@ func getJobProgress(t test.Test, db *sqlutils.SQLRunner, jobID jobspb.JobID) *jo
}
return ret
}

func AssertReasonableFractionCompleted(
ctx context.Context, l *logger.Logger, c cluster.Cluster, jobID jobspb.JobID, nodeToQuery int,
) error {
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
fractionsRecorded := make([]float64, 0)

for {
select {
case <-ticker.C:
fractionCompleted, err := getFractionProgressed(ctx, l, c, jobID, nodeToQuery)
if err != nil {
return err
}
fractionsRecorded = append(fractionsRecorded, fractionCompleted)
if fractionCompleted == 1 {
count := len(fractionsRecorded)
if count > 5 && fractionsRecorded[count/2] < 0.2 && fractionsRecorded[count/2] > 0.8 {
return errors.Newf("the median fraction completed was %.2f, which is outside (0.2,0.8)", fractionsRecorded[count/2])
}
l.Printf("not enough 'fractionCompleted' recorded to assert progress looks sane")
return nil
}
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context canceled while waiting for job to finish")
}
}
}

func getFractionProgressed(
ctx context.Context, l *logger.Logger, c cluster.Cluster, jobID jobspb.JobID, nodeToQuery int,
) (float64, error) {
var status string
var fractionCompleted float64
conn := c.Conn(ctx, l, nodeToQuery)
defer conn.Close()
err := conn.QueryRowContext(ctx, `SELECT status, fraction_completed FROM [SHOW JOB $1]`, jobID).Scan(&status, &fractionCompleted)
if err != nil {
return 0, errors.Wrap(err, "getting the job status and fraction completed")
}
jobStatus := jobs.Status(status)
switch jobStatus {
case jobs.StatusSucceeded:
if fractionCompleted != 1 {
return 0, errors.Newf("job completed but fraction completed is %.2f", fractionCompleted)
}
return fractionCompleted, nil
case jobs.StatusRunning:
l.Printf("job %d still running, %.2f completed, waiting to succeed", jobID, fractionCompleted)
return fractionCompleted, nil
default:
return 0, errors.Newf("unexpectedly found job %s in state %s", jobID, status)
}
}

0 comments on commit 0c3f5d4

Please sign in to comment.