Skip to content

Commit

Permalink
Merge pull request #97210 from rhu713/backport22.2-93997-94805-96313-…
Browse files Browse the repository at this point in the history
…96529-96302-96245

release-22.2: move alloc heavy Files field from manifest to SST, use slim manifest in backup restore
  • Loading branch information
Rui Hu authored Feb 21, 2023
2 parents c390c0c + e12b1f1 commit b9fbeca
Show file tree
Hide file tree
Showing 34 changed files with 3,304 additions and 588 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"backup_telemetry.go",
"create_scheduled_backup.go",
"file_sst_sink.go",
"generative_split_and_scatter_processor.go",
"key_rewriter.go",
"restoration_data.go",
"restore_data_processor.go",
Expand Down Expand Up @@ -165,6 +166,7 @@ go_test(
"create_scheduled_backup_test.go",
"datadriven_test.go",
"full_cluster_backup_restore_test.go",
"generative_split_and_scatter_processor_test.go",
"key_rewriter_test.go",
"main_test.go",
"partitioned_backup_test.go",
Expand Down Expand Up @@ -264,6 +266,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/bulk",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
55 changes: 44 additions & 11 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,30 @@ func backup(
var lastCheckpoint time.Time

var completedSpans, completedIntroducedSpans []roachpb.Span
kmsEnv := backupencryption.MakeBackupKMSEnv(execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig, execCtx.ExecCfg().DB, execCtx.User(),
execCtx.ExecCfg().InternalExecutor)
// TODO(benesch): verify these files, rather than accepting them as truth
// blindly.
// No concurrency yet, so these assignments are safe.
for _, file := range backupManifest.Files {
if file.StartTime.IsEmpty() && !file.EndTime.IsEmpty() {
completedIntroducedSpans = append(completedIntroducedSpans, file.Span)
iterFactory := backupinfo.NewIterFactory(backupManifest, defaultStore, encryption, &kmsEnv)
it, err := iterFactory.NewFileIter(ctx)
if err != nil {
return roachpb.RowCount{}, err
}
defer it.Close()
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
return roachpb.RowCount{}, err
} else if !ok {
break
}

f := it.Value()
if f.StartTime.IsEmpty() && !f.EndTime.IsEmpty() {
completedIntroducedSpans = append(completedIntroducedSpans, f.Span)
} else {
completedSpans = append(completedSpans, file.Span)
completedSpans = append(completedSpans, f.Span)
}
}

Expand All @@ -172,10 +188,6 @@ func backup(
return roachpb.RowCount{}, errors.Wrap(err, "failed to determine nodes on which to run")
}

kmsEnv := backupencryption.MakeBackupKMSEnv(execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig, execCtx.ExecCfg().DB, execCtx.User(),
execCtx.ExecCfg().InternalExecutor)

backupSpecs, err := distBackupPlanSpecs(
ctx,
planCtx,
Expand Down Expand Up @@ -319,11 +331,26 @@ func backup(
}
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup manifest"})
// Write a `BACKUP_MANIFEST` file to support backups in mixed-version clusters
// with 22.2 nodes.
//
// TODO(adityamaru): We can stop writing `BACKUP_MANIFEST` in 23.2
// because a mixed-version cluster with 23.1 nodes will read the
// `BACKUP_METADATA` instead.
if err := backupinfo.WriteBackupManifest(ctx, defaultStore, backupbase.BackupManifestName,
encryption, &kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
}

// Write a `BACKUP_METADATA` file along with SSTs for all the alloc heavy
// fields elided from the `BACKUP_MANIFEST`.
if backupinfo.WriteMetadataWithFilesSST.Get(&settings.SV) {
if err := backupinfo.WriteMetadataWithExternalSSTs(ctx, defaultStore, encryption,
&kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
}
}

var tableStatistics []*stats.TableStatisticProto
for i := range backupManifest.Descriptors {
if tbl, _, _, _, _ := descpb.GetDescriptors(&backupManifest.Descriptors[i]); tbl != nil {
Expand All @@ -350,7 +377,6 @@ func backup(
Statistics: tableStatistics,
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup table statistics"})
if err := backupinfo.WriteTableStatistics(ctx, defaultStore, encryption, &kmsEnv, &statsTable); err != nil {
return roachpb.RowCount{}, err
}
Expand Down Expand Up @@ -895,12 +921,19 @@ func getBackupDetailAndManifest(
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, prevBackups, baseEncryptionOptions, &kmsEnv)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

backupManifest, err := createBackupManifest(
ctx,
execCfg,
txn,
updatedDetails,
prevBackups)
prevBackups,
layerToIterFactory,
)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}
Expand Down
130 changes: 70 additions & 60 deletions pkg/ccl/backupccl/backup_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -36,6 +36,8 @@ import (
"github.com/stretchr/testify/require"
)

// TestMetadataSST has to be in backupccl_test in order to be sure that the
// BACKUP planhook is registered.
func TestMetadataSST(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -112,14 +114,8 @@ func checkMetadata(
}

checkManifest(t, m, bm)
// If there are descriptor changes, we only check those as they should have
// all changes as well as existing descriptors
if len(m.DescriptorChanges) > 0 {
checkDescriptorChanges(ctx, t, m, bm)
} else {
checkDescriptors(ctx, t, m, bm)
}

checkDescriptorChanges(ctx, t, m, bm)
checkDescriptors(ctx, t, m, bm)
checkSpans(ctx, t, m, bm)
// Don't check introduced spans on the first backup.
if m.StartTime != (hlc.Timestamp{}) {
Expand Down Expand Up @@ -147,16 +143,17 @@ func checkDescriptors(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaDescs []descpb.Descriptor
var desc descpb.Descriptor

it := bm.DescIter(ctx)
it := bm.NewDescIter(ctx)
defer it.Close()
for it.Next(&desc) {
metaDescs = append(metaDescs, desc)
}
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

if it.Err() != nil {
t.Fatal(it.Err())
metaDescs = append(metaDescs, *it.Value())
}

require.Equal(t, m.Descriptors, metaDescs)
Expand All @@ -166,15 +163,16 @@ func checkDescriptorChanges(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaRevs []backuppb.BackupManifest_DescriptorRevision
var rev backuppb.BackupManifest_DescriptorRevision
it := bm.DescriptorChangesIter(ctx)
it := bm.NewDescriptorChangesIter(ctx)
defer it.Close()

for it.Next(&rev) {
metaRevs = append(metaRevs, rev)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}
metaRevs = append(metaRevs, *it.Value())
}

// Descriptor Changes are sorted by time in the manifest.
Expand All @@ -189,15 +187,22 @@ func checkFiles(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaFiles []backuppb.BackupManifest_File
var file backuppb.BackupManifest_File
it := bm.FileIter(ctx)
it, err := bm.NewFileIter(ctx)
if err != nil {
t.Fatal(err)
}
defer it.Close()

for it.Next(&file) {
metaFiles = append(metaFiles, file)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
ok, err := it.Valid()
if err != nil {
t.Fatal(err)
}
if !ok {
break
}

metaFiles = append(metaFiles, *it.Value())
}

require.Equal(t, m.Files, metaFiles)
Expand All @@ -207,15 +212,17 @@ func checkSpans(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaSpans []roachpb.Span
var span roachpb.Span
it := bm.SpanIter(ctx)
it := bm.NewSpanIter(ctx)
defer it.Close()

for it.Next(&span) {
metaSpans = append(metaSpans, span)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

metaSpans = append(metaSpans, it.Value())
}

require.Equal(t, m.Spans, metaSpans)
Expand All @@ -225,14 +232,16 @@ func checkIntroducedSpans(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaSpans []roachpb.Span
var span roachpb.Span
it := bm.IntroducedSpanIter(ctx)
it := bm.NewIntroducedSpanIter(ctx)
defer it.Close()
for it.Next(&span) {
metaSpans = append(metaSpans, span)
}
if it.Err() != nil {
t.Fatal(it.Err())

for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}
metaSpans = append(metaSpans, it.Value())
}

require.Equal(t, m.IntroducedSpans, metaSpans)
Expand All @@ -242,15 +251,17 @@ func checkTenants(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaTenants []descpb.TenantInfoWithUsage
var tenant descpb.TenantInfoWithUsage
it := bm.TenantIter(ctx)
it := bm.NewTenantIter(ctx)
defer it.Close()

for it.Next(&tenant) {
metaTenants = append(metaTenants, tenant)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

metaTenants = append(metaTenants, it.Value())
}

require.Equal(t, m.Tenants, metaTenants)
Expand All @@ -268,18 +279,17 @@ func checkStats(
if err != nil {
t.Fatal(err)
}
if len(expectedStats) == 0 {
expectedStats = nil
}

var metaStats = make([]*stats.TableStatisticProto, 0)
var s *stats.TableStatisticProto
it := bm.StatsIter(ctx)
it := bm.NewStatsIter(ctx)
defer it.Close()

for it.Next(&s) {
metaStats = append(metaStats, s)
}
if it.Err() != nil {
t.Fatal(it.Err())
metaStats, err := bulk.CollectToSlice(it)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedStats, metaStats)
}

Expand Down
Loading

0 comments on commit b9fbeca

Please sign in to comment.