Merge pull request cockroachdb#74593 from dt/backport21.2-74394
dt authored Jan 8, 2022
2 parents f26513f + efb8568 commit aabce20
Showing 5 changed files with 564 additions and 276 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"restore_planning.go",
"restore_processor_planning.go",
"restore_schema_change_creation.go",
"restore_span_covering.go",
"schedule_exec.go",
"schedule_pts_chaining.go",
"show.go",
@@ -148,6 +149,7 @@ go_test(
"restore_mid_schema_change_test.go",
"restore_old_sequences_test.go",
"restore_old_versions_test.go",
"restore_span_covering_test.go",
"schedule_pts_chaining_test.go",
"show_test.go",
"split_and_scatter_processor_test.go",
85 changes: 44 additions & 41 deletions pkg/ccl/backupccl/bench_covering_test.go
@@ -12,60 +12,24 @@ import (
"context"
fmt "fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func BenchmarkCoverageChecks(b *testing.B) {
r, _ := randutil.NewPseudoRand()

for _, numBackups := range []int{1, 7, 24, 24 * 4} {
b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) {
for _, numSpans := range []int{10, 20, 100} {
b.Run(fmt.Sprintf("numSpans=%d", numSpans), func(b *testing.B) {
for _, baseFiles := range []int{0, 10, 100, 1000, 10000} {
b.Run(fmt.Sprintf("numFiles=%d", baseFiles), func(b *testing.B) {
b.StopTimer()
backups := make([]BackupManifest, numBackups)
ts := hlc.Timestamp{WallTime: time.Second.Nanoseconds()}
for i := range backups {
backups[i].Spans = make(roachpb.Spans, numSpans)
for j := range backups[i].Spans {
backups[i].Spans[j] = makeTableSpan(uint32(100 + j + (i / 4)))
}
backups[i].EndTime = ts.Add(time.Minute.Nanoseconds()*int64(i), 0)
if i > 0 {
backups[i].StartTime = backups[i-1].EndTime
if i%4 == 0 {
backups[i].IntroducedSpans = roachpb.Spans{backups[i].Spans[numSpans-1]}
}
}

numFiles := baseFiles
if i == 0 {
backups[i].Files = make([]BackupManifest_File, numFiles)
} else {
numFiles = numFiles / 2
backups[i].Files = make([]BackupManifest_File, numFiles)
}

for f := range backups[i].Files {
backups[i].Files[f].Path = fmt.Sprintf("1234567890%d.sst", f)
backups[i].Files[f].Span.Key = encoding.EncodeVarintAscending(backups[i].Spans[f*numSpans/numFiles].Key, int64(f))
backups[i].Files[f].Span.EndKey = encoding.EncodeVarintAscending(backups[i].Spans[f*numSpans/numFiles].Key, int64(f+1))

}
backups[i].Dir = roachpb.ExternalStorage{S3Config: &roachpb.ExternalStorage_S3{
Bucket: "some-string-name",
Prefix: "some-string-path/to/some/file",
AccessKey: "some-access-key",
Secret: "some-secret-key",
}}
}
ctx := context.Background()
backups := MockBackupChain(numBackups, numSpans, baseFiles, r)
b.ResetTimer()

ctx := context.Background()
b.Run("checkCoverage", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -93,3 +57,42 @@ func BenchmarkCoverageChecks(b *testing.B) {
})
}
}

func BenchmarkRestoreEntryCover(b *testing.B) {
r, _ := randutil.NewPseudoRand()

for _, numBackups := range []int{1, 2, 24, 24 * 4} {
b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) {
for _, baseFiles := range []int{0, 100, 10000} {
b.Run(fmt.Sprintf("numFiles=%d", baseFiles), func(b *testing.B) {
for _, numSpans := range []int{10, 100} {
b.Run(fmt.Sprintf("numSpans=%d", numSpans), func(b *testing.B) {
b.StopTimer()
ctx := context.Background()
backups := MockBackupChain(numBackups, numSpans, baseFiles, r)
b.ResetTimer()
b.Run("simple", func(b *testing.B) {
for i := 0; i < b.N; i++ {
if err := checkCoverage(ctx, backups[numBackups-1].Spans, backups); err != nil {
b.Fatal(err)
}
cov := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, nil, nil)
b.ReportMetric(float64(len(cov)), "coverSize")
}
})
b.Run("coveringMerge", func(b *testing.B) {
for i := 0; i < b.N; i++ {
cov, _, err := makeImportSpans(backups[numBackups-1].Spans, backups, nil, nil, errOnMissingRange)
if err != nil {
b.Fatal(err)
}
b.ReportMetric(float64(len(cov)), "coverSize")
}
})
})
}
})
}
})
}
}
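
Either of these benchmarks can be driven with the standard Go tooling, for example go test ./pkg/ccl/backupccl -run '^$' -bench BenchmarkRestoreEntryCover from the repository root (the -run '^$' filter skips the regular tests so only the benchmark runs); the repository also wires restore_span_covering_test.go into Bazel via the BUILD.bazel change above.
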
251 changes: 16 additions & 235 deletions pkg/ccl/backupccl/restore_job.go
@@ -39,7 +39,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
@@ -60,236 +59,6 @@ import (
"github.com/gogo/protobuf/types"
)

type intervalSpan roachpb.Span

var _ interval.Interface = intervalSpan{}

// ID is part of `interval.Interface` but unused in makeImportSpans.
func (ie intervalSpan) ID() uintptr { return 0 }

// Range is part of `interval.Interface`.
func (ie intervalSpan) Range() interval.Range {
return interval.Range{Start: []byte(ie.Key), End: []byte(ie.EndKey)}
}

type importEntryType int

const (
backupSpan importEntryType = iota
backupFile
tableSpan
completedSpan
)

type importEntry struct {
roachpb.Span
entryType importEntryType

// Only set if entryType is backupSpan
start, end hlc.Timestamp

// Only set if entryType is backupFile
dir roachpb.ExternalStorage
file BackupManifest_File
}

// makeImportSpans pivots the backups, which are grouped by time, into
// spans for import, which are grouped by keyrange.
//
// The core logic of this is in OverlapCoveringMerge, which accepts sets of
// non-overlapping key ranges (aka coverings) each with a payload, and returns
// them aligned with the payloads in the same order as in the input.
//
// Example (input):
// - [A, C) backup t0 to t1 -> /file1
// - [C, D) backup t0 to t1 -> /file2
// - [A, B) backup t1 to t2 -> /file3
// - [B, C) backup t1 to t2 -> /file4
// - [C, D) backup t1 to t2 -> /file5
// - [B, D) requested table data to be restored
//
// Example (output):
// - [A, B) -> /file1, /file3
// - [B, C) -> /file1, /file4, requested (note that file1 was split into two ranges)
// - [C, D) -> /file2, /file5, requested
//
// This would be turned into two Import spans, one restoring [B, C) out of
// /file1 and /file4, the other restoring [C, D) out of /file2 and /file5.
// Nothing is restored out of /file3 and only part of /file1 is used.
//
// NB: All grouping operates in the pre-rewrite keyspace, meaning the keyranges
// as they were backed up, not as they're being restored.
//
// If a span is not covered, the onMissing function is called with the span and
// time missing to determine what error, if any, should be returned.
func makeImportSpans(
tableSpans []roachpb.Span,
backups []BackupManifest,
backupLocalityMap map[int]storeByLocalityKV,
lowWaterMark roachpb.Key,
onMissing func(span covering.Range, start, end hlc.Timestamp) error,
) ([]execinfrapb.RestoreSpanEntry, hlc.Timestamp, error) {
// Put the covering for the already-completed spans into the
// OverlapCoveringMerge input first. Payloads are returned in the same order
// that they appear in the input; putting the completedSpan first means we'll
// see it first when iterating over the output of OverlapCoveringMerge and
// avoid doing unnecessary work.
completedCovering := covering.Covering{
{
Start: []byte(keys.MinKey),
End: []byte(lowWaterMark),
Payload: importEntry{entryType: completedSpan},
},
}

// Put the merged table data covering into the OverlapCoveringMerge input
// next.
var tableSpanCovering covering.Covering
for _, span := range tableSpans {
tableSpanCovering = append(tableSpanCovering, covering.Range{
Start: span.Key,
End: span.EndKey,
Payload: importEntry{
Span: span,
entryType: tableSpan,
},
})
}

backupCoverings := []covering.Covering{completedCovering, tableSpanCovering}

// Iterate over backups creating two coverings for each. First the spans
// that were backed up, then the files in the backup. The latter is a subset
// when some of the keyranges in the former didn't change since the previous
// backup. These alternate (backup1 spans, backup1 files, backup2 spans,
// backup2 files) so they will retain that alternation in the output of
// OverlapCoveringMerge.
var maxEndTime hlc.Timestamp
for i, b := range backups {
if maxEndTime.Less(b.EndTime) {
maxEndTime = b.EndTime
}

var backupNewSpanCovering covering.Covering
for _, s := range b.IntroducedSpans {
backupNewSpanCovering = append(backupNewSpanCovering, covering.Range{
Start: s.Key,
End: s.EndKey,
Payload: importEntry{Span: s, entryType: backupSpan, start: hlc.Timestamp{}, end: b.StartTime},
})
}
backupCoverings = append(backupCoverings, backupNewSpanCovering)

var backupSpanCovering covering.Covering
for _, s := range b.Spans {
backupSpanCovering = append(backupSpanCovering, covering.Range{
Start: s.Key,
End: s.EndKey,
Payload: importEntry{Span: s, entryType: backupSpan, start: b.StartTime, end: b.EndTime},
})
}
backupCoverings = append(backupCoverings, backupSpanCovering)
var backupFileCovering covering.Covering

var storesByLocalityKV map[string]roachpb.ExternalStorage
if storesByLocalityKVMap, ok := backupLocalityMap[i]; ok {
storesByLocalityKV = storesByLocalityKVMap
}

for _, f := range b.Files {
dir := b.Dir
if storesByLocalityKV != nil {
if newDir, ok := storesByLocalityKV[f.LocalityKV]; ok {
dir = newDir
}
}
backupFileCovering = append(backupFileCovering, covering.Range{
Start: f.Span.Key,
End: f.Span.EndKey,
Payload: importEntry{
Span: f.Span,
entryType: backupFile,
dir: dir,
file: f,
},
})
}
backupCoverings = append(backupCoverings, backupFileCovering)
}

// Group ranges covered by backups with ones needed to restore the selected
// tables. Note that this breaks intervals up as necessary to align them.
// See the function godoc for details.
importRanges := covering.OverlapCoveringMerge(backupCoverings)

// Translate the output of OverlapCoveringMerge into requests.
var requestEntries []execinfrapb.RestoreSpanEntry
rangeLoop:
for _, importRange := range importRanges {
needed := false
var latestCoveredTime hlc.Timestamp
var files []execinfrapb.RestoreFileSpec
payloads := importRange.Payload.([]interface{})
for _, p := range payloads {
ie := p.(importEntry)
switch ie.entryType {
case completedSpan:
continue rangeLoop
case tableSpan:
needed = true
case backupSpan:
// The latest time we've backed up this span may be ahead of the start
// time of this entry. This is because some spans can be
// "re-introduced", meaning that they were previously backed up but
// still appear in introducedSpans. Spans are re-introduced when they
// were taken OFFLINE (and therefore processed non-transactional writes)
// and brought back online (PUBLIC). For more information see #62564.
if latestCoveredTime.Less(ie.start) {
return nil, hlc.Timestamp{}, errors.Errorf(
"no backup covers time [%s,%s) for range [%s,%s) or backups listed out of order (mismatched start time)",
latestCoveredTime, ie.start,
roachpb.Key(importRange.Start), roachpb.Key(importRange.End))
}
if !ie.end.Less(latestCoveredTime) {
latestCoveredTime = ie.end
}
case backupFile:
if len(ie.file.Path) > 0 {
files = append(files, execinfrapb.RestoreFileSpec{
Dir: ie.dir,
Path: ie.file.Path,
})
}
}
}
if needed {
if latestCoveredTime != maxEndTime {
if err := onMissing(importRange, latestCoveredTime, maxEndTime); err != nil {
return nil, hlc.Timestamp{}, err
}
}
if len(files) == 0 {
// There may be import entries that refer to no data, and hence
// no files. These are caused because file spans start at a
// specific key. E.g. consider the first file backing up data
// from table 51. It will cover span ‹/Table/51/1/0/0› -
// ‹/Table/51/1/3273›. When merged with the backup span:
// ‹/Table/51› - ‹/Table/52›, we get an empty span with no
// files: ‹/Table/51› - ‹/Table/51/1/0/0›. We should ignore
// these to avoid thrashing during restore's split and scatter.
continue
}
// If needed is false, we have data backed up that is not necessary
// for this restore. Skip it.
requestEntries = append(requestEntries, execinfrapb.RestoreSpanEntry{
Span: roachpb.Span{Key: importRange.Start, EndKey: importRange.End},
Files: files,
})
}
}
return requestEntries, maxEndTime, nil
}
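
The pivot described in the doc comment above can be illustrated with a small, self-contained Go sketch. It is not the real covering.OverlapCoveringMerge; the fileRange type and pivotByKeyRange function below are simplified stand-ins that omit the completed-span and table-span payloads, but they show how file ranges grouped by backup time are re-grouped by key range, reproducing the /file1 through /file5 example from the comment:

package main

import (
	"fmt"
	"sort"
)

// fileRange is a hypothetical stand-in for a backed-up file and the
// pre-rewrite key range it covers.
type fileRange struct {
	start, end string // [start, end)
	path       string
}

// pivotByKeyRange takes per-backup file ranges (outer slice ordered by backup
// end time) and returns, for every elementary key sub-range, the paths of the
// files covering it, preserving backup order; this is the same re-grouping
// that OverlapCoveringMerge performs on full coverings with typed payloads.
func pivotByKeyRange(layers [][]fileRange) map[string][]string {
	// Collect every start/end key so the keyspace can be split at all of them.
	boundarySet := map[string]struct{}{}
	for _, layer := range layers {
		for _, f := range layer {
			boundarySet[f.start] = struct{}{}
			boundarySet[f.end] = struct{}{}
		}
	}
	boundaries := make([]string, 0, len(boundarySet))
	for k := range boundarySet {
		boundaries = append(boundaries, k)
	}
	sort.Strings(boundaries)

	// For each elementary [lo, hi) range, gather the files from each layer
	// that overlap it, in layer order.
	out := map[string][]string{}
	for i := 0; i+1 < len(boundaries); i++ {
		lo, hi := boundaries[i], boundaries[i+1]
		key := fmt.Sprintf("[%s, %s)", lo, hi)
		for _, layer := range layers {
			for _, f := range layer {
				if f.start < hi && lo < f.end { // half-open interval overlap
					out[key] = append(out[key], f.path)
				}
			}
		}
	}
	return out
}

func main() {
	// The example from the doc comment: two backup layers covering [A, D).
	layers := [][]fileRange{
		{{"A", "C", "/file1"}, {"C", "D", "/file2"}},                       // backup t0 to t1
		{{"A", "B", "/file3"}, {"B", "C", "/file4"}, {"C", "D", "/file5"}}, // backup t1 to t2
	}
	for rng, files := range pivotByKeyRange(layers) {
		fmt.Println(rng, "->", files) // e.g. [B, C) -> [/file1 /file4]
	}
}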

func processTableForMultiRegion(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, table catalog.TableDescriptor,
) error {
@@ -639,11 +408,23 @@ func restore(
// Pivot the backups, which are grouped by time, into requests for import,
// which are grouped by keyrange.
highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater
importSpans, _, err := makeImportSpans(dataToRestore.getSpans(), backupManifests, backupLocalityMap,
highWaterMark, errOnMissingRange)
if err != nil {
return emptyRowCount, errors.Wrapf(err, "making import requests for %d backups", len(backupManifests))

const useSimpleImportSpans = true
var importSpans []execinfrapb.RestoreSpanEntry
if useSimpleImportSpans {
if err := checkCoverage(restoreCtx, dataToRestore.getSpans(), backupManifests); err != nil {
return emptyRowCount, err
}
importSpans = makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests, backupLocalityMap,
highWaterMark)
} else {
importSpans, _, err = makeImportSpans(dataToRestore.getSpans(), backupManifests, backupLocalityMap,
highWaterMark, errOnMissingRange)
if err != nil {
return emptyRowCount, errors.Wrapf(err, "making import requests for %d backups", len(backupManifests))
}
}

if len(importSpans) == 0 {
// There are no files to restore.
return emptyRowCount, nil
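
For contrast with the covering merge above, the replacement path first verifies coverage with checkCoverage and then builds one restore entry per required span rather than one per elementary key sub-range. makeSimpleImportSpans itself lives in the new restore_span_covering.go, which is not rendered in this view, so the following is only a hedged sketch of that shape, with hypothetical types (span, file, manifest, restoreEntry) standing in for roachpb.Span, BackupManifest, and execinfrapb.RestoreSpanEntry:

package backupsketch

// Hypothetical stand-ins; the real code uses roachpb.Span, BackupManifest,
// and execinfrapb.RestoreSpanEntry.
type span struct{ key, endKey string }

type file struct {
	sp   span
	path string
}

type manifest struct{ files []file }

type restoreEntry struct {
	sp    span
	files []string
}

// simpleImportSpans emits one entry per required span, listing every file
// from every backup layer that overlaps it, in layer order. Verifying that
// the layers actually cover the requested time range is assumed to happen
// separately, which is the role checkCoverage plays in the real restore path.
func simpleImportSpans(required []span, backups []manifest) []restoreEntry {
	var out []restoreEntry
	for _, sp := range required {
		entry := restoreEntry{sp: sp}
		for _, b := range backups {
			for _, f := range b.files {
				if f.sp.key < sp.endKey && sp.key < f.sp.endKey { // [key, endKey) overlap
					entry.files = append(entry.files, f.path)
				}
			}
		}
		if len(entry.files) > 0 {
			out = append(out, entry)
		}
	}
	return out
}

The coverSize metric reported by the new benchmarks is just len(cov), so it is the natural way to compare how many entries each strategy produces for the same backup chain.
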
(The remaining two changed files in this commit, pkg/ccl/backupccl/restore_span_covering.go and pkg/ccl/backupccl/restore_span_covering_test.go, are not shown above.)
