backupccl: improve restore checkpointing with span frontier
Fixes: #81116, #87843

Release note (performance improvement): Previously, when a user resumed a paused `RESTORE` job,
the checkpointing mechanism could fail to account for work that had already been completed.
This change allows already-completed spans to be skipped when the restore resumes.
baoalvin1 committed Dec 19, 2022
1 parent 369c405 commit f96c954
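
As a rough sketch of the idea (not the implementation itself): the restore records completed spans by forwarding them to a sentinel timestamp in a span frontier, and on resume any import span still at the zero timestamp is re-queued. The snippet below mirrors the `filterCompletedImportSpans` logic added in this change and assumes only the `util/span` Frontier API the change already uses (`MakeFrontier`, `Forward`, `SpanEntries`); the helper names and example keys are illustrative.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
)

// completedTS is the sentinel timestamp used to mark a span as fully restored.
var completedTS = hlc.Timestamp{WallTime: 1}

func main() {
	sp := func(start, end string) roachpb.Span {
		return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)}
	}
	importSpans := []roachpb.Span{sp("a", "b"), sp("b", "c"), sp("c", "d")}

	// Every import span starts at the zero timestamp, i.e. not yet restored.
	frontier, err := spanUtils.MakeFrontier(importSpans...)
	if err != nil {
		panic(err)
	}
	// Suppose the checkpoint recorded [a,b) as completed before the job paused.
	if _, err := frontier.Forward(sp("a", "b"), completedTS); err != nil {
		panic(err)
	}

	// On resume, keep only the spans that are not entirely at completedTS.
	var toDo []roachpb.Span
	for _, is := range importSpans {
		done := true
		frontier.SpanEntries(is, func(_ roachpb.Span, ts hlc.Timestamp) spanUtils.OpResult {
			if !ts.Equal(completedTS) {
				done = false
				return spanUtils.StopMatch
			}
			return spanUtils.ContinueMatch
		})
		if !done {
			toDo = append(toDo, is)
		}
	}
	fmt.Println(toDo) // [b,c) and [c,d) remain to be restored.
}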
Showing 6 changed files with 321 additions and 48 deletions.
147 changes: 147 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -1499,6 +1499,153 @@ func TestBackupRestoreSystemJobsProgress(t *testing.T) {
checkInProgressBackupRestore(t, checkFraction, checkFraction)
}

// TestRestoreCheckpointing checks that restore progress, tracked as a span
// frontier, is persisted to the job record as spans complete.
func TestRestoreCheckpointing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.TestingSetProgressThresholds()()

var allowResponse chan struct{}
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
<-allowResponse
},
},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
testServerArgs := base.TestServerArgs{DisableDefaultTestTenant: true}
params.ServerArgs = testServerArgs
params.ServerArgs.Knobs = knobs

ctx := context.Background()
_, sqlDB, dir, cleanupFn := backupRestoreTestSetupWithParams(t, multiNode, 1,
InitManualReplication, params)
conn := sqlDB.DB.(*gosql.DB)
defer cleanupFn()

sqlDB.Exec(t, `CREATE DATABASE r1`)
// We create these tables to ensure there are enough spans to restore and that we have partial progress
// when stopping the job.
var numTables int
for char := 'a'; char <= 'g'; char++ {
tableName := "r1." + string(char)
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (id INT PRIMARY KEY, s STRING)`, tableName))
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName))
numTables += 1
}
sqlDB.Exec(t, `BACKUP DATABASE r1 TO 'nodelocal://0/test-root'`)

restoreQuery := `RESTORE DATABASE r1 FROM 'nodelocal://0/test-root' WITH detached, new_db_name=r2`

backupTableID := sqlutils.QueryTableID(t, conn, "r1", "public", "a")

var jobID jobspb.JobID
// do runs the given query and, once some initial progress has been made,
// blocks further progress from being persisted to the job record.
do := func(query string, check inProgressChecker) {
t.Logf("checking query %q", query)

var totalExpectedResponses int
if strings.Contains(query, "RESTORE") {
// We expect the restore to process each file in the backup individually.
// SST files are written per-range in the backup, so the restore should
// process one entry per range that made up the original tables.
totalExpectedResponses = numTables
} else {
t.Fatal("expected query to be either a backup or restore")
}
jobDone := make(chan error)
allowResponse = make(chan struct{}, totalExpectedResponses)

go func() {
_, err := conn.Exec(query)
jobDone <- err
}()

// Allow one of the total expected responses to proceed.
for i := 0; i < 1; i++ {
allowResponse <- struct{}{}
}

err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
return check(ctx, inProgressState{
DB: conn,
backupTableID: backupTableID,
dir: dir,
name: "foo",
})
})

// Close the channel to allow all remaining responses to proceed. We do this
// even if the above retry.ForDuration failed, otherwise the test will hang
// forever.
close(allowResponse)

if err := <-jobDone; err != nil {
t.Fatalf("%q: %+v", query, err)
}

if err != nil {
t.Log(err)
}
}

checkFraction := func(ctx context.Context, ip inProgressState) error {
latestJobID, err := ip.latestJobID()
if err != nil {
return err
}
var fractionCompleted float32
if err := ip.QueryRow(
`SELECT fraction_completed FROM crdb_internal.jobs WHERE job_id = $1`,
latestJobID,
).Scan(&fractionCompleted); err != nil {
return err
}
if fractionCompleted < 0.01 || fractionCompleted > 0.99 {
return errors.Errorf(
"expected progress to be in range [0.01, 0.99] but got %f",
fractionCompleted,
)
}
return nil
}

do(restoreQuery, checkFraction)

sqlDB.QueryRow(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
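// Verify that partial progress has been checkpointed: at least one span is
// marked completed (forwarded to WallTime: 1), but not yet all of them.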
jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID)
require.NotNil(t, jobProgress)
require.NotEmpty(t, jobProgress.GetRestore().CompletedSpans)
require.Equal(t, hlc.Timestamp{WallTime: 1}, jobProgress.GetRestore().CompletedSpans[0].Timestamp)
require.NotEqual(t, numTables, len(jobProgress.GetRestore().CompletedSpans))

sqlDB.Exec(t, `PAUSE JOB $1`, jobID)
jobutils.WaitForJobToPause(t, sqlDB, jobID)
sqlDB.Exec(t, `RESUME JOB $1`, jobID)
jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
// After the job is resumed and succeeds, verify that all of the job progress has been
// persisted to the job record, and that each restored table has the same row count as
// its backed-up counterpart.
jobProgress = jobutils.GetJobProgress(t, sqlDB, jobID)
require.NotNil(t, jobProgress)
require.Equal(t, numTables, len(jobProgress.GetRestore().CompletedSpans))
for _, completedSpan := range jobProgress.GetRestore().CompletedSpans {
require.Equal(t, hlc.Timestamp{WallTime: 1}, completedSpan.Timestamp)
}
for char := 'a'; char <= 'g'; char++ {
tableName := "r2." + string(char)
var rowCount int
sqlDB.QueryRow(t, fmt.Sprintf(`SELECT count(*) AS row_count FROM %s`, tableName)).Scan(&rowCount)
// We expect 2 since each table was initialized with 2 rows.
require.Equal(t, 2, rowCount)
}
}

func createAndWaitForJob(
t *testing.T,
db *sqlutils.SQLRunner,
28 changes: 1 addition & 27 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -14,7 +14,6 @@ import (
"fmt"
"runtime"

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
@@ -414,32 +413,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span)
writeAtBatchTS = false
}

// disallowShadowingBelow is set to an empty hlc.Timestamp in release builds
// i.e. allow all shadowing without AddSSTable having to check for overlapping
// keys. This is because RESTORE is expected to ingest into an empty keyspace.
// If a restore job is resumed, the un-checkpointed spans that are re-ingested
// will shadow (equal key, value; different ts) the already ingested keys.
//
// NB: disallowShadowingBelow used to be unconditionally set to logical=1.
// This permissive value would allow shadowing in case the RESTORE has to
// retry ingestions but served to force evaluation of AddSSTable to check for
// overlapping keys. It was believed that even across resumptions of a restore
// job, `checkForKeyCollisions` would be inexpensive because of our frequent
// job checkpointing. Further investigation in
// https://github.com/cockroachdb/cockroach/issues/81116 revealed that our
// progress checkpointing could significantly lag behind the spans we have
// ingested, making a resumed restore spend a lot of time in
// `checkForKeyCollisions` leading to severely degraded performance. We have
// *never* seen a restore fail because of the invariant enforced by setting
// `disallowShadowingBelow` to a non-empty value, and so we feel comfortable
// disabling this check entirely. A future release will work on fixing our
// progress checkpointing so that we do not have a buildup of un-checkpointed
// work, at which point we can reassess reverting to logical=1.
disallowShadowingBelow := hlc.Timestamp{}
if !build.IsRelease() {
disallowShadowingBelow = hlc.Timestamp{Logical: 1}
}
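// NB: with frontier-based restore checkpointing, a resumed job re-ingests far
// less already-written data, so the disallow-shadowing check at logical=1 is
// re-enabled unconditionally, as anticipated by the removed comment above.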
disallowShadowingBelow := hlc.Timestamp{Logical: 1}

var err error
batcher, err = bulk.MakeSSTBatcher(ctx,
124 changes: 103 additions & 21 deletions pkg/ccl/backupccl/restore_job.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -63,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -221,6 +223,52 @@ func makeBackupLocalityMap(
return backupLocalityMap, nil
}

// filterCompletedImportSpans filters the import spans against the spans that have
// already been restored. It returns a span frontier in which completed spans are
// forwarded to timestamp 1 and incomplete spans remain at the empty timestamp, along
// with a toDoSpans slice containing the spans that still need to be restored.
// The frontier is first constructed with every import span at the empty timestamp,
// then each completed span's timestamp is forwarded. Finally, any import span that
// the frontier marks as fully completed is skipped when building toDoSpans.
func filterCompletedImportSpans(
completedSpans []jobspb.RestoreProgress_RestoreProgressFrontierEntry,
importSpans []execinfrapb.RestoreSpanEntry,
) (*spanUtils.Frontier, []execinfrapb.RestoreSpanEntry, error) {
frontierCreation := make([]roachpb.Span, 0, len(importSpans))
for _, frontierSpanEntry := range importSpans {
frontierCreation = append(frontierCreation, frontierSpanEntry.Span)
}
checkpointFrontier, err := spanUtils.MakeFrontier(frontierCreation...)
if err != nil {
return nil, nil, err
}
for _, completedSpanEntry := range completedSpans {
_, err = checkpointFrontier.Forward(completedSpanEntry.Entry, completedSpanEntry.Timestamp)
if err != nil {
return nil, nil, err
}
}

var toDoSpans []execinfrapb.RestoreSpanEntry
for _, importSpan := range importSpans {
skip := false
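// Skip this import span only if every frontier sub-span overlapping it has
// already been forwarded to the completed timestamp (WallTime: 1); any
// sub-span still at the zero timestamp means the span has work remaining.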
checkpointFrontier.SpanEntries(importSpan.Span, func(s roachpb.Span,
ts hlc.Timestamp) (done spanUtils.OpResult) {
if ts.Equal(hlc.Timestamp{WallTime: 1}) {
skip = true
return spanUtils.ContinueMatch
}
skip = false
return spanUtils.StopMatch
})
if !skip {
toDoSpans = append(toDoSpans, importSpan)
}
}
return checkpointFrontier, toDoSpans, err
}

// restore imports a SQL table (or tables) from sets of non-overlapping sstable
// files.
func restore(
@@ -256,9 +304,10 @@ func restore(

mu := struct {
syncutil.Mutex
highWaterMark int
res roachpb.RowCount
requestsCompleted []bool
highWaterMark int
res roachpb.RowCount
requestsCompleted []bool
checkpointFrontier *spanUtils.Frontier
}{
highWaterMark: -1,
}
@@ -284,6 +333,13 @@
importSpans := makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests,
backupLocalityMap, introducedSpanFrontier, highWaterMark, targetRestoreSpanSize.Get(execCtx.ExecCfg().SV()))

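// Frontier-based checkpointing is only used for jobs created on a 23.1+
// cluster version; older jobs continue to use the high-water mark below.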
if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) {
mu.checkpointFrontier, importSpans, err = filterCompletedImportSpans(job.Progress().Details.(*jobspb.Progress_Restore).Restore.CompletedSpans, importSpans)
if err != nil {
return emptyRowCount, err
}
}

if len(importSpans) == 0 {
// There are no files to restore.
return emptyRowCount, nil
@@ -332,8 +388,20 @@
switch d := details.(type) {
case *jobspb.Progress_Restore:
mu.Lock()
if mu.highWaterMark >= 0 {
d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key
if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) {
completedSpansSlice := make([]jobspb.RestoreProgress_RestoreProgressFrontierEntry, 0)
mu.checkpointFrontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done spanUtils.OpResult) {
// We only append completed spans to store in the jobs record.
if !ts.IsEmpty() {
completedSpansSlice = append(completedSpansSlice, jobspb.RestoreProgress_RestoreProgressFrontierEntry{Entry: sp, Timestamp: ts})
}
return spanUtils.ContinueMatch
})
d.Restore.CompletedSpans = completedSpansSlice
} else {
if mu.highWaterMark >= 0 {
d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key
}
}
mu.Unlock()
default:
@@ -355,31 +423,45 @@
// to progCh.
for progress := range progCh {
mu.Lock()
var progDetails backuppb.RestoreProgress
if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err)
}
if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) {
var progDetails backuppb.RestoreProgress
if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err)
}

mu.res.Add(progDetails.Summary)
idx := progDetails.ProgressIdx
mu.res.Add(progDetails.Summary)
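// Forwarding the completed data span to WallTime: 1 marks it as done in the
// in-memory frontier; the checkpoint loop above persists these entries to
// the job record as CompletedSpans.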
_, err = mu.checkpointFrontier.Forward(progDetails.DataSpan, hlc.Timestamp{WallTime: 1})
if err != nil {
log.Errorf(ctx, "unable to forward timestamp: %+v", err)
}
} else {
var progDetails backuppb.RestoreProgress
if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err)
}

// Assert that we're actually marking the correct span done. See #23977.
if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) {
mu.Unlock()
return errors.Newf("request %d for span %v does not match import span for same idx: %v",
idx, progDetails.DataSpan, importSpans[idx],
)
}
mu.requestsCompleted[idx] = true
for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ {
mu.highWaterMark = j
mu.res.Add(progDetails.Summary)
idx := progDetails.ProgressIdx

// Assert that we're actually marking the correct span done. See #23977.
if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) {
mu.Unlock()
return errors.Newf("request %d for span %v does not match import span for same idx: %v",
idx, progDetails.DataSpan, importSpans[idx],
)
}
mu.requestsCompleted[idx] = true
for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ {
mu.highWaterMark = j
}
}
mu.Unlock()

// Signal that the processor has finished importing a span, to update job
// progress.
requestFinishedCh <- struct{}{}
}

return nil
}
tasks = append(tasks, jobCheckpointLoop)
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_span_covering.go
@@ -87,6 +87,7 @@ func makeSimpleImportSpans(
backups []backuppb.BackupManifest,
backupLocalityMap map[int]storeByLocalityKV,
introducedSpanFrontier *spanUtils.Frontier,
// TODO(mb): Remove lowWaterMark argument in 23.1.
lowWaterMark roachpb.Key,
targetSize int64,
) []execinfrapb.RestoreSpanEntry {