backupccl: revert logic to allocate backup work on an index level #122937

Merged 5 commits on Apr 25, 2024
34 changes: 25 additions & 9 deletions pkg/ccl/backupccl/backup_job.go
@@ -114,6 +114,15 @@ func countRows(raw kvpb.BulkOpSummary, pkIDs map[uint64]bool) roachpb.RowCount {
return res
}

// filterSpans returns the spans that represent the set difference
// (includes - excludes).
func filterSpans(includes []roachpb.Span, excludes []roachpb.Span) []roachpb.Span {
var cov roachpb.SpanGroup
cov.Add(includes...)
cov.Sub(excludes...)
return cov.Slice()
}

// backup exports a snapshot of every kv entry into ranged sstables.
//
// The output is an sstable per range with files in the following locations:
@@ -191,8 +200,8 @@ func backup(
}

// Subtract out any completed spans.
spans := roachpb.SubtractSpansWithCopy(backupManifest.Spans, completedSpans)
introducedSpans := roachpb.SubtractSpansWithCopy(backupManifest.IntroducedSpans, completedIntroducedSpans)
spans := filterSpans(backupManifest.Spans, completedSpans)
introducedSpans := filterSpans(backupManifest.IntroducedSpans, completedIntroducedSpans)

pkIDs := make(map[uint64]bool)
for i := range backupManifest.Descriptors {
@@ -1154,8 +1163,10 @@ func forEachPublicIndexTableSpan(
})
}

// spansForAllTableIndexes returns the overlappings spans for every index and
// table passed in.
// spansForAllTableIndexes returns non-overlapping spans for every index and
// table passed in. They would normally overlap if any of them are interleaved.
// Overlapping index spans are merged so as to optimize the size/number of the
// spans we BACKUP and lay protected ts records for.
func spansForAllTableIndexes(
execCfg *sql.ExecutorConfig,
tables []catalog.TableDescriptor,
@@ -1198,12 +1209,17 @@ func spansForAllTableIndexes(
return false
})

// Attempt to merge any contiguous spans generated from the tables and revs.
// No need to check if the spans are distinct, since some of the merged
// indexes may overlap between different revisions of the same descriptor.
mergedSpans, _ := roachpb.MergeSpans(&spans)

knobs := execCfg.BackupRestoreTestingKnobs
if knobs != nil && knobs.CaptureResolvedTableDescSpans != nil {
knobs.CaptureResolvedTableDescSpans(spans)
knobs.CaptureResolvedTableDescSpans(mergedSpans)
}

return spans, nil
return mergedSpans, nil
}

func getScheduledBackupExecutionArgsFromSchedule(
@@ -1611,11 +1627,11 @@ func createBackupManifest(
spans = append(spans, tenantSpans...)
tenants = append(tenants, tenantInfos...)

tableIndexSpans, err := spansForAllTableIndexes(execCfg, tables, revs)
tableSpans, err := spansForAllTableIndexes(execCfg, tables, revs)
if err != nil {
return backuppb.BackupManifest{}, err
}
spans = append(spans, tableIndexSpans...)
spans = append(spans, tableSpans...)

if len(prevBackups) > 0 {
tablesInPrev := make(map[descpb.ID]struct{})
@@ -1649,7 +1665,7 @@ }
}
}

newSpans = roachpb.SubtractSpansWithCopy(spans, prevBackups[len(prevBackups)-1].Spans)
newSpans = filterSpans(spans, prevBackups[len(prevBackups)-1].Spans)
}

// if CompleteDbs is lost by a 1.x node, FormatDescriptorTrackingVersion
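
A note on the span arithmetic above: the new filterSpans helper replaces roachpb.SubtractSpansWithCopy with a roachpb.SpanGroup, which accumulates the included spans, subtracts the excluded ones, and reads the remainder back out, all without mutating either input slice. A minimal sketch of the same set-difference semantics on a toy half-open integer interval standing in for roachpb.Span (illustration only, not code from this PR; inputs are assumed sorted and internally non-overlapping):

// Illustration only: the set-difference semantics that filterSpans gets
// from roachpb.SpanGroup, shown on toy half-open integer intervals.
package main

import "fmt"

type span struct{ start, end int } // half-open [start, end)

func subtract(includes, excludes []span) []span {
	var out []span
	for _, inc := range includes {
		cur := inc.start
		for _, ex := range excludes {
			if ex.end <= cur || ex.start >= inc.end {
				continue // no overlap with the uncovered remainder
			}
			if ex.start > cur {
				out = append(out, span{cur, ex.start}) // keep the piece left of the exclude
			}
			if ex.end > cur {
				cur = ex.end // resume past the excluded region
			}
		}
		if cur < inc.end {
			out = append(out, span{cur, inc.end}) // keep the tail
		}
	}
	return out
}

func main() {
	// Mirrors "subtract out any completed spans":
	// [1,10) minus [3,5) and [8,9) leaves [1,3), [5,8), [9,10).
	fmt.Println(subtract([]span{{1, 10}}, []span{{3, 5}, {8, 9}}))
}
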
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_processor_planning.go
@@ -57,13 +57,13 @@ func distBackupPlanSpecs(
var introducedSpanPartitions []sql.SpanPartition
var err error
if len(spans) > 0 {
spanPartitions, err = dsp.PartitionSpansWithoutMerging(ctx, planCtx, spans)
spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, spans)
if err != nil {
return nil, err
}
}
if len(introducedSpans) > 0 {
introducedSpanPartitions, err = dsp.PartitionSpansWithoutMerging(ctx, planCtx, introducedSpans)
introducedSpanPartitions, err = dsp.PartitionSpans(ctx, planCtx, introducedSpans)
if err != nil {
return nil, err
}
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
@@ -539,7 +539,7 @@ func TestBackupManifestFileCount(t *testing.T) {
const numAccounts = 1000
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, multiNode, numAccounts, InitManualReplication)
defer cleanupFn()
sqlDB.Exec(t, "BACKUP DATABASE data INTO 'userfile:///backup'")
sqlDB.Exec(t, "BACKUP INTO 'userfile:///backup'")
rows := sqlDB.QueryRow(t, "SELECT count(distinct(path)) FROM [SHOW BACKUP FILES FROM LATEST IN 'userfile:///backup']")
var count int
rows.Scan(&count)
@@ -6290,7 +6290,7 @@ func TestPublicIndexTableSpans(t *testing.T) {
pkIndex: getMockIndexDesc(1),
indexes: []descpb.IndexDescriptor{getMockIndexDesc(1), getMockIndexDesc(2)},
expectedSpans: []string{"/Table/55/{1-2}", "/Table/55/{2-3}"},
expectedMergedSpans: []string{"/Table/55/{1-2}", "/Table/55/{2-3}"},
expectedMergedSpans: []string{"/Table/55/{1-3}"},
},
{
name: "dropped-span-between-two-spans",
@@ -6356,7 +6356,7 @@ },
},
addingIndexes: []descpb.IndexDescriptor{getMockIndexDesc(2)},
expectedSpans: []string{"/Table/61/{1-2}", "/Table/61/{3-4}", "/Table/61/{4-5}"},
expectedMergedSpans: []string{"/Table/61/{1-2}", "/Table/61/{3-4}", "/Table/61/{4-5}"},
expectedMergedSpans: []string{"/Table/61/{1-2}", "/Table/61/{3-5}"},
},
}

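
The updated expectations above follow from the MergeSpans call added to spansForAllTableIndexes: adjacent index spans such as /Table/55/{1-2} and /Table/55/{2-3} now come back as the single span /Table/55/{1-3}, while spans with a gap, like /Table/61/{1-2} and /Table/61/{3-5}, stay separate. A sketch of that merge on toy intervals (illustration only; the real roachpb.MergeSpans also reports whether the input spans were distinct):

// Illustration only: merging sorted, adjacent, or overlapping half-open
// intervals, as the merged-span expectations above exercise.
package main

import (
	"fmt"
	"sort"
)

type span struct{ start, end int } // half-open [start, end)

func mergeSpans(spans []span) []span {
	sort.Slice(spans, func(i, j int) bool { return spans[i].start < spans[j].start })
	var out []span
	for _, s := range spans {
		if n := len(out); n > 0 && s.start <= out[n-1].end {
			if s.end > out[n-1].end {
				out[n-1].end = s.end // extend the previous span
			}
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	fmt.Println(mergeSpans([]span{{1, 2}, {2, 3}}))         // [{1 3}]
	fmt.Println(mergeSpans([]span{{1, 2}, {3, 4}, {4, 5}})) // [{1 2} {3 5}]
}
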
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backupinfo/manifest_handling.go
@@ -94,7 +94,7 @@ var WriteMetadataSST = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.bulkio.write_metadata_sst.enabled",
"write experimental new format BACKUP metadata file",
false,
util.ConstantWithMetamorphicTestBool("write-metadata-sst", false),
)

// WriteMetadataWithExternalSSTsEnabled controls if we write a `BACKUP_METADATA`
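
util.ConstantWithMetamorphicTestBool keeps the production default at false while letting metamorphic test builds randomly flip the value, so the experimental metadata-SST path gets exercised by randomized testing without changing release behavior. A rough, hypothetical sketch of how such a helper can work (the names and the build-detection mechanism below are assumptions for illustration, not CockroachDB's actual implementation):

// Hypothetical sketch of a metamorphic constant: production builds get the
// stated default; test builds randomly pick a value so both paths run.
package metamorphic

import "math/rand"

// metamorphicBuild would be set via build tags in a real implementation.
var metamorphicBuild = false

func ConstantWithTestBool(name string, defaultValue bool) bool {
	if !metamorphicBuild {
		return defaultValue
	}
	v := rand.Intn(2) == 0
	// A real implementation would log the chosen value so failures
	// reproduce, e.g. "metamorphic: write-metadata-sst = true".
	_ = name
	return v
}
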
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/backuprand/backup_rand_test.go
@@ -166,7 +166,9 @@ database_name = 'rand' AND schema_name = 'public'`)
withOnlineRestore := func() string {
onlineRestoreExtension := ""
if rng.Intn(2) != 0 {
onlineRestoreExtension = ", experimental deferred copy"
// TODO(msbutler): once this test is deflaked, add back the online restore
// variant of this test.
onlineRestoreExtension = ""
}
return onlineRestoreExtension
}
10 changes: 0 additions & 10 deletions pkg/roachpb/merge_spans.go
@@ -101,16 +101,6 @@ func MergeSpans(spans *[]Span) ([]Span, bool) {
return r, distinct
}

// SubtractSpansWithCopy is the same thing as Subtract spans, but copies the
// todo span first so it can be used after this function call.
func SubtractSpansWithCopy(todo, done Spans) Spans {
newTodo := make(Spans, 0, len(todo))
for i := range todo {
newTodo = append(newTodo, todo[i].Clone())
}
return SubtractSpans(newTodo, done)
}

// SubtractSpans subtracts the subspans covered by a set of non-overlapping
// spans from another set of non-overlapping spans.
//
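
SubtractSpansWithCopy existed only because SubtractSpans consumes the todo spans passed to it, so a caller that still needed the originals had to clone them first; the SpanGroup-based filterSpans has no such aliasing hazard, which is why the wrapper can be deleted. A toy demonstration of the hazard the copy guarded against (illustration only; the in-place subtract below is deliberately simplified to trimming start keys):

// Illustration only: an in-place subtract aliases the caller's slice, so a
// caller that reuses its input afterwards sees mutated data unless it
// clones first -- the pattern the deleted SubtractSpansWithCopy wrapped up.
package main

import "fmt"

type span struct{ start, end int }

// subtractInPlace trims each todo span that overlaps done, writing through
// the shared backing array (mimicking SubtractSpans consuming its input).
func subtractInPlace(todo []span, done span) []span {
	for i := range todo {
		if done.start <= todo[i].start && todo[i].start < done.end {
			todo[i].start = done.end // mutates the caller's memory
		}
	}
	return todo
}

func main() {
	original := []span{{1, 10}}
	_ = subtractInPlace(original, span{1, 4})
	fmt.Println(original) // [{4 10}] -- the "original" changed under the caller

	// The safe variant clones first, as SubtractSpansWithCopy did.
	original = []span{{1, 10}}
	clone := append([]span(nil), original...)
	fmt.Println(subtractInPlace(clone, span{1, 4}), original) // [{4 10}] [{1 10}]
}
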
39 changes: 10 additions & 29 deletions pkg/sql/distsql_physical_planner.go
@@ -1321,27 +1321,15 @@ func (dsp *DistSQLPlanner) checkInstanceHealthAndVersionSystem(
func (dsp *DistSQLPlanner) PartitionSpans(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) ([]SpanPartition, error) {
partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans, false /* disallowMergeToExistingParition */)
return partitions, err
}

// ParitionSpansWithoutMerging is the same as PartitionSpans but does not merge
// an input span into any existing partitioned span.
func (dsp *DistSQLPlanner) PartitionSpansWithoutMerging(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) ([]SpanPartition, error) {
partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans, true /* disallowMergeToExistingParition */)
partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans)
return partitions, err
}

// partitionSpansEx is the same as PartitionSpans but additionally returns a
// boolean indicating whether the misplanned ranges metadata should not be
// generated.
func (dsp *DistSQLPlanner) partitionSpansEx(
ctx context.Context,
planCtx *PlanningCtx,
spans roachpb.Spans,
disallowMergeToExistingParition bool,
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) (_ []SpanPartition, ignoreMisplannedRanges bool, _ error) {
if len(spans) == 0 {
return nil, false, errors.AssertionFailedf("no spans")
@@ -1355,9 +1343,9 @@ func (dsp *DistSQLPlanner) partitionSpansEx(
true /* ignoreMisplannedRanges */, nil
}
if dsp.useGossipPlanning(ctx, planCtx) {
return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans, disallowMergeToExistingParition)
return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans)
}
return dsp.partitionSpans(ctx, planCtx, spans, disallowMergeToExistingParition)
return dsp.partitionSpans(ctx, planCtx, spans)
}

// partitionSpan takes a single span and splits it up according to the owning
Expand All @@ -1382,7 +1370,6 @@ func (dsp *DistSQLPlanner) partitionSpan(
nodeMap map[base.SQLInstanceID]int,
getSQLInstanceIDForKVNodeID func(roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason),
ignoreMisplannedRanges *bool,
disallowMergeToExistingParition bool,
) (_ []SpanPartition, lastPartitionIdx int, _ error) {
it := planCtx.spanIter
// rSpan is the span we are currently partitioning.
@@ -1460,7 +1447,7 @@ func (dsp *DistSQLPlanner) partitionSpan(
partitionedSpan.String(), sqlInstanceID, reason.String())
}

if lastSQLInstanceID == sqlInstanceID && !disallowMergeToExistingParition {
if lastSQLInstanceID == sqlInstanceID {
// Two consecutive ranges on the same node, merge the spans.
partition.Spans[len(partition.Spans)-1].EndKey = endKey.AsRawKey()
} else {
@@ -1482,10 +1469,7 @@ func (dsp *DistSQLPlanner) partitionSpan(
// deprecatedPartitionSpansSystem finds node owners for ranges touching the given spans
// for a system tenant.
func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
ctx context.Context,
planCtx *PlanningCtx,
spans roachpb.Spans,
disallowMergeToExistingParition bool,
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
nodeMap := make(map[base.SQLInstanceID]int)
resolver := func(nodeID roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) {
@@ -1494,7 +1478,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
for _, span := range spans {
var err error
partitions, _, err = dsp.partitionSpan(
ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges, disallowMergeToExistingParition,
ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges,
)
if err != nil {
return nil, false, err
@@ -1510,10 +1494,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
// available SQL instances if the locality info is available on at least some of
// the instances, and it falls back to naive round-robin assignment if not.
func (dsp *DistSQLPlanner) partitionSpans(
ctx context.Context,
planCtx *PlanningCtx,
spans roachpb.Spans,
disallowMergeToExistingParition bool,
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
resolver, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
@@ -1538,7 +1519,7 @@ func (dsp *DistSQLPlanner) partitionSpans(
lastKey = safeKey
}
partitions, lastPartitionIdx, err = dsp.partitionSpan(
ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges, disallowMergeToExistingParition,
ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges,
)
if err != nil {
return nil, false, err
@@ -2127,7 +2108,7 @@ func (dsp *DistSQLPlanner) planTableReaders(
// still read too eagerly in the soft limit case. To prevent this we'll
// need a new mechanism on the execution side to modulate table reads.
// TODO(yuzefovich): add that mechanism.
spanPartitions, ignoreMisplannedRanges, err = dsp.partitionSpansEx(ctx, planCtx, info.spans, false)
spanPartitions, ignoreMisplannedRanges, err = dsp.partitionSpansEx(ctx, planCtx, info.spans)
if err != nil {
return err
}
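
With the disallowMergeToExistingParition parameter gone, partitionSpan is back to a single behavior: when consecutive ranges resolve to the same SQL instance, it extends the previous partitioned span's EndKey instead of appending a second span, which is exactly what the deleted PartitionSpansWithoutMerging suppressed. A toy sketch of that grouping (hypothetical types for illustration; the real code resolves instances through the span iterator and locality-aware resolvers):

// Illustration only: grouping consecutive ranges by owning instance and
// merging neighbors on the same instance into one span, as the restored
// lastSQLInstanceID == sqlInstanceID branch does.
package main

import "fmt"

type rng struct {
	start, end string
	instance   int
}

type partition struct {
	instance int
	spans    [][2]string
}

func partitionRanges(ranges []rng) []partition {
	var parts []partition
	idx := map[int]int{} // instance -> position in parts
	last := -1
	for _, r := range ranges {
		i, ok := idx[r.instance]
		if !ok {
			i = len(parts)
			idx[r.instance] = i
			parts = append(parts, partition{instance: r.instance})
		}
		p := &parts[i]
		if last == r.instance && len(p.spans) > 0 {
			p.spans[len(p.spans)-1][1] = r.end // merge with the previous span
		} else {
			p.spans = append(p.spans, [2]string{r.start, r.end})
		}
		last = r.instance
	}
	return parts
}

func main() {
	// Ranges A and A1 both live on instance 1, so they merge into {A-B};
	// the removed test case below asserted they stayed split when merging
	// was disabled.
	fmt.Println(partitionRanges([]rng{
		{"A", "A1", 1}, {"A1", "B", 1}, {"B", "C", 2},
	}))
}
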
40 changes: 2 additions & 38 deletions pkg/sql/distsql_physical_planner_test.go
@@ -672,8 +672,6 @@ func TestPartitionSpans(t *testing.T) {

locFilter string

withoutMerging bool

// expected result: a map of node to list of spans.
partitions map[int][][2]string
partitionStates []string
@@ -966,34 +964,6 @@
totalPartitionSpans: 2,
},
},
// A single span touching multiple ranges but on the same node results
// in a multiple partitioned span iff ParitionSpansWithoutMerging is used.
{
withoutMerging: true,
ranges: []testSpanResolverRange{{"A", 1}, {"A1", 1}, {"B", 2}},
gatewayNode: 1,

spans: [][2]string{{"A", "B"}},

partitions: map[int][][2]string{
1: {{"A", "A1"}, {"A1", "B"}},
},

partitionStates: []string{
"partition span: A{-1}, instance ID: 1, reason: gossip-target-healthy",
"partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy",
},

partitionState: spanPartitionState{
partitionSpans: map[base.SQLInstanceID]int{
1: 2,
},
partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2,
},
totalPartitionSpans: 2,
},
},
// Test some locality-filtered planning too.
//
// Since this test is run on a system tenant but there is a locality filter,
@@ -1235,17 +1205,11 @@ func TestPartitionSpans(t *testing.T) {
for _, s := range tc.spans {
spans = append(spans, roachpb.Span{Key: roachpb.Key(s[0]), EndKey: roachpb.Key(s[1])})
}
var partitions []SpanPartition
var err error
if tc.withoutMerging {
partitions, err = dsp.PartitionSpansWithoutMerging(ctx, planCtx, spans)
} else {
partitions, err = dsp.PartitionSpans(ctx, planCtx, spans)
}

partitions, err := dsp.PartitionSpans(ctx, planCtx, spans)
if err != nil {
t.Fatal(err)
}

countRanges := func(parts []SpanPartition) (count int) {
for _, sp := range parts {
ri := tsp.NewSpanResolverIterator(nil, nil)