Skip to content

Commit

Permalink
Revert "Merge pull request #97210 from rhu713/backport22.2-93997-9480…
Browse files Browse the repository at this point in the history
…5-96313-96529-96302-96245"

This reverts commit b9fbeca, reversing
changes made to c390c0c.
  • Loading branch information
Rui Hu committed Mar 21, 2023
1 parent ace446b commit c0bd21b
Show file tree
Hide file tree
Showing 34 changed files with 587 additions and 3,304 deletions.
3 changes: 0 additions & 3 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"backup_telemetry.go",
"create_scheduled_backup.go",
"file_sst_sink.go",
"generative_split_and_scatter_processor.go",
"key_rewriter.go",
"restoration_data.go",
"restore_data_processor.go",
Expand Down Expand Up @@ -169,7 +168,6 @@ go_test(
"create_scheduled_backup_test.go",
"datadriven_test.go",
"full_cluster_backup_restore_test.go",
"generative_split_and_scatter_processor_test.go",
"key_rewriter_test.go",
"main_test.go",
"partitioned_backup_test.go",
Expand Down Expand Up @@ -269,7 +267,6 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/bulk",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
55 changes: 11 additions & 44 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,30 +141,14 @@ func backup(
var lastCheckpoint time.Time

var completedSpans, completedIntroducedSpans []roachpb.Span
kmsEnv := backupencryption.MakeBackupKMSEnv(execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig, execCtx.ExecCfg().DB, execCtx.User(),
execCtx.ExecCfg().InternalExecutor)
// TODO(benesch): verify these files, rather than accepting them as truth
// blindly.
// No concurrency yet, so these assignments are safe.
iterFactory := backupinfo.NewIterFactory(backupManifest, defaultStore, encryption, &kmsEnv)
it, err := iterFactory.NewFileIter(ctx)
if err != nil {
return roachpb.RowCount{}, err
}
defer it.Close()
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
return roachpb.RowCount{}, err
} else if !ok {
break
}

f := it.Value()
if f.StartTime.IsEmpty() && !f.EndTime.IsEmpty() {
completedIntroducedSpans = append(completedIntroducedSpans, f.Span)
for _, file := range backupManifest.Files {
if file.StartTime.IsEmpty() && !file.EndTime.IsEmpty() {
completedIntroducedSpans = append(completedIntroducedSpans, file.Span)
} else {
completedSpans = append(completedSpans, f.Span)
completedSpans = append(completedSpans, file.Span)
}
}

Expand All @@ -189,6 +173,10 @@ func backup(
return roachpb.RowCount{}, errors.Wrap(err, "failed to determine nodes on which to run")
}

kmsEnv := backupencryption.MakeBackupKMSEnv(execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig, execCtx.ExecCfg().DB, execCtx.User(),
execCtx.ExecCfg().InternalExecutor)

backupSpecs, err := distBackupPlanSpecs(
ctx,
planCtx,
Expand Down Expand Up @@ -332,26 +320,11 @@ func backup(
}
}

// Write a `BACKUP_MANIFEST` file to support backups in mixed-version clusters
// with 22.2 nodes.
//
// TODO(adityamaru): We can stop writing `BACKUP_MANIFEST` in 23.2
// because a mixed-version cluster with 23.1 nodes will read the
// `BACKUP_METADATA` instead.
resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup manifest"})
if err := backupinfo.WriteBackupManifest(ctx, defaultStore, backupbase.BackupManifestName,
encryption, &kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
}

// Write a `BACKUP_METADATA` file along with SSTs for all the alloc heavy
// fields elided from the `BACKUP_MANIFEST`.
if backupinfo.WriteMetadataWithFilesSST.Get(&settings.SV) {
if err := backupinfo.WriteMetadataWithExternalSSTs(ctx, defaultStore, encryption,
&kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
}
}

var tableStatistics []*stats.TableStatisticProto
for i := range backupManifest.Descriptors {
if tbl, _, _, _, _ := descpb.GetDescriptors(&backupManifest.Descriptors[i]); tbl != nil {
Expand All @@ -378,6 +351,7 @@ func backup(
Statistics: tableStatistics,
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup table statistics"})
if err := backupinfo.WriteTableStatistics(ctx, defaultStore, encryption, &kmsEnv, &statsTable); err != nil {
return roachpb.RowCount{}, err
}
Expand Down Expand Up @@ -922,19 +896,12 @@ func getBackupDetailAndManifest(
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, prevBackups, baseEncryptionOptions, &kmsEnv)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

backupManifest, err := createBackupManifest(
ctx,
execCfg,
txn,
updatedDetails,
prevBackups,
layerToIterFactory,
)
prevBackups)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}
Expand Down
130 changes: 60 additions & 70 deletions pkg/ccl/backupccl/backup_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -36,8 +36,6 @@ import (
"github.com/stretchr/testify/require"
)

// TestMetadataSST has to be in backupccl_test in order to be sure that the
// BACKUP planhook is registered.
func TestMetadataSST(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -114,8 +112,14 @@ func checkMetadata(
}

checkManifest(t, m, bm)
checkDescriptorChanges(ctx, t, m, bm)
checkDescriptors(ctx, t, m, bm)
// If there are descriptor changes, we only check those as they should have
// all changes as well as existing descriptors
if len(m.DescriptorChanges) > 0 {
checkDescriptorChanges(ctx, t, m, bm)
} else {
checkDescriptors(ctx, t, m, bm)
}

checkSpans(ctx, t, m, bm)
// Don't check introduced spans on the first backup.
if m.StartTime != (hlc.Timestamp{}) {
Expand Down Expand Up @@ -143,17 +147,16 @@ func checkDescriptors(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaDescs []descpb.Descriptor
var desc descpb.Descriptor

it := bm.NewDescIter(ctx)
it := bm.DescIter(ctx)
defer it.Close()
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}
for it.Next(&desc) {
metaDescs = append(metaDescs, desc)
}

metaDescs = append(metaDescs, *it.Value())
if it.Err() != nil {
t.Fatal(it.Err())
}

require.Equal(t, m.Descriptors, metaDescs)
Expand All @@ -163,16 +166,15 @@ func checkDescriptorChanges(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaRevs []backuppb.BackupManifest_DescriptorRevision
it := bm.NewDescriptorChangesIter(ctx)
var rev backuppb.BackupManifest_DescriptorRevision
it := bm.DescriptorChangesIter(ctx)
defer it.Close()

for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}
metaRevs = append(metaRevs, *it.Value())
for it.Next(&rev) {
metaRevs = append(metaRevs, rev)
}
if it.Err() != nil {
t.Fatal(it.Err())
}

// Descriptor Changes are sorted by time in the manifest.
Expand All @@ -187,22 +189,15 @@ func checkFiles(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaFiles []backuppb.BackupManifest_File
it, err := bm.NewFileIter(ctx)
if err != nil {
t.Fatal(err)
}
var file backuppb.BackupManifest_File
it := bm.FileIter(ctx)
defer it.Close()

for ; ; it.Next() {
ok, err := it.Valid()
if err != nil {
t.Fatal(err)
}
if !ok {
break
}

metaFiles = append(metaFiles, *it.Value())
for it.Next(&file) {
metaFiles = append(metaFiles, file)
}
if it.Err() != nil {
t.Fatal(it.Err())
}

require.Equal(t, m.Files, metaFiles)
Expand All @@ -212,17 +207,15 @@ func checkSpans(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaSpans []roachpb.Span
it := bm.NewSpanIter(ctx)
var span roachpb.Span
it := bm.SpanIter(ctx)
defer it.Close()

for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

metaSpans = append(metaSpans, it.Value())
for it.Next(&span) {
metaSpans = append(metaSpans, span)
}
if it.Err() != nil {
t.Fatal(it.Err())
}

require.Equal(t, m.Spans, metaSpans)
Expand All @@ -232,16 +225,14 @@ func checkIntroducedSpans(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaSpans []roachpb.Span
it := bm.NewIntroducedSpanIter(ctx)
var span roachpb.Span
it := bm.IntroducedSpanIter(ctx)
defer it.Close()

for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}
metaSpans = append(metaSpans, it.Value())
for it.Next(&span) {
metaSpans = append(metaSpans, span)
}
if it.Err() != nil {
t.Fatal(it.Err())
}

require.Equal(t, m.IntroducedSpans, metaSpans)
Expand All @@ -251,17 +242,15 @@ func checkTenants(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaTenants []descpb.TenantInfoWithUsage
it := bm.NewTenantIter(ctx)
var tenant descpb.TenantInfoWithUsage
it := bm.TenantIter(ctx)
defer it.Close()

for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

metaTenants = append(metaTenants, it.Value())
for it.Next(&tenant) {
metaTenants = append(metaTenants, tenant)
}
if it.Err() != nil {
t.Fatal(it.Err())
}

require.Equal(t, m.Tenants, metaTenants)
Expand All @@ -279,17 +268,18 @@ func checkStats(
if err != nil {
t.Fatal(err)
}
if len(expectedStats) == 0 {
expectedStats = nil
}

it := bm.NewStatsIter(ctx)
var metaStats = make([]*stats.TableStatisticProto, 0)
var s *stats.TableStatisticProto
it := bm.StatsIter(ctx)
defer it.Close()
metaStats, err := bulk.CollectToSlice(it)
if err != nil {
t.Fatal(err)
}

for it.Next(&s) {
metaStats = append(metaStats, s)
}
if it.Err() != nil {
t.Fatal(it.Err())
}
require.Equal(t, expectedStats, metaStats)
}

Expand Down
Loading

0 comments on commit c0bd21b

Please sign in to comment.