backupccl: checkpoint restore progress with a span frontier #97862

Merged · 1 commit · Mar 13, 2023
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
"restore_job.go",
"restore_planning.go",
"restore_processor_planning.go",
"restore_progress.go",
"restore_schema_change_creation.go",
"restore_span_covering.go",
"schedule_exec.go",
@@ -182,6 +183,7 @@ go_test(
"restore_mid_schema_change_test.go",
"restore_old_sequences_test.go",
"restore_old_versions_test.go",
"restore_progress_test.go",
"restore_span_covering_test.go",
"schedule_pts_chaining_test.go",
"show_test.go",
142 changes: 137 additions & 5 deletions pkg/ccl/backupccl/backup_test.go
@@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/cloud/gcp"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -1488,6 +1489,125 @@ WHERE
do(`RESTORE data.* FROM LATEST IN $1 WITH OPTIONS (into_db='restoredb')`, checkRestore)
}

// TestRestoreCheckpointing checks that progress persists to the job record
// using the new span frontier. The test takes the following approach:
//
// 1. Backup and restore a database with x tables, each of which will have a
// disjoint span. This implies the restore will process x requiredSpans. Since
// each required span has two rows, each required span will result in a single
// restoreSpanEntry, and consequently, a single AddSSTable flush. Because the
// checkpoint frontier merges disjoint required spans, the persisted frontier
// should only have 1 entry.
//
// 2. This test will then block and pause the restore after y AddSSTable
// flushes, and assert that exactly y entries were processed before the
// pause.
//
// 3. The test will then resume the restore job, allowing it to complete.
// Afterwards, the test asserts that all x spans have been persisted to the
// frontier, and that only x-y AddSSTable requests were sent after resume,
// implying that on resume, no work already persisted to the frontier was
// duplicated.
func TestRestoreCheckpointing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.TestingSetProgressThresholds()()

// totalEntries is the number of restore span entries (one per table) that the restore will process.
totalEntries := 7
entriesBeforePause := 4
entriesCount := 0
var alreadyPaused atomic.Bool
postResumeCount := 0
blockDBRestore := make(chan struct{})
waitForProgress := make(chan struct{})
var mu syncutil.Mutex
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
// Because the restore processor has several workers that
// concurrently send addsstable requests and because all workers will
// wait on the lock below, when one flush gets blocked on the
// pre-pause blockDBRestore chan, several pre-pause requests will
// queue up behind it. Because of the lock, these requests will
// actually complete _after_ the job was paused and will not get
// counted in the checkpointed span. For test correctness, we do not
// want to count these requests as part of postResumeCount, so we check
// whether the job was paused in each request before it began waiting
// for the lock.
wasPausedBeforeWaiting := alreadyPaused.Load()
mu.Lock()
defer mu.Unlock()
if entriesCount == entriesBeforePause {
close(waitForProgress)
<-blockDBRestore
}
entriesCount++
if wasPausedBeforeWaiting {
postResumeCount++
}
},
},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
testServerArgs := base.TestServerArgs{DisableDefaultTestTenant: true}
params.ServerArgs = testServerArgs
params.ServerArgs.Knobs = knobs

_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 1,
InitManualReplication, params)
defer cleanupFn()

var jobID jobspb.JobID
checkPersistedSpanLength := func(expectedCompletedSpans int) {
testutils.SucceedsSoon(t, func() error {
jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID)
numSpans := len(jobProgress.GetRestore().Checkpoint)
if numSpans != expectedCompletedSpans {
return errors.Newf("expected %d checkpoint entries, but found %d", expectedCompletedSpans, numSpans)
}
return nil
})
}

sqlDB.Exec(t, `CREATE DATABASE d`)

for i := 1; i <= totalEntries; i++ {
tableName := fmt.Sprintf("d.t%d", i)
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (id INT PRIMARY KEY, s STRING)`, tableName))
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName))
}
sqlDB.Exec(t, `BACKUP DATABASE d INTO $1`, localFoo)

sqlDB.QueryRow(t, `RESTORE DATABASE d FROM LATEST IN $1 WITH DETACHED, new_db_name=d2`, localFoo).Scan(&jobID)

// Pause the job after some progress has been logged.
<-waitForProgress

// To ensure that progress gets persisted, sleep well beyond the test-only job update interval.
time.Sleep(time.Second)

sqlDB.Exec(t, `PAUSE JOB $1`, &jobID)
jobutils.WaitForJobToPause(t, sqlDB, jobID)
// NB: we don't check the persisted span length here, though we expect there
// to be 1 completed persisted span most of the time. Occasionally, two
// disjoint spans may be persisted if the required spans are processed out of
// order; i.e. table 5's span gets processed before table 4's.
require.Equal(t, entriesBeforePause, entriesCount)

close(blockDBRestore)
alreadyPaused.Store(true)
sqlDB.Exec(t, `RESUME JOB $1`, &jobID)
jobutils.WaitForJobToSucceed(t, sqlDB, jobID)

// Ensure that no persisted work was repeated on resume and that all work was persisted.
checkPersistedSpanLength(1)
require.Equal(t, totalEntries-entriesBeforePause, postResumeCount)
}

func TestBackupRestoreSystemJobsProgress(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 68571, "flaky test")
@@ -1526,14 +1646,16 @@ func createAndWaitForJob(
descriptorIDs []descpb.ID,
details jobspb.Details,
progress jobspb.ProgressDetails,
clusterVersionAtJobStart roachpb.Version,
) {
t.Helper()
now := timeutil.ToUnixMicros(timeutil.Now())
payload, err := protoutil.Marshal(&jobspb.Payload{
UsernameProto: username.RootUserName().EncodeProto(),
DescriptorIDs: descriptorIDs,
StartedMicros: now,
Details: jobspb.WrapPayloadDetails(details),
CreationClusterVersion: clusterVersionAtJobStart,
UsernameProto: username.RootUserName().EncodeProto(),
DescriptorIDs: descriptorIDs,
StartedMicros: now,
Details: jobspb.WrapPayloadDetails(details),
})
if err != nil {
t.Fatal(err)
@@ -1622,6 +1744,7 @@ func TestBackupRestoreResume(t *testing.T) {
URI: "nodelocal://0/backup" + "-" + item.testName,
},
jobspb.BackupProgress{},
roachpb.Version{},
)

// If the backup properly took the (incorrect) checkpoint into account, it
@@ -1665,6 +1788,9 @@ func TestBackupRestoreResume(t *testing.T) {
if err != nil {
t.Fatal(err)
}
tableStartKey, err := randgen.TestingMakePrimaryIndexKeyForTenant(backupTableDesc, codec)
require.NoError(t, err)

createAndWaitForJob(
t, sqlDB, []descpb.ID{restoreTableID},
jobspb.RestoreDetails{
@@ -1678,8 +1804,14 @@
URIs: []string{restoreDir},
},
jobspb.RestoreProgress{
HighWater: restoreHighWaterMark,
Checkpoint: []jobspb.RestoreProgress_FrontierEntry{
{
Span: roachpb.Span{Key: tableStartKey, EndKey: restoreHighWaterMark},
Timestamp: completedSpanTime},
},
},
// Required because restore checkpointing is version gated.
clusterversion.ByKey(clusterversion.V23_1Start),
)
// If the restore properly took the (incorrect) low-water mark into account,
// the first half of the table will be missing.
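To make the checkpointing behavior that TestRestoreCheckpointing relies on concrete, here is a minimal, self-contained Go sketch. It is not CockroachDB code; the span type and the mergeCompleted helper are hypothetical stand-ins for the real frontier-backed checkpoint. It only illustrates the idea that adjacent completed spans collapse into a single persisted entry, which is why seven completed table spans are expected to end up as one checkpoint entry.

package main

import (
	"fmt"
	"sort"
)

type span struct{ key, endKey string }

// mergeCompleted sorts the completed spans and merges any that touch or
// overlap, mimicking how a frontier-backed checkpoint would persist a
// single entry for a run of adjacent completed table spans.
func mergeCompleted(done []span) []span {
	if len(done) == 0 {
		return nil
	}
	sort.Slice(done, func(i, j int) bool { return done[i].key < done[j].key })
	merged := []span{done[0]}
	for _, s := range done[1:] {
		last := &merged[len(merged)-1]
		if s.key <= last.endKey {
			// Adjacent or overlapping: extend the previous entry.
			if s.endKey > last.endKey {
				last.endKey = s.endKey
			}
			continue
		}
		merged = append(merged, s)
	}
	return merged
}

func main() {
	// Seven adjacent "table" spans, as in TestRestoreCheckpointing.
	var done []span
	for i := 101; i <= 107; i++ {
		done = append(done, span{
			key:    fmt.Sprintf("/Table/%d", i),
			endKey: fmt.Sprintf("/Table/%d", i+1),
		})
	}
	// Prints a single merged entry: [{/Table/101 /Table/108}].
	fmt.Println(mergeCompleted(done))
}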
23 changes: 21 additions & 2 deletions pkg/ccl/backupccl/bench_covering_test.go
@@ -14,6 +14,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -90,11 +91,29 @@ func BenchmarkRestoreEntryCover(b *testing.B) {

spanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)

checkpointFrontier, err := loadCheckpointFrontier(backups[numBackups-1].Spans, []jobspb.RestoreProgress_FrontierEntry{})
require.NoError(b, err)

filter, err := makeSpanCoveringFilter(
checkpointFrontier,
nil,
introducedSpanFrontier,
0,
false)
require.NoError(b, err)

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
defer close(spanCh)
return generateAndSendImportSpans(ctx, backups[numBackups-1].Spans, backups,
layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, 0, spanCh, false)
return generateAndSendImportSpans(
ctx,
backups[numBackups-1].Spans,
backups,
layerToBackupManifestFileIterFactory,
nil,
filter,
false,
spanCh)
})

var cov []execinfrapb.RestoreSpanEntry
22 changes: 15 additions & 7 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -281,28 +281,36 @@ func runGenerativeSplitAndScatter(
if err != nil {
return err
}

introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, spec.EndTime)
if err != nil {
return err
}

backupLocalityMap, err := makeBackupLocalityMap(spec.BackupLocalityInfo, spec.User())
if err != nil {
return err
}

checkpointFrontier, err := loadCheckpointFrontier(spec.Spans, spec.CheckpointedSpans)
if err != nil {
return err
}
filter, err := makeSpanCoveringFilter(
checkpointFrontier,
spec.HighWater,
introducedSpanFrontier,
spec.TargetSize,
spec.UseFrontierCheckpointing)
if err != nil {
return err
}
return generateAndSendImportSpans(
ctx,
spec.Spans,
backups,
layerToFileIterFactory,
backupLocalityMap,
introducedSpanFrontier,
spec.HighWater,
spec.TargetSize,
restoreSpanEntriesCh,
filter,
spec.UseSimpleImportSpans,
restoreSpanEntriesCh,
)
})

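The loadCheckpointFrontier and makeSpanCoveringFilter plumbing above is what lets a resumed restore skip spans the checkpoint already covers. As a rough illustration only, with plain string keys and a hypothetical subtractCompleted helper rather than the real roachpb.Span and frontier-based filter, subtracting the persisted checkpoint from the required spans leaves just the remaining work:

package main

import "fmt"

type span struct{ key, endKey string }

// subtractCompleted returns the portions of each required span that are not
// covered by the completed spans. Completed spans are assumed to be sorted
// and non-overlapping, as a persisted checkpoint would be.
func subtractCompleted(required, completed []span) []span {
	var remaining []span
	for _, r := range required {
		cur := r
		for _, c := range completed {
			if c.endKey <= cur.key || c.key >= cur.endKey {
				continue // no overlap with the remaining part of this span
			}
			if c.key > cur.key {
				// Keep the uncovered prefix before the completed span.
				remaining = append(remaining, span{key: cur.key, endKey: c.key})
			}
			// Continue after the completed span.
			cur.key = c.endKey
			if cur.key >= cur.endKey {
				break
			}
		}
		if cur.key < cur.endKey {
			remaining = append(remaining, cur)
		}
	}
	return remaining
}

func main() {
	required := []span{{"/Table/101", "/Table/108"}}
	completed := []span{{"/Table/101", "/Table/105"}} // persisted checkpoint
	// Only the uncovered tail is left to restore: [{/Table/105 /Table/108}].
	fmt.Println(subtractCompleted(required, completed))
}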