Skip to content

Commit

Permalink
backupccl: improve restore checkpointing with span frontier
Browse files Browse the repository at this point in the history
Fixes: #81116, #87843

Release note (performance improvement): Previously, whenever a user resumed a paused `RESTORE` job
the checkpointing mechanism would potentially not account for completed work. This change
allows completed spans to be skipped over when restoring.
  • Loading branch information
baoalvin1 committed Dec 7, 2022
1 parent 6c6fc75 commit 44bf16b
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 45 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ go_test(
"key_rewriter_test.go",
"main_test.go",
"partitioned_backup_test.go",
"restore_checkpointing_test.go",
"restore_data_processor_test.go",
"restore_mid_schema_change_test.go",
"restore_old_sequences_test.go",
Expand Down
175 changes: 175 additions & 0 deletions pkg/ccl/backupccl/restore_checkpointing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backupccl

import (
"context"
gosql "database/sql"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestRestoreCheckpointing checks that all completed spans are
// skipped over when creating the slice for makeSimpleImportSpans.
func TestRestoreCheckpointing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.TestingSetProgressThresholds()()

var allowResponse chan struct{}
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
<-allowResponse
},
},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
testServerArgs := base.TestServerArgs{DisableDefaultTestTenant: true}
params.ServerArgs = testServerArgs
params.ServerArgs.Knobs = knobs

ctx := context.Background()
_, sqlDB, dir, cleanupFn := backupRestoreTestSetupWithParams(t, multiNode, 1,
InitManualReplication, params)
conn := sqlDB.DB.(*gosql.DB)
defer cleanupFn()

sqlDB.Exec(t, `CREATE DATABASE r1`)
sqlDB.Exec(t, `CREATE TABLE r1.foo (id INT PRIMARY KEY, s STRING)`)
sqlDB.Exec(t, `CREATE TABLE r1.baz (id INT PRIMARY KEY, s STRING)`)
sqlDB.Exec(t, `CREATE TABLE r1.a (id INT PRIMARY KEY, s STRING)`)
sqlDB.Exec(t, `CREATE TABLE r1.b (id INT PRIMARY KEY, s STRING)`)
sqlDB.Exec(t, `CREATE TABLE r1.c (id INT PRIMARY KEY, s STRING)`)
sqlDB.Exec(t, `CREATE TABLE r1.d (id INT PRIMARY KEY, s STRING)`)
sqlDB.Exec(t, `CREATE TABLE r1.e (id INT PRIMARY KEY, s STRING)`)
sqlDB.Exec(t, `INSERT INTO r1.foo VALUES (1, 'x'),(2,'y')`)
sqlDB.Exec(t, `INSERT INTO r1.baz VALUES (11, 'xx'),(22,'yy')`)
sqlDB.Exec(t, `INSERT INTO r1.a VALUES (11, 'xx'),(22,'yy')`)
sqlDB.Exec(t, `INSERT INTO r1.b VALUES (11, 'xx'),(22,'yy')`)
sqlDB.Exec(t, `INSERT INTO r1.c VALUES (11, 'xx'),(22,'yy')`)
sqlDB.Exec(t, `INSERT INTO r1.d VALUES (11, 'xx'),(22,'yy')`)
sqlDB.Exec(t, `INSERT INTO r1.e VALUES (11, 'xx'),(22,'yy')`)
sqlDB.Exec(t, `BACKUP DATABASE r1 TO 'nodelocal://0/test-root'`)

restoreQuery := `RESTORE DATABASE r1 FROM 'nodelocal://0/test-root' WITH detached, new_db_name=data2`

backupTableID := sqlutils.QueryTableID(t, conn, "r1", "public", "foo")

var jobID jobspb.JobID
do := func(query string, check inProgressChecker) {
t.Logf("checking query %q", query)

var totalExpectedResponses int
if strings.Contains(query, "RESTORE") {
// We expect restore to process each file in the backup individually.
// SST files are written per-range in the backup. So we expect the
// restore to process #(ranges) that made up the original table.
totalExpectedResponses = 5
} else {
t.Fatal("expected query to be either a backup or restore")
}
jobDone := make(chan error)
allowResponse = make(chan struct{}, totalExpectedResponses)

go func() {
_, err := conn.Exec(query)
jobDone <- err
}()

// Allow half the total expected responses to proceed.
for i := 0; i < totalExpectedResponses/2; i++ {
allowResponse <- struct{}{}
}

err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
return check(ctx, inProgressState{
DB: conn,
backupTableID: backupTableID,
dir: dir,
name: "foo",
})
})

// Close the channel to allow all remaining responses to proceed. We do this
// even if the above retry.ForDuration failed, otherwise the test will hang
// forever.
close(allowResponse)

if err := <-jobDone; err != nil {
t.Fatalf("%q: %+v", query, err)
}

if err != nil {
t.Log(err)
}
}

progressQuery := `select crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', progress) as progress from system.jobs where id=$1`

var progressMessage string
//var unused interface{}

checkFraction := func(ctx context.Context, ip inProgressState) error {
jobID, err := ip.latestJobID()
if err != nil {
return err
}
var fractionCompleted float32
if err := ip.QueryRow(
`SELECT fraction_completed FROM crdb_internal.jobs WHERE job_id = $1`,
jobID,
).Scan(&fractionCompleted); err != nil {
return err
}
ip.QueryRow(progressQuery, jobID).Scan(&progressMessage)
t.Logf(progressMessage)
if fractionCompleted < 0.01 || fractionCompleted > 0.99 {
return errors.Errorf(
"expected progress to be in range [0.01, 0.99] but got %f",
fractionCompleted,
)
}
return nil
}

do(restoreQuery, checkFraction)

sqlDB.QueryRow(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
sqlDB.QueryRow(t, progressQuery, jobID).Scan(&progressMessage)
require.NotNil(t, progressMessage)
require.Contains(t, progressMessage, "\"wallTime\": \"1\"")
jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID)
require.NotNil(t, jobProgress)

sqlDB.Exec(t, `PAUSE JOB $1`, jobID)
jobutils.WaitForJobToPause(t, sqlDB, jobID)
sqlDB.Exec(t, `RESUME JOB $1`, jobID)
jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
jobProgress = jobutils.GetJobProgress(t, sqlDB, jobID)
require.NotNil(t, jobProgress)
require.Equal(t, 7, len(jobProgress.GetRestore().Frontier))
}
28 changes: 1 addition & 27 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"fmt"
"runtime"

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
Expand Down Expand Up @@ -414,32 +413,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span)
writeAtBatchTS = false
}

// disallowShadowingBelow is set to an empty hlc.Timestamp in release builds
// i.e. allow all shadowing without AddSSTable having to check for overlapping
// keys. This is because RESTORE is expected to ingest into an empty keyspace.
// If a restore job is resumed, the un-checkpointed spans that are re-ingested
// will shadow (equal key, value; different ts) the already ingested keys.
//
// NB: disallowShadowingBelow used to be unconditionally set to logical=1.
// This permissive value would allow shadowing in case the RESTORE has to
// retry ingestions but served to force evaluation of AddSSTable to check for
// overlapping keys. It was believed that even across resumptions of a restore
// job, `checkForKeyCollisions` would be inexpensive because of our frequent
// job checkpointing. Further investigation in
// https://github.com/cockroachdb/cockroach/issues/81116 revealed that our
// progress checkpointing could significantly lag behind the spans we have
// ingested, making a resumed restore spend a lot of time in
// `checkForKeyCollisions` leading to severely degraded performance. We have
// *never* seen a restore fail because of the invariant enforced by setting
// `disallowShadowingBelow` to a non-empty value, and so we feel comfortable
// disabling this check entirely. A future release will work on fixing our
// progress checkpointing so that we do not have a buildup of un-checkpointed
// work, at which point we can reassess reverting to logical=1.
disallowShadowingBelow := hlc.Timestamp{}
if !build.IsRelease() {
disallowShadowingBelow = hlc.Timestamp{Logical: 1}
}
disallowShadowingBelow := hlc.Timestamp{Logical: 1}

var err error
batcher, err = bulk.MakeSSTBatcher(ctx,
Expand Down
123 changes: 105 additions & 18 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down Expand Up @@ -63,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -259,6 +261,7 @@ func restore(
highWaterMark int
res roachpb.RowCount
requestsCompleted []bool
spanFrontierSlice []jobspb.RestoreProgress_RestoreProgressFrontierEntry
}{
highWaterMark: -1,
}
Expand All @@ -281,8 +284,66 @@ func restore(
// which are grouped by keyrange.
highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater

importSpans := makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests,
backupLocalityMap, introducedSpanFrontier, highWaterMark, targetRestoreSpanSize.Get(execCtx.ExecCfg().SV()))
var importSpans []execinfrapb.RestoreSpanEntry
var checkpointingSpanFrontier *spanUtils.Frontier
if execCtx.ExecCfg().Settings.Version.IsActive(restoreCtx, clusterversion.V23_1Start) {
spanFrontierSlice := job.Progress().Details.(*jobspb.Progress_Restore).Restore.Frontier
var frontierCreationSlice []roachpb.Span
for _, frontierSpanEntry := range spanFrontierSlice {
frontierCreationSlice = append(frontierCreationSlice, frontierSpanEntry.Entry)
}
checkpointingSpanFrontier, err = spanUtils.MakeFrontier(frontierCreationSlice...)
for _, frontierSpanEntry := range spanFrontierSlice {
_, err = checkpointingSpanFrontier.Forward(frontierSpanEntry.Entry, frontierSpanEntry.Timestamp)
if err != nil {
return emptyRowCount, err
}
}
importSpans = makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests,
backupLocalityMap, introducedSpanFrontier, highWaterMark, targetRestoreSpanSize.Get(execCtx.ExecCfg().SV()))
var modifiedImportSpans []execinfrapb.RestoreSpanEntry
for _, importSpan := range importSpans {
skip := false
checkpointingSpanFrontier.SpanEntries(importSpan.Span, func(s roachpb.Span,
ts hlc.Timestamp) (done spanUtils.OpResult) {
if ts.Equal(hlc.Timestamp{WallTime: 1}) {
skip = true
return spanUtils.ContinueMatch
}
skip = false
return spanUtils.StopMatch
})
if skip {
continue
}
modifiedImportSpans = append(modifiedImportSpans, importSpan)
}
importSpans = modifiedImportSpans
if len(spanFrontierSlice) == 0 {
// construct span frontier
var checkpointingFrontierImportSpans = make([]roachpb.Span, len(importSpans))
for i, span := range importSpans {
checkpointingFrontierImportSpans[i] = span.Span
}
checkpointingSpanFrontier, err = spanUtils.MakeFrontier(checkpointingFrontierImportSpans...)
if err != nil {
return emptyRowCount, err
}
} else {
var importSpansSlice = make([]roachpb.Span, len(importSpans))
for i, importSpanEntry := range importSpans {
importSpansSlice[i] = importSpanEntry.Span
}
err = checkpointingSpanFrontier.AddSpansAt(hlc.Timestamp{}, importSpansSlice...)
if err != nil {
return emptyRowCount, err
}
mu.spanFrontierSlice = spanFrontierSlice
}
} else {
importSpans = makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests,
backupLocalityMap, introducedSpanFrontier, highWaterMark, targetRestoreSpanSize.Get(execCtx.ExecCfg().SV()))
}

if len(importSpans) == 0 {
// There are no files to restore.
Expand Down Expand Up @@ -335,6 +396,9 @@ func restore(
if mu.highWaterMark >= 0 {
d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key
}
if execCtx.ExecCfg().Settings.Version.IsActive(restoreCtx, clusterversion.V23_1Start) {
d.Restore.Frontier = mu.spanFrontierSlice
}
mu.Unlock()
default:
log.Errorf(progressedCtx, "job payload had unexpected type %T", d)
Expand All @@ -355,31 +419,54 @@ func restore(
// to progCh.
for progress := range progCh {
mu.Lock()
var progDetails backuppb.RestoreProgress
if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err)
}
if execCtx.ExecCfg().Settings.Version.IsActive(restoreCtx, clusterversion.V23_1Start) {
var progDetails backuppb.RestoreProgress
if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err)
}

mu.res.Add(progDetails.Summary)
idx := progDetails.ProgressIdx
mu.res.Add(progDetails.Summary)
_, err = checkpointingSpanFrontier.Forward(progDetails.DataSpan, hlc.Timestamp{WallTime: 1})
if err != nil {
log.Errorf(ctx, "unable to forward timestamp: %+v", err)
}

// Assert that we're actually marking the correct span done. See #23977.
if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) {
mu.Unlock()
return errors.Newf("request %d for span %v does not match import span for same idx: %v",
idx, progDetails.DataSpan, importSpans[idx],
)
}
mu.requestsCompleted[idx] = true
for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ {
mu.highWaterMark = j
mu.spanFrontierSlice = append(mu.spanFrontierSlice,
jobspb.RestoreProgress_RestoreProgressFrontierEntry{
Entry: progDetails.DataSpan,
Timestamp: hlc.Timestamp{WallTime: 1},
})
} else {
var progDetails backuppb.RestoreProgress
if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err)
}

mu.res.Add(progDetails.Summary)
idx := progDetails.ProgressIdx

// Assert that we're actually marking the correct span done. See #23977.
if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) {
mu.Unlock()
return errors.Newf("request %d for span %v does not match import span for same idx: %v",
idx, progDetails.DataSpan, importSpans[idx],
)
}
mu.requestsCompleted[idx] = true
for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ {
mu.highWaterMark = j
}
}
mu.Unlock()

// Signal that the processor has finished importing a span, to update job
// progress.
requestFinishedCh <- struct{}{}
}
//if execCtx.ExecCfg().Settings.Version.IsActive(restoreCtx, clusterversion.V23_1TenantNames) {
//
//}

return nil
}
tasks = append(tasks, jobCheckpointLoop)
Expand Down
Loading

0 comments on commit 44bf16b

Please sign in to comment.