Skip to content

Commit

Permalink
backupccl: elide spans from backups that were subsequently reintroduced
Browse files Browse the repository at this point in the history
Currently RESTORE may restore invalid backup data from a backed up table that
underwent an IMPORT rollback. See cockroachdb#87305 for a detailed explanation.

This patch ensures that RESTORE elides older backup data that were deleted via
a non-MVCC operation. Because incremental backups always reintroduce spans
(i.e. backs them up from timestamp 0) that may have undergone a non-mvcc
operation, restore can identify restoring spans with potentially corrupt data
in the backup chain and only ingest the spans' reintroduced data to any system
time, without the corrupt data.

Here's the basic impliemenation in Restore:
- For each index we want to restore
   - identify the last time, l, the index was re-introduced, using the manifests
   - dont restore the index using a backup if backup.EndTime < l

This implementation rests on the following assumption: the input spans for each
restoration flow (created in createImportingDescriptors) and the
restoreSpanEntries (created by makeSimpleImportSpans) do not span across
multiple sql indices. Given this assumption, makeSimpleImportSpans skips adding
files from a backups for a given input span that was reintroduced in a
subsequent backup.

Fixes cockroachdb#87305

Release justification: bug fix

Release note: none
  • Loading branch information
msbutler committed Sep 8, 2022
1 parent e39111b commit 4cf6cb1
Show file tree
Hide file tree
Showing 8 changed files with 495 additions and 23 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func makeTableSpan(tableID uint32) roachpb.Span {
return roachpb.Span{Key: k, EndKey: k.PrefixEnd()}
}

func makeIndexSpan(tableID uint32, indexID uint32) roachpb.Span {
k := keys.SystemSQLCodec.IndexPrefix(tableID, indexID)
return roachpb.Span{Key: k, EndKey: k.PrefixEnd()}
}

func TestBackupRestoreStatementResult(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
13 changes: 12 additions & 1 deletion pkg/ccl/backupccl/bench_covering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
fmt "fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

func BenchmarkCoverageChecks(b *testing.B) {
Expand Down Expand Up @@ -53,12 +56,20 @@ func BenchmarkRestoreEntryCover(b *testing.B) {
b.Run(fmt.Sprintf("numSpans=%d", numSpans), func(b *testing.B) {
ctx := context.Background()
backups := MockBackupChain(numBackups, numSpans, baseFiles, r)
latestIntrosByIndex, err := findLatestIntroFromManifests(backups, keys.SystemSQLCodec, hlc.Timestamp{})
require.NoError(b, err)
latestIntrosBySpan, err := findLatestIntroBySpan(backups[numBackups-1].Spans, keys.SystemSQLCodec, latestIntrosByIndex)
require.NoError(b, err)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := checkCoverage(ctx, backups[numBackups-1].Spans, backups); err != nil {
b.Fatal(err)
}
cov := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, nil, nil, 0)
restoreData := restorationDataBase{
spans: backups[numBackups-1].Spans,
latestIntros: latestIntrosBySpan,
}
cov := makeSimpleImportSpans(&restoreData, backups, nil, nil, 0)
b.ReportMetric(float64(len(cov)), "coverSize")
}
})
Expand Down
34 changes: 34 additions & 0 deletions pkg/ccl/backupccl/restoration_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// restorationData specifies the data that is to be restored in a restoration flow.
Expand All @@ -25,6 +27,10 @@ type restorationData interface {
// getSpans returns the data spans that we're restoring into this cluster.
getSpans() []roachpb.Span

// getLastIntros returns the end time of the last backup that reintroduced
// span i.
getLatestIntros() []hlc.Timestamp

// getSystemTables returns nil for non-cluster restores. It returns the
// descriptors of the temporary system tables that should be restored into the
// real table descriptors. The data for these temporary tables should be
Expand Down Expand Up @@ -66,6 +72,10 @@ func (*mainRestorationData) isMainBundle() bool { return true }
type restorationDataBase struct {
// spans is the spans included in this bundle.
spans []roachpb.Span

// latestIntros is the last time each span was introduced.
latestIntros []hlc.Timestamp

// rekeys maps old table IDs to their new table descriptor.
tableRekeys []execinfrapb.TableRekey
// tenantRekeys maps tenants being restored to their new ID.
Expand Down Expand Up @@ -105,6 +115,11 @@ func (b *restorationDataBase) getSpans() []roachpb.Span {
return b.spans
}

// getLastReIntros implements restorationData.
func (b *restorationDataBase) getLatestIntros() []hlc.Timestamp {
return b.latestIntros
}

// getSystemTables implements restorationData.
func (b *restorationDataBase) getSystemTables() []catalog.TableDescriptor {
return b.systemTables
Expand All @@ -114,6 +129,7 @@ func (b *restorationDataBase) getSystemTables() []catalog.TableDescriptor {
func (b *restorationDataBase) addTenant(fromTenantID, toTenantID roachpb.TenantID) {
prefix := keys.MakeTenantPrefix(fromTenantID)
b.spans = append(b.spans, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()})
b.latestIntros = append(b.latestIntros, hlc.Timestamp{})
b.tenantRekeys = append(b.tenantRekeys, execinfrapb.TenantRekey{
OldID: fromTenantID,
NewID: toTenantID,
Expand Down Expand Up @@ -148,3 +164,21 @@ func checkForMigratedData(details jobspb.RestoreDetails, dataToRestore restorati

return false
}

// findLatestIntroBySpan finds the latest intro time for the inputted spans.
// This function assumes that each span's start and end key belong to the same
// index.
func findLatestIntroBySpan(
spans roachpb.Spans, codec keys.SQLCodec, latestIntros map[tableAndIndex]hlc.Timestamp,
) ([]hlc.Timestamp, error) {
latestIntrosBySpan := make([]hlc.Timestamp, len(spans))
for i, sp := range spans {
_, tablePrefix, indexPrefix, err := codec.DecodeIndexPrefix(sp.Key)
if err != nil {
return nil, err
}
introKey := tableAndIndex{descpb.ID(tablePrefix), descpb.IndexID(indexPrefix)}
latestIntrosBySpan[i] = latestIntros[introKey]
}
return latestIntrosBySpan, nil
}
53 changes: 48 additions & 5 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,8 @@ func restore(
// which are grouped by keyrange.
highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater

importSpans := makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests, backupLocalityMap,
highWaterMark, targetRestoreSpanSize)

importSpans := makeSimpleImportSpans(dataToRestore, backupManifests,
backupLocalityMap, highWaterMark, targetRestoreSpanSize)
if len(importSpans) == 0 {
// There are no files to restore.
return emptyRowCount, nil
Expand Down Expand Up @@ -651,6 +650,28 @@ func spansForAllRestoreTableIndexes(
return spans
}

// findLatestIntroFromManifests finds the endtime of the latest incremental
// backup that introduced each backed up index, as of restore time.
func findLatestIntroFromManifests(
manifests []backuppb.BackupManifest, codec keys.SQLCodec, asOf hlc.Timestamp,
) (map[tableAndIndex]hlc.Timestamp, error) {
latestIntro := make(map[tableAndIndex]hlc.Timestamp)
for _, b := range manifests {
if !asOf.IsEmpty() && asOf.Less(b.StartTime) {
break
}
for _, sp := range b.IntroducedSpans {
_, tablePrefix, indexPrefix, err := codec.DecodeIndexPrefix(sp.Key)
if err != nil {
return nil, err
}
introKey := tableAndIndex{descpb.ID(tablePrefix), descpb.IndexID(indexPrefix)}
latestIntro[introKey] = b.EndTime
}
}
return latestIntro, nil
}

func shouldPreRestore(table *tabledesc.Mutable) bool {
if table.GetParentID() != keys.SystemDatabaseID {
return false
Expand Down Expand Up @@ -695,6 +716,7 @@ func createImportingDescriptors(
p sql.JobExecContext,
backupCodec keys.SQLCodec,
sqlDescs []catalog.Descriptor,
latestIntrosByIndex map[tableAndIndex]hlc.Timestamp,
r *restoreResumer,
) (
dataToPreRestore *restorationDataBase,
Expand Down Expand Up @@ -1240,11 +1262,21 @@ func createImportingDescriptors(
pkIDs[roachpb.BulkOpSummaryID(uint64(tbl.GetID()), uint64(tbl.GetPrimaryIndexID()))] = true
}

preRestoreLatestIntros, err := findLatestIntroBySpan(preRestoreSpans, backupCodec, latestIntrosByIndex)
if err != nil {
return nil, nil, nil, err
}
dataToPreRestore = &restorationDataBase{
spans: preRestoreSpans,
tableRekeys: rekeys,
tenantRekeys: tenantRekeys,
pkIDs: pkIDs,
latestIntros: preRestoreLatestIntros,
}

postRestoreLatestIntros, err := findLatestIntroBySpan(postRestoreSpans, backupCodec, latestIntrosByIndex)
if err != nil {
return nil, nil, nil, err
}

trackedRestore = &mainRestorationData{
Expand All @@ -1253,6 +1285,7 @@ func createImportingDescriptors(
tableRekeys: rekeys,
tenantRekeys: tenantRekeys,
pkIDs: pkIDs,
latestIntros: postRestoreLatestIntros,
},
}

Expand All @@ -1264,6 +1297,12 @@ func createImportingDescriptors(
if details.VerifyData {
trackedRestore.restorationDataBase.spans = verifySpans
trackedRestore.restorationDataBase.validateOnly = true
verifySpansLatestIntros, err := findLatestIntroBySpan(verifySpans, backupCodec,
latestIntrosByIndex)
if err != nil {
return nil, nil, nil, err
}
trackedRestore.latestIntros = verifySpansLatestIntros

// Before the main (validation) flow, during a cluster level restore,
// we still need to restore system tables that do NOT get restored in the dataToPreRestore
Expand All @@ -1275,6 +1314,7 @@ func createImportingDescriptors(
preValidation.spans = postRestoreSpans
preValidation.tableRekeys = rekeys
preValidation.pkIDs = pkIDs
preValidation.latestIntros = postRestoreLatestIntros
}

if tempSystemDBID != descpb.InvalidID {
Expand Down Expand Up @@ -1432,9 +1472,12 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
if err != nil {
return err
}

latestIntrosByIndex, err := findLatestIntroFromManifests(backupManifests, backupCodec, details.EndTime)
if err != nil {
return err
}
preData, preValidateData, mainData, err := createImportingDescriptors(ctx, p, backupCodec,
sqlDescs, r)
sqlDescs, latestIntrosByIndex, r)
if err != nil {
return err
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/ccl/backupccl/restore_span_covering.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const targetRestoreSpanSize = 384 << 20
// if its current data size plus that of the new span is less than the target
// size.
func makeSimpleImportSpans(
requiredSpans []roachpb.Span,
data restorationData,
backups []backuppb.BackupManifest,
backupLocalityMap map[int]storeByLocalityKV,
lowWaterMark roachpb.Key,
Expand All @@ -88,9 +88,10 @@ func makeSimpleImportSpans(
for i := range backups {
sort.Sort(backupinfo.BackupFileDescriptors(backups[i].Files))
}

var cover []execinfrapb.RestoreSpanEntry
for _, span := range requiredSpans {
spans := data.getSpans()
latestIntros := data.getLatestIntros()
for spanIdx, span := range spans {
if span.EndKey.Compare(lowWaterMark) < 0 {
continue
}
Expand All @@ -99,8 +100,14 @@ func makeSimpleImportSpans(
}

spanCoverStart := len(cover)

for layer := range backups {
if backups[layer].EndTime.Less(latestIntros[spanIdx]) {
// Don't use this backup to cover this span if the span was reintroduced
// after the backup's endTime. In this case, this backup may
// have invalid data, and further, a subsequent backup will contain all of
// this span's data.
continue
}
covPos := spanCoverStart

// lastCovSpanSize is the size of files added to the right-most span of
Expand Down
51 changes: 41 additions & 10 deletions pkg/ccl/backupccl/restore_span_covering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand All @@ -36,13 +37,13 @@ func MockBackupChain(length, spans, baseFiles int, r *rand.Rand) []backuppb.Back
for i := range backups {
backups[i].Spans = make(roachpb.Spans, spans)
for j := range backups[i].Spans {
backups[i].Spans[j] = makeTableSpan(uint32(100 + j + (i / 3)))
backups[i].Spans[j] = makeIndexSpan(uint32(100+j), 1)
}
backups[i].EndTime = ts.Add(time.Minute.Nanoseconds()*int64(i), 0)
if i > 0 {
backups[i].StartTime = backups[i-1].EndTime
if i%3 == 0 {
backups[i].IntroducedSpans = roachpb.Spans{backups[i].Spans[spans-1]}
backups[i].IntroducedSpans = roachpb.Spans{backups[i].Spans[r.Intn(spans)]}
}
}

Expand Down Expand Up @@ -84,15 +85,20 @@ func MockBackupChain(length, spans, baseFiles int, r *rand.Rand) []backuppb.Back
// thus sensitive to ordering; the coverage correctness check however is not.
func checkRestoreCovering(
backups []backuppb.BackupManifest,
spans roachpb.Spans,
data restorationData,
cov []execinfrapb.RestoreSpanEntry,
merged bool,
) error {
var expectedPartitions int
required := make(map[string]*roachpb.SpanGroup)
for _, s := range spans {
spans := data.getSpans()
latestIntros := data.getLatestIntros()
for sIdx, s := range spans {
var last roachpb.Key
for _, b := range backups {
if b.EndTime.Less(latestIntros[sIdx]) {
continue
}
for _, f := range b.Files {
if sp := s.Intersect(f.Span); sp.Valid() {
if required[f.Path] == nil {
Expand Down Expand Up @@ -150,15 +156,22 @@ func TestRestoreEntryCoverExample(t *testing.T) {
{Files: []backuppb.BackupManifest_File{f("a", "h", "6"), f("j", "k", "7")}},
{Files: []backuppb.BackupManifest_File{f("h", "i", "8"), f("l", "m", "9")}},
}
latestIntros := []hlc.Timestamp{{WallTime: 3}, {}, {}}

// Pretend every span has 1MB.
for i := range backups {
backups[i].StartTime = hlc.Timestamp{WallTime: int64(i)}
backups[i].EndTime = hlc.Timestamp{WallTime: int64(i + 1)}

for j := range backups[i].Files {
// Pretend every span has 1MB.
backups[i].Files[j].EntryCounts.DataSize = 1 << 20
}
}

cover := makeSimpleImportSpans(spans, backups, nil, nil, noSpanTargetSize)
restoreData := restorationDataBase{
spans: spans,
latestIntros: make([]hlc.Timestamp, len(spans)),
}
cover := makeSimpleImportSpans(&restoreData, backups, nil, nil, noSpanTargetSize)
require.Equal(t, []execinfrapb.RestoreSpanEntry{
{Span: sp("a", "c"), Files: paths("1", "4", "6")},
{Span: sp("c", "e"), Files: paths("2", "4", "6")},
Expand All @@ -167,13 +180,21 @@ func TestRestoreEntryCoverExample(t *testing.T) {
{Span: sp("l", "m"), Files: paths("9")},
}, cover)

coverSized := makeSimpleImportSpans(spans, backups, nil, nil, 2<<20)
coverSized := makeSimpleImportSpans(&restoreData, backups, nil, nil, 2<<20)
require.Equal(t, []execinfrapb.RestoreSpanEntry{
{Span: sp("a", "f"), Files: paths("1", "2", "4", "6")},
{Span: sp("f", "i"), Files: paths("3", "5", "6", "8")},
{Span: sp("l", "m"), Files: paths("9")},
}, coverSized)

restoreData.latestIntros = latestIntros
coverTimeFiltering := makeSimpleImportSpans(&restoreData, backups, nil, nil, 2<<20)
require.Equal(t, []execinfrapb.RestoreSpanEntry{
{Span: sp("a", "f"), Files: paths("6")},
{Span: sp("f", "i"), Files: paths("3", "5", "6", "8")},
{Span: sp("l", "m"), Files: paths("9")},
}, coverTimeFiltering)

}

func TestRestoreEntryCover(t *testing.T) {
Expand All @@ -184,10 +205,20 @@ func TestRestoreEntryCover(t *testing.T) {
for _, spans := range []int{1, 2, 3, 5, 9, 11, 12} {
for _, files := range []int{0, 1, 2, 3, 4, 10, 12, 50} {
backups := MockBackupChain(numBackups, spans, files, r)
latestIntrosByIndex, err := findLatestIntroFromManifests(backups, keys.SystemSQLCodec, hlc.Timestamp{})
require.NoError(t, err)
latestIntrosBySpan, err := findLatestIntroBySpan(backups[numBackups-1].Spans, keys.SystemSQLCodec, latestIntrosByIndex)
require.NoError(t, err)

for _, target := range []int64{0, 1, 4, 100, 1000} {
t.Run(fmt.Sprintf("numBackups=%d, numSpans=%d, numFiles=%d, merge=%d", numBackups, spans, files, target), func(t *testing.T) {
cover := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, nil, nil, target<<20)
if err := checkRestoreCovering(backups, backups[numBackups-1].Spans, cover, target != noSpanTargetSize); err != nil {
restoreData := restorationDataBase{
spans: backups[numBackups-1].Spans,
latestIntros: latestIntrosBySpan,
}
cover := makeSimpleImportSpans(&restoreData, backups, nil, nil,
target<<20)
if err := checkRestoreCovering(backups, &restoreData, cover, target != noSpanTargetSize); err != nil {
t.Fatal(err)
}
})
Expand Down
Loading

0 comments on commit 4cf6cb1

Please sign in to comment.