backupccl: elide spans from backups that were subsequently reintroduced
Currently RESTORE may restore invalid backup data from a backed-up table that
underwent an IMPORT rollback. See #87305 for a detailed explanation.

This patch ensures that RESTORE elides older backup data that was deleted via
a non-MVCC operation. Because incremental backups always reintroduce spans
that may have undergone a non-MVCC operation (i.e. back them up from
timestamp 0), RESTORE can identify restoring spans with potentially corrupt
data in the backup chain and ingest only those spans' reintroduced data, up to
any system time, excluding the corrupt data.

Here's the basic implementation in RESTORE (see the sketches below):
- For each span we want to restore:
   - identify the last time, l, the span was introduced, using the manifests
   - don't restore the span using a backup if backup.EndTime < l
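
The introduction times come from a span frontier built over the manifests'
IntroducedSpans. Here is a minimal sketch, condensed from the
createIntroducedSpanFrontier diff below; the helper name introducedFrontier is
hypothetical, and spanUtils refers to pkg/util/span:

    // introducedFrontier builds a frontier that records, for each key span,
    // the EndTime of the latest backup in the chain that reintroduced it.
    // Manifests are assumed sorted by increasing EndTime.
    func introducedFrontier(manifests []BackupManifest) (*spanUtils.Frontier, error) {
        f, err := spanUtils.MakeFrontier(roachpb.Span{})
        if err != nil {
            return nil, err
        }
        for i, m := range manifests {
            if i == 0 {
                continue // the base full backup introduces all spans at (0, EndTime]
            }
            if err := f.AddSpansAt(m.EndTime, m.IntroducedSpans...); err != nil {
                return nil, err
            }
        }
        return f, nil
    }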

This implementation rests on the following assumption: the input spans for each
restoration flow (created in createImportingDescriptors) and the
restoreSpanEntries (created by makeSimpleImportSpans) do not span across
multiple tables. Given this assumption, makeSimpleImportSpans skips adding
files from a backup for a given input span if that span was reintroduced in a
subsequent backup, as sketched below.
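
Condensed from the makeSimpleImportSpans diff below, the per-layer check
amounts to roughly this fragment, run for each required span inside the loop
over backup layers:

    // If any part of this required span was reintroduced after this backup
    // layer's EndTime, skip all of the layer's files for the span: the layer
    // may hold invalid data, and a later backup re-covers the span from
    // timestamp 0 anyway.
    var coveredLater bool
    introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span,
        ts hlc.Timestamp) spanUtils.OpResult {
        if backups[layer].EndTime.Less(ts) {
            coveredLater = true
        }
        return spanUtils.StopMatch
    })
    if coveredLater {
        continue
    }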

It's worth noting that all significant refactoring occurs in code run by
the restore coordinator; therefore, no special care needs to be taken for
mixed / cross version backups. In other words, if the coordinator has updated,
the cluster restores properly; otherwise, the bug will exist on the restored
cluster. It's also worth noting that other forms of this bug are apparent on
older cluster versions (#88042, #88043) and have not been noticed by customers;
thus, there is no need to fail a mixed version restore to protect customers
from this pre-existing bug.

Informs #87305

Release justification: bug fix

Release note (bug fix): fix for TA advisory
https://cockroachlabs.atlassian.net/browse/TSE-198
msbutler committed Sep 29, 2022
1 parent 220099f commit 4ee23ee
Showing 13 changed files with 1,123 additions and 40 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -272,6 +272,7 @@ go_test(
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
6 changes: 6 additions & 0 deletions pkg/ccl/backupccl/backup.proto
@@ -80,6 +80,12 @@ message BackupManifest {
// here are covered in the interval (0, startTime], which, in conjunction with
// the coverage from (startTime, endTime] implied for all spans in Spans,
// results in coverage from [0, endTime] for these spans.
//
// The first set of spans in this field are new spans that did not
// exist in the previous backup (a new index, for example), while the remaining
// spans are re-introduced spans, which need to be backed up again from (0,
// startTime] because a non-MVCC operation may have occurred on this span. See
// getReintroducedSpans() for more information.
repeated roachpb.Span introduced_spans = 15 [(gogoproto.nullable) = false];

repeated DescriptorRevision descriptor_changes = 16 [(gogoproto.nullable) = false];
11 changes: 5 additions & 6 deletions pkg/ccl/backupccl/backup_planning.go
@@ -1455,7 +1455,7 @@ func getReintroducedSpans(
// backup was offline at the endTime of the last backup.
latestTableDescChangeInLastBackup := make(map[descpb.ID]*descpb.TableDescriptor)
for _, rev := range lastBackup.DescriptorChanges {
if table, _, _, _, _ := descpb.FromDescriptor(rev.Desc); table != nil {
if table, _, _, _ := descpb.FromDescriptor(rev.Desc); table != nil {
if trackedRev, ok := latestTableDescChangeInLastBackup[table.GetID()]; !ok {
latestTableDescChangeInLastBackup[table.GetID()] = table
} else if trackedRev.Version < table.Version {
@@ -1486,6 +1486,7 @@
// table may have been OFFLINE at the time of the last backup, and OFFLINE at
// the time of the current backup, but may have been PUBLIC at some time in
// between.

for _, rev := range revs {
rawTable, _, _, _ := descpb.FromDescriptor(rev.Desc)
if rawTable == nil {
@@ -1510,7 +1511,6 @@
allRevs = append(allRevs, rev)
}
}

tableSpans, err := spansForAllTableIndexes(execCfg, tablesToReinclude, allRevs)
if err != nil {
return nil, err
@@ -1872,8 +1872,7 @@ func getBackupDetailAndManifest(
}
}

var newSpans roachpb.Spans

var newSpans, reIntroducedSpans roachpb.Spans
var priorIDs map[descpb.ID]descpb.ID

var revs []BackupManifest_DescriptorRevision
@@ -1954,11 +1953,11 @@

newSpans = filterSpans(spans, prevBackups[len(prevBackups)-1].Spans)

tableSpans, err := getReintroducedSpans(ctx, execCfg, prevBackups, tables, revs, endTime)
reIntroducedSpans, err = getReintroducedSpans(ctx, execCfg, prevBackups, tables, revs, endTime)
if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
}
newSpans = append(newSpans, tableSpans...)
newSpans = append(newSpans, reIntroducedSpans...)
}

// if CompleteDbs is lost by a 1.x node, FormatDescriptorTrackingVersion
10 changes: 7 additions & 3 deletions pkg/ccl/backupccl/bench_covering_test.go
@@ -10,10 +10,12 @@ package backupccl

import (
"context"
fmt "fmt"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

func BenchmarkCoverageChecks(b *testing.B) {
@@ -44,7 +46,6 @@ func BenchmarkCoverageChecks(b *testing.B) {

func BenchmarkRestoreEntryCover(b *testing.B) {
r, _ := randutil.NewTestRand()

for _, numBackups := range []int{1, 2, 24, 24 * 4} {
b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) {
for _, baseFiles := range []int{0, 100, 10000} {
@@ -58,7 +59,10 @@
if err := checkCoverage(ctx, backups[numBackups-1].Spans, backups); err != nil {
b.Fatal(err)
}
cov := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, nil, nil)
introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{})
require.NoError(b, err)

cov := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, nil, introducedSpanFrontier, nil)
b.ReportMetric(float64(len(cov)), "coverSize")
}
})
24 changes: 24 additions & 0 deletions pkg/ccl/backupccl/datadriven_test.go
@@ -533,6 +533,30 @@ func TestDataDriven(t *testing.T) {
require.NoError(t, err)
return ""

case "import":
server := lastCreatedServer
user := "root"
jobType := "IMPORT"

// First, run the import.
_, err := ds.getSQLDB(t, server, user).Exec(d.Input)

// Tag the job.
if d.HasArg("tag") {
tagJob(t, server, user, jobType, ds, d)
}

// Check if we expect a pausepoint error.
if d.HasArg("expect-pausepoint") {
expectPausepoint(t, err, jobType, server, user, ds)
ret := append(ds.noticeBuffer, "job paused at pausepoint")
return strings.Join(ret, "\n")
}

// All other errors are bad.
require.NoError(t, err)
return ""

case "restore":
server := lastCreatedServer
user := "root"
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/restoration_data.go
@@ -63,10 +63,13 @@ func (*mainRestorationData) isMainBundle() bool { return true }
type restorationDataBase struct {
// spans is the spans included in this bundle.
spans []roachpb.Span

// rekeys maps old table IDs to their new table descriptor.
tableRekeys []execinfrapb.TableRekey

// tenantRekeys maps tenants being restored to their new ID.
tenantRekeys []execinfrapb.TenantRekey

// pkIDs stores the ID of the primary keys for all of the tables that we're
// restoring for RowCount calculation.
pkIDs map[uint64]bool
7 changes: 6 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
@@ -251,6 +251,11 @@ func restore(
return emptyRowCount, errors.Wrap(err, "resolving locality locations")
}

introducedSpanFrontier, err := createIntroducedSpanFrontier(backupManifests, endTime)
if err != nil {
return emptyRowCount, err
}

if err := checkCoverage(restoreCtx, dataToRestore.getSpans(), backupManifests); err != nil {
return emptyRowCount, err
}
@@ -260,7 +265,7 @@
highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater

importSpans := makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests, backupLocalityMap,
highWaterMark)
introducedSpanFrontier, highWaterMark)

if len(importSpans) == 0 {
// There are no files to restore.
81 changes: 68 additions & 13 deletions pkg/ccl/backupccl/restore_span_covering.go
@@ -13,7 +13,9 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
)

type intervalSpan roachpb.Span
@@ -34,25 +36,29 @@ func (ie intervalSpan) Range() interval.Range {
// based on the lowWaterMark before the covering for them is generated. Consider
// a chain of backups with files f1, f2… which cover spans as follows:
//
// backup
// 0| a___1___c c__2__e h__3__i
// 1| b___4___d g____5___i
// 2| a___________6______________h j_7_k
// 3| h_8_i l_9_m
// keys--a---b---c---d---e---f---g---h----i---j---k---l----m------p---->
// backup
// 0| a___1___c c__2__e h__3__i
// 1| b___4___d g____5___i
// 2| a___________6______________h j_7_k
// 3| h_8_i l_9_m
// keys--a---b---c---d---e---f---g---h----i---j---k---l----m------p---->
//
// spans: |-------span1-------||---span2---| |---span3---|
//
// The cover for those spans would look like:
// [a, c): 1, 4, 6
// [c, e): 2, 4, 6
// [e, f): 6
// [f, i): 3, 5, 6, 8
// [l, m): 9
//
// [a, c): 1, 4, 6
// [c, e): 2, 4, 6
// [e, f): 6
// [f, i): 3, 5, 6, 8
// [l, m): 9
//
// This example is tested in TestRestoreEntryCoverExample.
func makeSimpleImportSpans(
requiredSpans []roachpb.Span,
backups []BackupManifest,
backupLocalityMap map[int]storeByLocalityKV,
introducedSpanFrontier *spanUtils.Frontier,
lowWaterMark roachpb.Key,
) []execinfrapb.RestoreSpanEntry {
if len(backups) < 1 {
@@ -62,8 +68,8 @@
for i := range backups {
sort.Sort(BackupFileDescriptors(backups[i].Files))
}

var cover []execinfrapb.RestoreSpanEntry

for _, span := range requiredSpans {
if span.EndKey.Compare(lowWaterMark) < 0 {
continue
@@ -73,8 +79,32 @@
}

spanCoverStart := len(cover)

for layer := range backups {

var coveredLater bool
introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span,
ts hlc.Timestamp) (done spanUtils.OpResult) {
if backups[layer].EndTime.Less(ts) {
coveredLater = true
}
return spanUtils.StopMatch
})
if coveredLater {
// Don't use this backup to cover this span if the span was reintroduced
// after the backup's endTime. In this case, this backup may have
// invalid data, and further, a subsequent backup will contain all of
// this span's data. Consider the following example:
//
// T0: Begin IMPORT INTO on existing table foo, ingest some data
// T1: Backup foo
// T2: Rollback IMPORT via clearRange
// T3: Incremental backup of foo, with a full reintroduction of foo’s span
// T4: RESTORE foo: should only restore foo from the incremental backup.
// If data from the full backup were also restored,
// the imported-but-then-clearRanged data will leak in the restored cluster.
// This logic seeks to avoid this form of data corruption.
continue
}
covPos := spanCoverStart
// TODO(dt): binary search to the first file in required span?
for _, f := range backups[layer].Files {
@@ -117,6 +147,31 @@
return cover
}

// createIntroducedSpanFrontier creates a span frontier that tracks the end time
// of the latest incremental backup of each introduced span in the backup chain.
// See getReintroducedSpans() for more information. Note: this function assumes
// that manifests are sorted by increasing EndTime.
func createIntroducedSpanFrontier(
manifests []BackupManifest, asOf hlc.Timestamp,
) (*spanUtils.Frontier, error) {
introducedSpanFrontier, err := spanUtils.MakeFrontier(roachpb.Span{})
if err != nil {
return nil, err
}
for i, m := range manifests {
if i == 0 {
continue
}
if !asOf.IsEmpty() && asOf.Less(m.StartTime) {
break
}
if err := introducedSpanFrontier.AddSpansAt(m.EndTime, m.IntroducedSpans...); err != nil {
return nil, err
}
}
return introducedSpanFrontier, nil
}

func makeEntry(start, end roachpb.Key, f execinfrapb.RestoreFileSpec) execinfrapb.RestoreSpanEntry {
return execinfrapb.RestoreSpanEntry{
Span: roachpb.Span{Key: start, EndKey: end}, Files: []execinfrapb.RestoreFileSpec{f},