diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index f48f4543548f..3014f0cb2c90 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -114,6 +114,15 @@ func countRows(raw kvpb.BulkOpSummary, pkIDs map[uint64]bool) roachpb.RowCount { return res } +// filterSpans returns the spans that represent the set difference +// (includes - excludes). +func filterSpans(includes []roachpb.Span, excludes []roachpb.Span) []roachpb.Span { + var cov roachpb.SpanGroup + cov.Add(includes...) + cov.Sub(excludes...) + return cov.Slice() +} + // backup exports a snapshot of every kv entry into ranged sstables. // // The output is an sstable per range with files in the following locations: @@ -191,8 +200,8 @@ func backup( } // Subtract out any completed spans. - spans := roachpb.SubtractSpansWithCopy(backupManifest.Spans, completedSpans) - introducedSpans := roachpb.SubtractSpansWithCopy(backupManifest.IntroducedSpans, completedIntroducedSpans) + spans := filterSpans(backupManifest.Spans, completedSpans) + introducedSpans := filterSpans(backupManifest.IntroducedSpans, completedIntroducedSpans) pkIDs := make(map[uint64]bool) for i := range backupManifest.Descriptors { @@ -1154,8 +1163,10 @@ func forEachPublicIndexTableSpan( }) } -// spansForAllTableIndexes returns the overlappings spans for every index and -// table passed in. +// spansForAllTableIndexes returns non-overlapping spans for every index and +// table passed in. They would normally overlap if any of them are interleaved. +// Overlapping index spans are merged so as to optimize the size/number of the +// spans we BACKUP and lay protected ts records for. func spansForAllTableIndexes( execCfg *sql.ExecutorConfig, tables []catalog.TableDescriptor, @@ -1198,12 +1209,17 @@ func spansForAllTableIndexes( return false }) + // Attempt to merge any contiguous spans generated from the tables and revs. + // No need to check if the spans are distinct, since some of the merged + // indexes may overlap between different revisions of the same descriptor. + mergedSpans, _ := roachpb.MergeSpans(&spans) + knobs := execCfg.BackupRestoreTestingKnobs if knobs != nil && knobs.CaptureResolvedTableDescSpans != nil { - knobs.CaptureResolvedTableDescSpans(spans) + knobs.CaptureResolvedTableDescSpans(mergedSpans) } - return spans, nil + return mergedSpans, nil } func getScheduledBackupExecutionArgsFromSchedule( @@ -1611,11 +1627,11 @@ func createBackupManifest( spans = append(spans, tenantSpans...) tenants = append(tenants, tenantInfos...) - tableIndexSpans, err := spansForAllTableIndexes(execCfg, tables, revs) + tableSpans, err := spansForAllTableIndexes(execCfg, tables, revs) if err != nil { return backuppb.BackupManifest{}, err } - spans = append(spans, tableIndexSpans...) + spans = append(spans, tableSpans...) 
if len(prevBackups) > 0 { tablesInPrev := make(map[descpb.ID]struct{}) @@ -1649,7 +1665,7 @@ func createBackupManifest( } } - newSpans = roachpb.SubtractSpansWithCopy(spans, prevBackups[len(prevBackups)-1].Spans) + newSpans = filterSpans(spans, prevBackups[len(prevBackups)-1].Spans) } // if CompleteDbs is lost by a 1.x node, FormatDescriptorTrackingVersion diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 7cef9301ee84..91b48f9b005b 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -57,13 +57,13 @@ func distBackupPlanSpecs( var introducedSpanPartitions []sql.SpanPartition var err error if len(spans) > 0 { - spanPartitions, err = dsp.PartitionSpansWithoutMerging(ctx, planCtx, spans) + spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, spans) if err != nil { return nil, err } } if len(introducedSpans) > 0 { - introducedSpanPartitions, err = dsp.PartitionSpansWithoutMerging(ctx, planCtx, introducedSpans) + introducedSpanPartitions, err = dsp.PartitionSpans(ctx, planCtx, introducedSpans) if err != nil { return nil, err } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 151474315f09..58d2cb65fc9c 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -539,7 +539,7 @@ func TestBackupManifestFileCount(t *testing.T) { const numAccounts = 1000 _, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, multiNode, numAccounts, InitManualReplication) defer cleanupFn() - sqlDB.Exec(t, "BACKUP DATABASE data INTO 'userfile:///backup'") + sqlDB.Exec(t, "BACKUP INTO 'userfile:///backup'") rows := sqlDB.QueryRow(t, "SELECT count(distinct(path)) FROM [SHOW BACKUP FILES FROM LATEST IN 'userfile:///backup']") var count int rows.Scan(&count) @@ -6290,7 +6290,7 @@ func TestPublicIndexTableSpans(t *testing.T) { pkIndex: getMockIndexDesc(1), indexes: []descpb.IndexDescriptor{getMockIndexDesc(1), getMockIndexDesc(2)}, expectedSpans: []string{"/Table/55/{1-2}", "/Table/55/{2-3}"}, - expectedMergedSpans: []string{"/Table/55/{1-2}", "/Table/55/{2-3}"}, + expectedMergedSpans: []string{"/Table/55/{1-3}"}, }, { name: "dropped-span-between-two-spans", @@ -6356,7 +6356,7 @@ func TestPublicIndexTableSpans(t *testing.T) { }, addingIndexes: []descpb.IndexDescriptor{getMockIndexDesc(2)}, expectedSpans: []string{"/Table/61/{1-2}", "/Table/61/{3-4}", "/Table/61/{4-5}"}, - expectedMergedSpans: []string{"/Table/61/{1-2}", "/Table/61/{3-4}", "/Table/61/{4-5}"}, + expectedMergedSpans: []string{"/Table/61/{1-2}", "/Table/61/{3-5}"}, }, } diff --git a/pkg/ccl/backupccl/backupinfo/manifest_handling.go b/pkg/ccl/backupccl/backupinfo/manifest_handling.go index 195729d5d82c..b087562d716c 100644 --- a/pkg/ccl/backupccl/backupinfo/manifest_handling.go +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling.go @@ -94,7 +94,7 @@ var WriteMetadataSST = settings.RegisterBoolSetting( settings.ApplicationLevel, "kv.bulkio.write_metadata_sst.enabled", "write experimental new format BACKUP metadata file", - false, + util.ConstantWithMetamorphicTestBool("write-metadata-sst", false), ) // WriteMetadataWithExternalSSTsEnabled controls if we write a `BACKUP_METADATA` diff --git a/pkg/ccl/backupccl/backuprand/backup_rand_test.go b/pkg/ccl/backupccl/backuprand/backup_rand_test.go index bc58d4f2278b..68d01d4d98b0 100644 --- a/pkg/ccl/backupccl/backuprand/backup_rand_test.go +++ b/pkg/ccl/backupccl/backuprand/backup_rand_test.go @@ -166,7 +166,9 @@ database_name = 
'rand' AND schema_name = 'public'`) withOnlineRestore := func() string { onlineRestoreExtension := "" if rng.Intn(2) != 0 { - onlineRestoreExtension = ", experimental deferred copy" + // TODO(msbutler): once this test is deflaked, add back the online restore + // variant of this test. + onlineRestoreExtension = "" } return onlineRestoreExtension } diff --git a/pkg/cmd/roachtest/tests/backup_restore_roundtrip.go b/pkg/cmd/roachtest/tests/backup_restore_roundtrip.go index 75664d1e82a4..cf9b48f9313a 100644 --- a/pkg/cmd/roachtest/tests/backup_restore_roundtrip.go +++ b/pkg/cmd/roachtest/tests/backup_restore_roundtrip.go @@ -50,6 +50,7 @@ const numFullBackups = 5 type roundTripSpecs struct { name string metamorphicRangeSize bool + onlineRestore bool mock bool skip string } @@ -65,6 +66,12 @@ func registerBackupRestoreRoundTrip(r registry.Registry) { name: "backup-restore/small-ranges", metamorphicRangeSize: true, }, + { + name: "backup-restore/online-restore", + metamorphicRangeSize: false, + onlineRestore: true, + skip: "it fails consistently", + }, { name: "backup-restore/mock", mock: true, @@ -115,7 +122,7 @@ func backupRestoreRoundTrip( m := c.NewMonitor(ctx, roachNodes) m.Go(func(ctx context.Context) error { - testUtils, err := newCommonTestUtils(ctx, t, c, roachNodes, sp.mock) + testUtils, err := newCommonTestUtils(ctx, t, c, roachNodes, sp.mock, sp.onlineRestore) if err != nil { return err } diff --git a/pkg/cmd/roachtest/tests/mixed_version_backup.go b/pkg/cmd/roachtest/tests/mixed_version_backup.go index 5d43114237f3..c409b89dcc79 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_backup.go +++ b/pkg/cmd/roachtest/tests/mixed_version_backup.go @@ -467,10 +467,11 @@ func (ep encryptionPassphrase) String() string { // newBackupOptions returns a list of backup options to be used when // creating a new backup. Each backup option has a 50% chance of being // included. 
-func newBackupOptions(rng *rand.Rand) []backupOption { - possibleOpts := []backupOption{ - revisionHistory{}, - newEncryptionPassphrase(rng), +func newBackupOptions(rng *rand.Rand, onlineRestoreExpected bool) []backupOption { + possibleOpts := []backupOption{} + if !onlineRestoreExpected { + possibleOpts = append(possibleOpts, revisionHistory{}) + possibleOpts = append(possibleOpts, newEncryptionPassphrase(rng)) } var options []backupOption @@ -1788,7 +1789,7 @@ func (d *BackupRestoreTestDriver) runBackup( case fullBackup: btype := d.newBackupScope(rng) name := d.backupCollectionName(d.nextBackupID(), b.namePrefix, btype) - createOptions := newBackupOptions(rng) + createOptions := newBackupOptions(rng, d.testUtils.onlineRestore) collection = newBackupCollection(name, btype, createOptions, d.cluster.IsLocal()) l.Printf("creating full backup for %s", collection.name) case incrementalBackup: @@ -1953,6 +1954,9 @@ func (d *BackupRestoreTestDriver) createBackupCollection( if d.testUtils.mock { numIncrementals = 1 } + if d.testUtils.onlineRestore { + numIncrementals = 0 + } l.Printf("creating %d incremental backups", numIncrementals) for i := 0; i < numIncrementals; i++ { d.randomWait(l, rng) @@ -2243,6 +2247,9 @@ func (bc *backupCollection) verifyBackupCollection( if opt := bc.encryptionOption(); opt != nil { restoreOptions = append(restoreOptions, opt.String()) } + if d.testUtils.onlineRestore { + restoreOptions = append(restoreOptions, "experimental deferred copy") + } var optionsStr string if len(restoreOptions) > 0 { @@ -2637,10 +2644,11 @@ func prepSchemaChangeWorkload( } type CommonTestUtils struct { - t test.Test - cluster cluster.Cluster - roachNodes option.NodeListOption - mock bool + t test.Test + cluster cluster.Cluster + roachNodes option.NodeListOption + mock bool + onlineRestore bool connCache struct { mu syncutil.Mutex @@ -2652,7 +2660,12 @@ type CommonTestUtils struct { // and puts these connections in a cache for reuse. The caller should remember to close all connections // once done with them to prevent any goroutine leaks (CloseConnections). 
func newCommonTestUtils( - ctx context.Context, t test.Test, c cluster.Cluster, nodes option.NodeListOption, mock bool, + ctx context.Context, + t test.Test, + c cluster.Cluster, + nodes option.NodeListOption, + mock bool, + onlineRestore bool, ) (*CommonTestUtils, error) { cc := make([]*gosql.DB, len(nodes)) for _, node := range nodes { @@ -2669,10 +2682,11 @@ func newCommonTestUtils( } u := &CommonTestUtils{ - t: t, - cluster: c, - roachNodes: nodes, - mock: mock, + t: t, + cluster: c, + roachNodes: nodes, + mock: mock, + onlineRestore: onlineRestore, } u.connCache.cache = cc return u, nil @@ -2681,7 +2695,7 @@ func newCommonTestUtils( func (mvb *mixedVersionBackup) CommonTestUtils(ctx context.Context) (*CommonTestUtils, error) { var err error mvb.utilsOnce.Do(func() { - mvb.commonTestUtils, err = newCommonTestUtils(ctx, mvb.t, mvb.cluster, mvb.roachNodes, false) + mvb.commonTestUtils, err = newCommonTestUtils(ctx, mvb.t, mvb.cluster, mvb.roachNodes, false, false) }) return mvb.commonTestUtils, err } diff --git a/pkg/cmd/roachtest/tests/mixed_version_job_compatibility_in_declarative_schema_changer.go b/pkg/cmd/roachtest/tests/mixed_version_job_compatibility_in_declarative_schema_changer.go index cc8215646c5c..14ec20f56bf8 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_job_compatibility_in_declarative_schema_changer.go +++ b/pkg/cmd/roachtest/tests/mixed_version_job_compatibility_in_declarative_schema_changer.go @@ -85,7 +85,7 @@ func executeSupportedDDLs( nodes = helper.Context.NodesInPreviousVersion() // N.B. this is the set of oldNodes. } } - testUtils, err := newCommonTestUtils(ctx, t, c, helper.Context.CockroachNodes, false) + testUtils, err := newCommonTestUtils(ctx, t, c, helper.Context.CockroachNodes, false, false) defer testUtils.CloseConnections() if err != nil { return err diff --git a/pkg/roachpb/merge_spans.go b/pkg/roachpb/merge_spans.go index ebb386bedf37..1e0444b05af7 100644 --- a/pkg/roachpb/merge_spans.go +++ b/pkg/roachpb/merge_spans.go @@ -101,16 +101,6 @@ func MergeSpans(spans *[]Span) ([]Span, bool) { return r, distinct } -// SubtractSpansWithCopy is the same thing as Subtract spans, but copies the -// todo span first so it can be used after this function call. -func SubtractSpansWithCopy(todo, done Spans) Spans { - newTodo := make(Spans, 0, len(todo)) - for i := range todo { - newTodo = append(newTodo, todo[i].Clone()) - } - return SubtractSpans(newTodo, done) -} - // SubtractSpans subtracts the subspans covered by a set of non-overlapping // spans from another set of non-overlapping spans. // diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index cc26045ee580..5305b753faf2 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1321,16 +1321,7 @@ func (dsp *DistSQLPlanner) checkInstanceHealthAndVersionSystem( func (dsp *DistSQLPlanner) PartitionSpans( ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, ) ([]SpanPartition, error) { - partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans, false /* disallowMergeToExistingParition */) - return partitions, err -} - -// ParitionSpansWithoutMerging is the same as PartitionSpans but does not merge -// an input span into any existing partitioned span. 
-func (dsp *DistSQLPlanner) PartitionSpansWithoutMerging( - ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, -) ([]SpanPartition, error) { - partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans, true /* disallowMergeToExistingParition */) + partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans) return partitions, err } @@ -1338,10 +1329,7 @@ func (dsp *DistSQLPlanner) PartitionSpansWithoutMerging( // boolean indicating whether the misplanned ranges metadata should not be // generated. func (dsp *DistSQLPlanner) partitionSpansEx( - ctx context.Context, - planCtx *PlanningCtx, - spans roachpb.Spans, - disallowMergeToExistingParition bool, + ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, ) (_ []SpanPartition, ignoreMisplannedRanges bool, _ error) { if len(spans) == 0 { return nil, false, errors.AssertionFailedf("no spans") @@ -1355,9 +1343,9 @@ func (dsp *DistSQLPlanner) partitionSpansEx( true /* ignoreMisplannedRanges */, nil } if dsp.useGossipPlanning(ctx, planCtx) { - return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans, disallowMergeToExistingParition) + return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans) } - return dsp.partitionSpans(ctx, planCtx, spans, disallowMergeToExistingParition) + return dsp.partitionSpans(ctx, planCtx, spans) } // partitionSpan takes a single span and splits it up according to the owning @@ -1382,7 +1370,6 @@ func (dsp *DistSQLPlanner) partitionSpan( nodeMap map[base.SQLInstanceID]int, getSQLInstanceIDForKVNodeID func(roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason), ignoreMisplannedRanges *bool, - disallowMergeToExistingParition bool, ) (_ []SpanPartition, lastPartitionIdx int, _ error) { it := planCtx.spanIter // rSpan is the span we are currently partitioning. @@ -1460,7 +1447,7 @@ func (dsp *DistSQLPlanner) partitionSpan( partitionedSpan.String(), sqlInstanceID, reason.String()) } - if lastSQLInstanceID == sqlInstanceID && !disallowMergeToExistingParition { + if lastSQLInstanceID == sqlInstanceID { // Two consecutive ranges on the same node, merge the spans. partition.Spans[len(partition.Spans)-1].EndKey = endKey.AsRawKey() } else { @@ -1482,10 +1469,7 @@ func (dsp *DistSQLPlanner) partitionSpan( // deprecatedPartitionSpansSystem finds node owners for ranges touching the given spans // for a system tenant. func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem( - ctx context.Context, - planCtx *PlanningCtx, - spans roachpb.Spans, - disallowMergeToExistingParition bool, + ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, ) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) { nodeMap := make(map[base.SQLInstanceID]int) resolver := func(nodeID roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) { @@ -1494,7 +1478,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem( for _, span := range spans { var err error partitions, _, err = dsp.partitionSpan( - ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges, disallowMergeToExistingParition, + ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges, ) if err != nil { return nil, false, err @@ -1510,10 +1494,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem( // available SQL instances if the locality info is available on at least some of // the instances, and it falls back to naive round-robin assignment if not. 
func (dsp *DistSQLPlanner) partitionSpans( - ctx context.Context, - planCtx *PlanningCtx, - spans roachpb.Spans, - disallowMergeToExistingParition bool, + ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, ) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) { resolver, err := dsp.makeInstanceResolver(ctx, planCtx) if err != nil { @@ -1538,7 +1519,7 @@ func (dsp *DistSQLPlanner) partitionSpans( lastKey = safeKey } partitions, lastPartitionIdx, err = dsp.partitionSpan( - ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges, disallowMergeToExistingParition, + ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges, ) if err != nil { return nil, false, err @@ -2127,7 +2108,7 @@ func (dsp *DistSQLPlanner) planTableReaders( // still read too eagerly in the soft limit case. To prevent this we'll // need a new mechanism on the execution side to modulate table reads. // TODO(yuzefovich): add that mechanism. - spanPartitions, ignoreMisplannedRanges, err = dsp.partitionSpansEx(ctx, planCtx, info.spans, false) + spanPartitions, ignoreMisplannedRanges, err = dsp.partitionSpansEx(ctx, planCtx, info.spans) if err != nil { return err } diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index 052f851e43a5..9fbf25f9338d 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -672,8 +672,6 @@ func TestPartitionSpans(t *testing.T) { locFilter string - withoutMerging bool - // expected result: a map of node to list of spans. partitions map[int][][2]string partitionStates []string @@ -966,34 +964,6 @@ func TestPartitionSpans(t *testing.T) { totalPartitionSpans: 2, }, }, - // A single span touching multiple ranges but on the same node results - // in a multiple partitioned span iff ParitionSpansWithoutMerging is used. - { - withoutMerging: true, - ranges: []testSpanResolverRange{{"A", 1}, {"A1", 1}, {"B", 2}}, - gatewayNode: 1, - - spans: [][2]string{{"A", "B"}}, - - partitions: map[int][][2]string{ - 1: {{"A", "A1"}, {"A1", "B"}}, - }, - - partitionStates: []string{ - "partition span: A{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", - }, - - partitionState: spanPartitionState{ - partitionSpans: map[base.SQLInstanceID]int{ - 1: 2, - }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2, - }, - totalPartitionSpans: 2, - }, - }, // Test some locality-filtered planning too. // // Since this test is run on a system tenant but there is a locality filter, @@ -1235,17 +1205,11 @@ func TestPartitionSpans(t *testing.T) { for _, s := range tc.spans { spans = append(spans, roachpb.Span{Key: roachpb.Key(s[0]), EndKey: roachpb.Key(s[1])}) } - var partitions []SpanPartition - var err error - if tc.withoutMerging { - partitions, err = dsp.PartitionSpansWithoutMerging(ctx, planCtx, spans) - } else { - partitions, err = dsp.PartitionSpans(ctx, planCtx, spans) - } + + partitions, err := dsp.PartitionSpans(ctx, planCtx, spans) if err != nil { t.Fatal(err) } - countRanges := func(parts []SpanPartition) (count int) { for _, sp := range parts { ri := tsp.NewSpanResolverIterator(nil, nil)
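Note (not part of the patch): a minimal standalone sketch of the two span utilities this diff leans on — the `SpanGroup`-based set difference that the new `filterSpans` helper wraps, and `roachpb.MergeSpans`, whose merging of contiguous spans is what changes the `TestPublicIndexTableSpans` expectations above. Only APIs that appear in the diff (`SpanGroup.Add/Sub/Slice`, `MergeSpans`) are used; the `main` package, the `sp` helper, and the example keys ("a", "c", "f", "m", …) are made up for illustration, and the snippet assumes it is built inside the cockroach repo so the `roachpb` import resolves.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// filterSpans mirrors the helper added to backup_job.go above: it returns the
// set difference (includes - excludes). The SpanGroup accumulates its own
// coverage structure, so the caller's slices are left untouched — the
// guarantee SubtractSpansWithCopy previously provided by cloning the todo
// spans before subtracting.
func filterSpans(includes []roachpb.Span, excludes []roachpb.Span) []roachpb.Span {
	var cov roachpb.SpanGroup
	cov.Add(includes...)
	cov.Sub(excludes...)
	return cov.Slice()
}

func main() {
	// sp is a small illustrative constructor; the keys are arbitrary.
	sp := func(start, end string) roachpb.Span {
		return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)}
	}

	// Subtracting a completed span from the spans left to back up yields the
	// two uncovered pieces: [a,c) and [f,m).
	remaining := filterSpans(
		[]roachpb.Span{sp("a", "m")},
		[]roachpb.Span{sp("c", "f")},
	)
	fmt.Println(remaining)

	// MergeSpans collapses contiguous and overlapping spans in the input,
	// which is why two adjacent index spans in the test now merge into one
	// (e.g. /Table/55/{1-2} + /Table/55/{2-3} becomes /Table/55/{1-3}). The
	// second return value reports whether the input spans were distinct
	// (non-overlapping).
	spans := []roachpb.Span{sp("a", "b"), sp("b", "c"), sp("e", "f")}
	merged, distinct := roachpb.MergeSpans(&spans)
	fmt.Println(merged, distinct)
}
```

Under these assumptions, the first print shows the two remaining spans and the second shows the adjacent "a"–"b" and "b"–"c" spans merged while "e"–"f" stays separate, matching the merged-span expectations the test changes encode.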