backupccl: improve restore checkpointing with span frontier #92002

Closed
139 changes: 139 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -1499,6 +1499,145 @@ func TestBackupRestoreSystemJobsProgress(t *testing.T) {
checkInProgressBackupRestore(t, checkFraction, checkFraction)
}

// TestRestoreCheckpointing checks that restore progress is persisted to the
// job record in the form of a span frontier as the job makes progress.
func TestRestoreCheckpointing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.TestingSetProgressThresholds()()

var allowResponse chan struct{}
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
<-allowResponse
},
},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
testServerArgs := base.TestServerArgs{DisableDefaultTestTenant: true}
params.ServerArgs = testServerArgs
params.ServerArgs.Knobs = knobs

ctx := context.Background()
_, sqlDB, dir, cleanupFn := backupRestoreTestSetupWithParams(t, multiNode, 1,
InitManualReplication, params)
conn := sqlDB.DB.(*gosql.DB)
defer cleanupFn()

sqlDB.Exec(t, `CREATE DATABASE r1`)
// We create these tables to ensure there are enough spans to restore and that we have partial progress
// when stopping the job.
var numTables = 7
for i := 1; i <= numTables; i++ {
tableName := fmt.Sprintf("r1.table%d", i)
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (id INT PRIMARY KEY, s STRING)`, tableName))
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName))
}
sqlDB.Exec(t, `BACKUP DATABASE r1 TO 'nodelocal://0/test-root'`)

var jobID jobspb.JobID
// do runs the given query, allows only partial progress to be made, runs the provided
// in-progress check, and then pauses the job so that no further progress is persisted.
do := func(query string, check inProgressChecker) {
t.Logf("checking query %q", query)

jobDone := make(chan error)
allowResponse = make(chan struct{}, numTables)

go func() {
_, err := conn.Exec(query)
jobDone <- err
}()

// Allow one of the total expected responses to proceed.
allowResponse <- struct{}{}

backupTableID := sqlutils.QueryTableID(t, conn, "r1", "public", "table1")
err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
return check(ctx, inProgressState{
[Review comment (collaborator)]: If this error is not nil, the test should fail. That's what happened in the stress race result. Since the error is not handled, the test was able to proceed, leading to weirdness downstream.

DB: conn,
backupTableID: backupTableID,
dir: dir,
name: "foo",
})
})

if err != nil {
close(allowResponse)
t.Fatalf("%q: %+v", query, err)
}

sqlDB.QueryRow(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
sqlDB.Exec(t, `PAUSE JOB $1`, jobID)
jobutils.WaitForJobToPause(t, sqlDB, jobID)
// Close the channel to allow all remaining responses to proceed. We do this
// even if the above retry.ForDuration failed, otherwise the test will hang
// forever.
[Review comment (collaborator)]: Don't we want to pause the job before we do this? This allows the jobs to complete before you check the progress. This likely explains why stress is failing.
close(allowResponse)

if err := <-jobDone; err != nil {
t.Fatalf("%q: %+v", query, err)
}

if err != nil {
t.Log(err)
}
}

checkFraction := func(ctx context.Context, ip inProgressState) error {
latestJobID, err := ip.latestJobID()
if err != nil {
return err
}
var fractionCompleted float32
if err := ip.QueryRow(
`SELECT fraction_completed FROM crdb_internal.jobs WHERE job_id = $1`,
latestJobID,
).Scan(&fractionCompleted); err != nil {
return err
}
if fractionCompleted < 0.01 || fractionCompleted > 0.99 {
return errors.Errorf(
"expected progress to be in range [0.01, 0.99] but got %f",
fractionCompleted,
)
}
return nil
}

restoreQuery := `RESTORE DATABASE r1 FROM 'nodelocal://0/test-root' WITH detached, new_db_name=r2`
do(restoreQuery, checkFraction)

sqlDB.QueryRow(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
[Review comment (collaborator)]: Why check all this stuff before the job is paused?

jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID)
require.NotNil(t, jobProgress)
require.NotEmpty(t, jobProgress.GetRestore().CompletedSpans)
require.Equal(t, hlc.Timestamp{WallTime: 1}, jobProgress.GetRestore().CompletedSpans[0].Timestamp)
require.NotEqual(t, numTables, len(jobProgress.GetRestore().CompletedSpans))

sqlDB.Exec(t, `RESUME JOB $1`, jobID)
jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
// After the job is resumed and succeeds, verify that all progress has been persisted to the
// job record and that each restored table has the same row count as the backed-up table.
jobProgress = jobutils.GetJobProgress(t, sqlDB, jobID)
require.NotNil(t, jobProgress)
require.Equal(t, numTables, len(jobProgress.GetRestore().CompletedSpans))
for _, completedSpan := range jobProgress.GetRestore().CompletedSpans {
require.Equal(t, hlc.Timestamp{WallTime: 1}, completedSpan.Timestamp)
}
for i := 1; i <= 7; i++ {
tableName := fmt.Sprintf("r1.table%d", i)
var rowCount int
sqlDB.QueryRow(t, fmt.Sprintf(`SELECT count(*) AS row_count FROM %s`, tableName)).Scan(&rowCount)
// We expect 2 since each table was initialized with 2 rows.
require.Equal(t, 2, rowCount)
}
}

func createAndWaitForJob(
t *testing.T,
db *sqlutils.SQLRunner,
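The pausing trick in TestRestoreCheckpointing above hinges on the RunAfterProcessingRestoreSpanEntry knob blocking on a buffered channel (allowResponse), which lets the test decide exactly how many span entries finish before PAUSE JOB is issued. Below is a minimal, self-contained sketch of that gating pattern; it is illustrative only and not part of the diff.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const numSpans = 7
	gate := make(chan struct{}, numSpans) // plays the role of allowResponse
	done := make(chan int, numSpans)

	var wg sync.WaitGroup
	for i := 0; i < numSpans; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-gate // blocks until the test allows this span entry to complete
			done <- id
		}(i)
	}

	// Allow exactly one span entry to finish, as the test does before pausing the job.
	gate <- struct{}{}
	fmt.Println("first completed span:", <-done)

	// Closing the gate releases all remaining workers; the test does this after
	// PAUSE JOB so the goroutine running the restore query can return.
	close(gate)
	wg.Wait()
	close(done)
	for id := range done {
		fmt.Println("later completed span:", id)
	}
}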
28 changes: 1 addition & 27 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -14,7 +14,6 @@ import (
"fmt"
"runtime"

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
@@ -414,32 +413,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span)
writeAtBatchTS = false
}

// disallowShadowingBelow is set to an empty hlc.Timestamp in release builds
// i.e. allow all shadowing without AddSSTable having to check for overlapping
// keys. This is because RESTORE is expected to ingest into an empty keyspace.
// If a restore job is resumed, the un-checkpointed spans that are re-ingested
// will shadow (equal key, value; different ts) the already ingested keys.
//
// NB: disallowShadowingBelow used to be unconditionally set to logical=1.
// This permissive value would allow shadowing in case the RESTORE has to
// retry ingestions but served to force evaluation of AddSSTable to check for
// overlapping keys. It was believed that even across resumptions of a restore
// job, `checkForKeyCollisions` would be inexpensive because of our frequent
// job checkpointing. Further investigation in
// https://github.com/cockroachdb/cockroach/issues/81116 revealed that our
// progress checkpointing could significantly lag behind the spans we have
// ingested, making a resumed restore spend a lot of time in
// `checkForKeyCollisions` leading to severely degraded performance. We have
// *never* seen a restore fail because of the invariant enforced by setting
// `disallowShadowingBelow` to a non-empty value, and so we feel comfortable
// disabling this check entirely. A future release will work on fixing our
// progress checkpointing so that we do not have a buildup of un-checkpointed
// work, at which point we can reassess reverting to logical=1.
disallowShadowingBelow := hlc.Timestamp{}
if !build.IsRelease() {
disallowShadowingBelow = hlc.Timestamp{Logical: 1}
}
disallowShadowingBelow := hlc.Timestamp{Logical: 1}

var err error
batcher, err = bulk.MakeSSTBatcher(ctx,
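The long comment removed in the hunk above explains the semantics of disallowShadowingBelow. The following standalone sketch is a rough conceptual illustration of the invariant that a non-empty bound enforces; it is not the actual AddSSTable evaluation code, and the helper name is made up for illustration.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// shadowingAllowed sketches the check implied by a non-empty disallowShadowingBelow:
// an incoming key may shadow an existing version only if that version is at or above
// the bound (e.g. a resumed RESTORE re-ingesting a key it already wrote). An empty
// bound disables the check entirely, which the diff above moves away from by always
// using Logical: 1.
func shadowingAllowed(disallowShadowingBelow, existingVersionTS hlc.Timestamp) bool {
	if disallowShadowingBelow.IsEmpty() {
		return true // no collision checking at all
	}
	return disallowShadowingBelow.LessEq(existingVersionTS)
}

func main() {
	bound := hlc.Timestamp{Logical: 1}
	fmt.Println(shadowingAllowed(bound, hlc.Timestamp{WallTime: 10})) // true: re-ingested key
	fmt.Println(shadowingAllowed(bound, hlc.Timestamp{}))             // false: would shadow below the bound
	fmt.Println(shadowingAllowed(hlc.Timestamp{}, hlc.Timestamp{WallTime: 5})) // true: check disabled
}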
106 changes: 89 additions & 17 deletions pkg/ccl/backupccl/restore_job.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down Expand Up @@ -63,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -221,6 +223,49 @@ func makeBackupLocalityMap(
return backupLocalityMap, nil
}

// filterCompletedImportSpans constructs a span frontier that tracks ingestion progress over
// the key space we seek to restore, along with a slice of the spans that still need to be
// restored. It builds both from the passed-in importSpans, the set of key spans covering the
// whole key space being restored, and the passed-in completedSpans, the set of key spans that
// have already been restored.
func filterCompletedImportSpans(
completedSpans []jobspb.RestoreProgress_RestoreProgressFrontierEntry,
importSpans []execinfrapb.RestoreSpanEntry,
) (*spanUtils.Frontier, []execinfrapb.RestoreSpanEntry, error) {
frontierCreation := make([]roachpb.Span, 0, len(importSpans))
for _, frontierSpanEntry := range importSpans {
frontierCreation = append(frontierCreation, frontierSpanEntry.Span)
}
checkpointFrontier, err := spanUtils.MakeFrontier(frontierCreation...)
if err != nil {
return nil, nil, err
}
for _, completedSpanEntry := range completedSpans {
_, err = checkpointFrontier.Forward(completedSpanEntry.Entry, completedSpanEntry.Timestamp)
if err != nil {
return nil, nil, err
}
}

var toDoSpans []execinfrapb.RestoreSpanEntry
for _, importSpan := range importSpans {
skip := false
checkpointFrontier.SpanEntries(importSpan.Span, func(s roachpb.Span,
ts hlc.Timestamp) (done spanUtils.OpResult) {
if ts.Equal(hlc.Timestamp{WallTime: 1}) {
skip = true
return spanUtils.ContinueMatch
}
skip = false
return spanUtils.StopMatch
})
if !skip {
toDoSpans = append(toDoSpans, importSpan)
}
}
return checkpointFrontier, toDoSpans, err
}

// restore imports a SQL table (or tables) from sets of non-overlapping sstable
// files.
func restore(
@@ -256,9 +301,10 @@ func restore(

mu := struct {
syncutil.Mutex
highWaterMark int
res roachpb.RowCount
requestsCompleted []bool
highWaterMark int
res roachpb.RowCount
requestsCompleted []bool
checkpointFrontier *spanUtils.Frontier
}{
highWaterMark: -1,
}
@@ -284,6 +330,13 @@ func restore(
importSpans := makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests,
backupLocalityMap, introducedSpanFrontier, highWaterMark, targetRestoreSpanSize.Get(execCtx.ExecCfg().SV()))

if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) {
mu.checkpointFrontier, importSpans, err = filterCompletedImportSpans(job.Progress().Details.(*jobspb.Progress_Restore).Restore.CompletedSpans, importSpans)
if err != nil {
return emptyRowCount, err
}
}

if len(importSpans) == 0 {
// There are no files to restore.
return emptyRowCount, nil
@@ -332,8 +385,20 @@ func restore(
switch d := details.(type) {
case *jobspb.Progress_Restore:
mu.Lock()
if mu.highWaterMark >= 0 {
d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key
if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) {
completedSpansSlice := make([]jobspb.RestoreProgress_RestoreProgressFrontierEntry, 0)
mu.checkpointFrontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done spanUtils.OpResult) {
// Only append completed spans to be stored in the job record.
if !ts.IsEmpty() {
completedSpansSlice = append(completedSpansSlice, jobspb.RestoreProgress_RestoreProgressFrontierEntry{Entry: sp, Timestamp: ts})
}
return spanUtils.ContinueMatch
})
d.Restore.CompletedSpans = completedSpansSlice
} else {
if mu.highWaterMark >= 0 {
d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key
}
}
mu.Unlock()
default:
Expand All @@ -359,27 +424,34 @@ func restore(
if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err)
}

mu.res.Add(progDetails.Summary)
idx := progDetails.ProgressIdx
if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) {
_, err = mu.checkpointFrontier.Forward(progDetails.DataSpan, hlc.Timestamp{WallTime: 1})
if err != nil {
log.Errorf(ctx, "unable to forward timestamp: %+v", err)
}
} else {
idx := progDetails.ProgressIdx

// Assert that we're actually marking the correct span done. See #23977.
if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) {
mu.Unlock()
return errors.Newf("request %d for span %v does not match import span for same idx: %v",
idx, progDetails.DataSpan, importSpans[idx],
)
}
mu.requestsCompleted[idx] = true
for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ {
mu.highWaterMark = j
// Assert that we're actually marking the correct span done. See #23977.
if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) {
mu.Unlock()
return errors.Newf("request %d for span %v does not match import span for same idx: %v",
idx, progDetails.DataSpan, importSpans[idx],
)
}
mu.requestsCompleted[idx] = true
for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ {
mu.highWaterMark = j
}
}
mu.Unlock()

// Signal that the processor has finished importing a span, to update job
// progress.
requestFinishedCh <- struct{}{}
}

return nil
}
tasks = append(tasks, jobCheckpointLoop)
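filterCompletedImportSpans earlier in this file's diff is built around the util/span Frontier. The standalone sketch below (with made-up keys, mirroring rather than reproducing the PR's logic) builds a frontier over some import spans, forwards a checkpointed span to WallTime: 1, and then decides which import spans can be skipped on resume; it uses only the frontier calls that appear in the diff.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
)

func main() {
	sp := func(start, end string) roachpb.Span {
		return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)}
	}
	importSpans := []roachpb.Span{sp("a", "b"), sp("b", "c"), sp("c", "d")}
	completed := []roachpb.Span{sp("a", "b")} // e.g. loaded from the job's CompletedSpans

	frontier, err := spanUtils.MakeFrontier(importSpans...)
	if err != nil {
		panic(err)
	}
	for _, c := range completed {
		if _, err := frontier.Forward(c, hlc.Timestamp{WallTime: 1}); err != nil {
			panic(err)
		}
	}

	// An import span is skipped only if every sub-span inside it has been
	// forwarded to WallTime: 1, matching the skip decision in the diff.
	for _, is := range importSpans {
		skip := true
		frontier.SpanEntries(is, func(_ roachpb.Span, ts hlc.Timestamp) spanUtils.OpResult {
			if !ts.Equal(hlc.Timestamp{WallTime: 1}) {
				skip = false
				return spanUtils.StopMatch
			}
			return spanUtils.ContinueMatch
		})
		fmt.Printf("span %s: skip=%t\n", is, skip)
	}
}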
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_span_covering.go
@@ -87,6 +87,7 @@ func makeSimpleImportSpans(
backups []backuppb.BackupManifest,
backupLocalityMap map[int]storeByLocalityKV,
introducedSpanFrontier *spanUtils.Frontier,
// TODO(mb): Remove lowWaterMark argument in 23.1.
lowWaterMark roachpb.Key,
targetSize int64,
) []execinfrapb.RestoreSpanEntry {