Skip to content

Commit

Permalink
backupccl: checkpoint restore progress with a span frontier
Browse files Browse the repository at this point in the history
Previously, after a node restart or pause event, a resumed restore would have to
redo a significant amount of work due to our naive progress checkpointing
procedure described in #87843.

This patch upgrades our checkpointing procedure to use a span frontier,
significantly reducing wasted work. Here's the basic idea:
1. Create a span frontier behind a lock on the coordinator.
2. When the coordinator receives a progress update about a span that was
ingested, forward that span's time stamp in the frontier to the  hardcoded
`completedSpanTime`.
3. The frontier is periodically persisted to the job record every 15 seconds
or after 5% progress. Note, only the first N spans are persisted, to bound the
amount of data written to the jobs table. The bound is set by the new
restore.frontier_checkpoint_max_bytes private setting.
4. If the job is resumed after some progress was persisted, the generative
split and scatter processor will skip the completed spans in the persisted
frontier. Note that if a part of requiredSpan was complete, only the subset
that is incomplete will get processed into a restoreSpanEntry.

As part of this work, many of the function signatures in the restore codebase
were refactored to prevent a large number input parameters. Further, much of
the restore checkpointing code was moved to the seperate restore_progress.go
file.

Fixes #87843

Release note: None
  • Loading branch information
msbutler committed Mar 13, 2023
1 parent e4924e2 commit 2ea46c8
Show file tree
Hide file tree
Showing 14 changed files with 1,058 additions and 338 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"restore_job.go",
"restore_planning.go",
"restore_processor_planning.go",
"restore_progress.go",
"restore_schema_change_creation.go",
"restore_span_covering.go",
"schedule_exec.go",
Expand Down Expand Up @@ -182,6 +183,7 @@ go_test(
"restore_mid_schema_change_test.go",
"restore_old_sequences_test.go",
"restore_old_versions_test.go",
"restore_progress_test.go",
"restore_span_covering_test.go",
"schedule_pts_chaining_test.go",
"show_test.go",
Expand Down
142 changes: 137 additions & 5 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/cloud/gcp"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -1488,6 +1489,125 @@ WHERE
do(`RESTORE data.* FROM LATEST IN $1 WITH OPTIONS (into_db='restoredb')`, checkRestore)
}

// TestRestoreCheckpointing checks that progress persists to the job record
// using the new span frontier. The test takes the following approach:
//
// 1. Backup and restore a database with x tables, each of which will have a
// disjoint span. This implies the restore will process x requiredSpans. Since
// each required span has two rows, each required span will result in a single
// restoreSpanEntry, and consequently, a single AddSSTable flush. Because the
// checkpoint frontier merges disjoint required spans, the persisted frontier
// should only have 1 entry.
//
// 2. This test will then block and pause the restore after y AddSStable
// flushes, and assert that y disjoint spans have been persisted to the
// progress frontier.
//
// 3. The test will then resume the restore job, allowing it to complete.
// Afterwards, the test asserts that all x spans have been persisted to the
// frontier, and that only x-y AddSStable requests were sent after resume,
// implying that on resume, no work already persisted to the frontier was
// duplicated.
func TestRestoreCheckpointing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.TestingSetProgressThresholds()()

// totalEntries represents the number of entries to appear in the persisted frontier.
totalEntries := 7
entriesBeforePause := 4
entriesCount := 0
var alreadyPaused atomic.Bool
postResumeCount := 0
blockDBRestore := make(chan struct{})
waitForProgress := make(chan struct{})
var mu syncutil.Mutex
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
// Because the restore processor has several workers that
// concurrently send addsstable requests and because all workers will
// wait on the lock below, when one flush gets blocked on the
// pre-pause blockDBRestore chan, several pre-pause requests will
// queue up behind it. Because of the lock, these requests will
// actually complete _after_ the job was paused and will not get
// counted in the checkpointed span. For test correctness, we do not
// want to count these requests as part of postResumedCount by
// checking if the job was paused in each request before it began
// waiting for the lock.
wasPausedBeforeWaiting := alreadyPaused.Load()
mu.Lock()
defer mu.Unlock()
if entriesCount == entriesBeforePause {
close(waitForProgress)
<-blockDBRestore
}
entriesCount++
if wasPausedBeforeWaiting {
postResumeCount++
}
},
},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
testServerArgs := base.TestServerArgs{DisableDefaultTestTenant: true}
params.ServerArgs = testServerArgs
params.ServerArgs.Knobs = knobs

_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 1,
InitManualReplication, params)
defer cleanupFn()

var jobID jobspb.JobID
checkPersistedSpanLength := func(expectedCompletedSpans int) {
testutils.SucceedsSoon(t, func() error {
jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID)
numSpans := len(jobProgress.GetRestore().Checkpoint)
if numSpans != expectedCompletedSpans {
return errors.Newf("expected %d checkpoints, but only see %d", expectedCompletedSpans, numSpans)
}
return nil
})
}

sqlDB.Exec(t, `CREATE DATABASE d`)

for i := 1; i <= totalEntries; i++ {
tableName := fmt.Sprintf("d.t%d", i)
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (id INT PRIMARY KEY, s STRING)`, tableName))
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName))
}
sqlDB.Exec(t, `BACKUP DATABASE d INTO $1`, localFoo)

sqlDB.QueryRow(t, `RESTORE DATABASE d FROM LATEST IN $1 WITH DETACHED, new_db_name=d2`, localFoo).Scan(&jobID)

// Pause the job after some progress has been logged.
<-waitForProgress

// To ensure that progress gets persisted, sleep well beyond the test only job update interval.
time.Sleep(time.Second)

sqlDB.Exec(t, `PAUSE JOB $1`, &jobID)
jobutils.WaitForJobToPause(t, sqlDB, jobID)
// NB: we don't check the persisted span length here, though we expect there
// to be 1 completed persisted span most of the time. Occasionally, two
// disjoint spans may be persisted if the required spans are processed out of
// order; i.e. table 5's span gets processed before table 4's.
require.Equal(t, entriesBeforePause, entriesCount)

close(blockDBRestore)
alreadyPaused.Store(true)
sqlDB.Exec(t, `RESUME JOB $1`, &jobID)
jobutils.WaitForJobToSucceed(t, sqlDB, jobID)

// Ensure that no persisted work was repeated on resume and that all work was persisted.
checkPersistedSpanLength(1)
require.Equal(t, totalEntries-entriesBeforePause, postResumeCount)
}

func TestBackupRestoreSystemJobsProgress(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 68571, "flaky test")
Expand Down Expand Up @@ -1526,14 +1646,16 @@ func createAndWaitForJob(
descriptorIDs []descpb.ID,
details jobspb.Details,
progress jobspb.ProgressDetails,
clusterVersionAtJobStart roachpb.Version,
) {
t.Helper()
now := timeutil.ToUnixMicros(timeutil.Now())
payload, err := protoutil.Marshal(&jobspb.Payload{
UsernameProto: username.RootUserName().EncodeProto(),
DescriptorIDs: descriptorIDs,
StartedMicros: now,
Details: jobspb.WrapPayloadDetails(details),
CreationClusterVersion: clusterVersionAtJobStart,
UsernameProto: username.RootUserName().EncodeProto(),
DescriptorIDs: descriptorIDs,
StartedMicros: now,
Details: jobspb.WrapPayloadDetails(details),
})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1622,6 +1744,7 @@ func TestBackupRestoreResume(t *testing.T) {
URI: "nodelocal://0/backup" + "-" + item.testName,
},
jobspb.BackupProgress{},
roachpb.Version{},
)

// If the backup properly took the (incorrect) checkpoint into account, it
Expand Down Expand Up @@ -1665,6 +1788,9 @@ func TestBackupRestoreResume(t *testing.T) {
if err != nil {
t.Fatal(err)
}
tableStartKey, err := randgen.TestingMakePrimaryIndexKeyForTenant(backupTableDesc, codec)
require.NoError(t, err)

createAndWaitForJob(
t, sqlDB, []descpb.ID{restoreTableID},
jobspb.RestoreDetails{
Expand All @@ -1678,8 +1804,14 @@ func TestBackupRestoreResume(t *testing.T) {
URIs: []string{restoreDir},
},
jobspb.RestoreProgress{
HighWater: restoreHighWaterMark,
Checkpoint: []jobspb.RestoreProgress_FrontierEntry{
{
Span: roachpb.Span{Key: tableStartKey, EndKey: restoreHighWaterMark},
Timestamp: completedSpanTime},
},
},
// Required because restore checkpointing is version gated.
clusterversion.ByKey(clusterversion.V23_1Start),
)
// If the restore properly took the (incorrect) low-water mark into account,
// the first half of the table will be missing.
Expand Down
23 changes: 21 additions & 2 deletions pkg/ccl/backupccl/bench_covering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
Expand Down Expand Up @@ -90,11 +91,29 @@ func BenchmarkRestoreEntryCover(b *testing.B) {

spanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)

checkpointFrontier, err := loadCheckpointFrontier(backups[numBackups-1].Spans, []jobspb.RestoreProgress_FrontierEntry{})
require.NoError(b, err)

filter, err := makeSpanCoveringFilter(
checkpointFrontier,
nil,
introducedSpanFrontier,
0,
false)
require.NoError(b, err)

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
defer close(spanCh)
return generateAndSendImportSpans(ctx, backups[numBackups-1].Spans, backups,
layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, 0, spanCh, false)
return generateAndSendImportSpans(
ctx,
backups[numBackups-1].Spans,
backups,
layerToBackupManifestFileIterFactory,
nil,
filter,
false,
spanCh)
})

var cov []execinfrapb.RestoreSpanEntry
Expand Down
22 changes: 15 additions & 7 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,28 +281,36 @@ func runGenerativeSplitAndScatter(
if err != nil {
return err
}

introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, spec.EndTime)
if err != nil {
return err
}

backupLocalityMap, err := makeBackupLocalityMap(spec.BackupLocalityInfo, spec.User())
if err != nil {
return err
}

checkpointFrontier, err := loadCheckpointFrontier(spec.Spans, spec.CheckpointedSpans)
if err != nil {
return err
}
filter, err := makeSpanCoveringFilter(
checkpointFrontier,
spec.HighWater,
introducedSpanFrontier,
spec.TargetSize,
spec.UseFrontierCheckpointing)
if err != nil {
return err
}
return generateAndSendImportSpans(
ctx,
spec.Spans,
backups,
layerToFileIterFactory,
backupLocalityMap,
introducedSpanFrontier,
spec.HighWater,
spec.TargetSize,
restoreSpanEntriesCh,
filter,
spec.UseSimpleImportSpans,
restoreSpanEntriesCh,
)
})

Expand Down
Loading

0 comments on commit 2ea46c8

Please sign in to comment.