diff --git a/docs/generated/sql/bnf/restore_options.bnf b/docs/generated/sql/bnf/restore_options.bnf index 91ea999e9c54..5d6c7dcba2ea 100644 --- a/docs/generated/sql/bnf/restore_options.bnf +++ b/docs/generated/sql/bnf/restore_options.bnf @@ -21,4 +21,4 @@ restore_options ::= | 'UNSAFE_RESTORE_INCOMPATIBLE_VERSION' | 'EXECUTION' 'LOCALITY' '=' string_or_placeholder | 'EXPERIMENTAL' 'DEFERRED' 'COPY' - | 'STRIP_LOCALITIES' + | 'REMOVE_REGIONS' diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index acbee4c11ce3..9eca79ac53c7 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1332,6 +1332,7 @@ unreserved_keyword ::= | 'RELATIVE' | 'RELEASE' | 'RELOCATE' + | 'REMOVE_REGIONS' | 'RENAME' | 'REPEATABLE' | 'REPLACE' @@ -1394,7 +1395,6 @@ unreserved_keyword ::= | 'SKIP_MISSING_SEQUENCE_OWNERS' | 'SKIP_MISSING_VIEWS' | 'SKIP_MISSING_UDFS' - | 'STRIP_LOCALITIES' | 'SNAPSHOT' | 'SPLIT' | 'SQL' @@ -2675,7 +2675,7 @@ restore_options ::= | 'UNSAFE_RESTORE_INCOMPATIBLE_VERSION' | 'EXECUTION' 'LOCALITY' '=' string_or_placeholder | 'EXPERIMENTAL' 'DEFERRED' 'COPY' - | 'STRIP_LOCALITIES' + | 'REMOVE_REGIONS' scrub_option_list ::= ( scrub_option ) ( ( ',' scrub_option ) )* @@ -3918,6 +3918,7 @@ bare_label_keywords ::= | 'RELATIVE' | 'RELEASE' | 'RELOCATE' + | 'REMOVE_REGIONS' | 'RENAME' | 'REPEATABLE' | 'REPLACE' @@ -3984,7 +3985,6 @@ bare_label_keywords ::= | 'SKIP_MISSING_SEQUENCE_OWNERS' | 'SKIP_MISSING_UDFS' | 'SKIP_MISSING_VIEWS' - | 'STRIP_LOCALITIES' | 'SMALLINT' | 'SNAPSHOT' | 'SOME' diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 360c78486940..8405f6a596ca 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1053,7 +1053,7 @@ func createImportingDescriptors( typesByID[types[i].GetID()] = types[i] } - if details.StripLocalities { + if details.RemoveRegions { // Can't restore multi-region tables into non-multi-region database for _, t := range tables { t.TableDesc().LocalityConfig = nil @@ -1093,7 +1093,7 @@ func createImportingDescriptors( // we need to make sure that multi-region databases no longer get tagged as such - meaning // that we want to change the TypeDescriptor_MULTIREGION_ENUM to a normal enum. We `continue` // to skip the multi-region work below. - if details.StripLocalities { + if details.RemoveRegions { t.TypeDesc().Kind = descpb.TypeDescriptor_ENUM t.TypeDesc().RegionConfig = nil continue diff --git a/pkg/ccl/backupccl/restore_multiregion_rbr_test.go b/pkg/ccl/backupccl/restore_multiregion_rbr_test.go index a5f67cb44185..a3df50a47e70 100644 --- a/pkg/ccl/backupccl/restore_multiregion_rbr_test.go +++ b/pkg/ccl/backupccl/restore_multiregion_rbr_test.go @@ -30,7 +30,7 @@ import ( // The goal of this test is to ensure that if a user ever performed a // regionless restore where the backed-up target has a regional by row table, // they would be able to get themselves out of a stuck state without needing -// an enterprise license (in addition to testing the ability to use strip_localities +// an enterprise license (in addition to testing the ability to use remove_regions // without said license). 
func TestMultiRegionRegionlessRestoreNoLicense(t *testing.T) { defer leaktest.AfterTest(t)() @@ -81,7 +81,7 @@ func TestMultiRegionRegionlessRestoreNoLicense(t *testing.T) { defer sqlTC.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(sqlTC.Conns[0]) - if err := backuptestutils.VerifyBackupRestoreStatementResult(t, sqlDB, `RESTORE DATABASE d FROM LATEST IN $1 WITH strip_localities`, localFoo); err != nil { + if err := backuptestutils.VerifyBackupRestoreStatementResult(t, sqlDB, `RESTORE DATABASE d FROM LATEST IN $1 WITH remove_regions`, localFoo); err != nil { t.Fatal(err) } diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 7fea786052ce..95b58edf0634 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -1081,7 +1081,7 @@ func resolveOptionsForRestoreJobDescription( VerifyData: opts.VerifyData, UnsafeRestoreIncompatibleVersion: opts.UnsafeRestoreIncompatibleVersion, ExperimentalOnline: opts.ExperimentalOnline, - StripLocalities: opts.StripLocalities, + RemoveRegions: opts.RemoveRegions, } if opts.EncryptionPassphrase != nil { @@ -2075,7 +2075,7 @@ func doRestorePlan( // If we are stripping localities, wipe tables of their LocalityConfig before we allocate // descriptor rewrites - as validation in remapTables compares these tables with the non-mr // database and fails otherwise - if restoreStmt.Options.StripLocalities { + if restoreStmt.Options.RemoveRegions { for _, t := range filteredTablesByID { t.TableDesc().LocalityConfig = nil } @@ -2197,7 +2197,7 @@ func doRestorePlan( SkipLocalitiesCheck: restoreStmt.Options.SkipLocalitiesCheck, ExecutionLocality: execLocality, ExperimentalOnline: restoreStmt.Options.ExperimentalOnline, - StripLocalities: restoreStmt.Options.StripLocalities, + RemoveRegions: restoreStmt.Options.RemoveRegions, } jr := jobs.Record{ diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-alter-region b/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-alter-region index 9c60e9c74085..1fb8ed0c7cc8 100644 --- a/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-alter-region +++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-alter-region @@ -45,7 +45,7 @@ subtest end new-cluster name=s2 share-io-dir=s1 allow-implicit-access localities=us-east-1 ---- -# test cluster restore with strip_localities +# test cluster restore with remove_regions subtest restore_regionless_on_single_region_cluster query-sql @@ -56,7 +56,7 @@ postgres root {} system node {} exec-sql -RESTORE FROM LATEST IN 'nodelocal://1/cluster_backup/' WITH strip_localities; +RESTORE FROM LATEST IN 'nodelocal://1/cluster_backup/' WITH remove_regions; ---- # check cluster's regions @@ -167,7 +167,7 @@ subtest end subtest end -# test db restore with strip_localities +# test db restore with remove_regions subtest restore_regionless_on_single_region_db exec-sql @@ -175,7 +175,7 @@ DROP DATABASE d; ---- exec-sql -RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/database_backup/' WITH strip_localities; +RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/database_backup/' WITH remove_regions; ---- # check to see if restored database, d, shows up @@ -270,7 +270,7 @@ subtest end subtest end -# test table restore with strip_localities +# test table restore with remove_regions subtest restore_regionless_on_single_region_table exec-sql @@ -278,7 +278,7 @@ DROP TABLE d.t; ---- exec-sql -RESTORE TABLE d.t FROM LATEST IN 'nodelocal://1/table_backup/' WITH strip_localities; +RESTORE 
TABLE d.t FROM LATEST IN 'nodelocal://1/table_backup/' WITH remove_regions; ---- query-sql diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-on-regionless b/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-on-regionless index 4f14da51ca56..df76340f642b 100644 --- a/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-on-regionless +++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-on-regionless @@ -44,7 +44,7 @@ subtest end new-cluster name=s2 share-io-dir=s1 allow-implicit-access ---- -# test cluster restore with strip_localities +# test cluster restore with remove_regions subtest restore_regionless_on_regionless_cluster query-sql @@ -55,7 +55,7 @@ postgres root {} system node {} exec-sql -RESTORE FROM LATEST IN 'nodelocal://1/cluster_backup/' WITH strip_localities; +RESTORE FROM LATEST IN 'nodelocal://1/cluster_backup/' WITH remove_regions; ---- # check cluster's regions @@ -100,7 +100,7 @@ SELECT * FROM d.t; subtest end -# test db restore with strip_localities +# test db restore with remove_regions subtest restore_regionless_on_regionless_db exec-sql @@ -108,7 +108,7 @@ DROP DATABASE d; ---- exec-sql -RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/database_backup/' WITH strip_localities; +RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/database_backup/' WITH remove_regions; ---- # check to see if restored database, d, shows up @@ -148,7 +148,7 @@ SELECT * FROM d.t; subtest end -# test table restore with strip_localities +# test table restore with remove_regions subtest restore_regionless_on_regionless_table exec-sql @@ -156,7 +156,7 @@ DROP TABLE d.t; ---- exec-sql -RESTORE TABLE d.t FROM LATEST IN 'nodelocal://1/table_backup/' WITH strip_localities; +RESTORE TABLE d.t FROM LATEST IN 'nodelocal://1/table_backup/' WITH remove_regions; ---- query-sql diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-regional-by-row b/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-regional-by-row index 6fa811de5d4e..e55a5cc34b75 100644 --- a/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-regional-by-row +++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-regionless-regional-by-row @@ -62,7 +62,7 @@ new-cluster name=s2 share-io-dir=s1 allow-implicit-access localities=us-east-1 subtest restore_regionless_cluster_rbr exec-sql -RESTORE FROM LATEST IN 'nodelocal://1/rbr_cluster_backup/' WITH strip_localities; +RESTORE FROM LATEST IN 'nodelocal://1/rbr_cluster_backup/' WITH remove_regions; ---- # check cluster's regions @@ -129,7 +129,7 @@ DROP DATABASE d; ---- exec-sql -RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/rbr_database_backup/' WITH strip_localities; +RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/rbr_database_backup/' WITH remove_regions; ---- # check to see if restored database, d, shows up @@ -191,7 +191,7 @@ DROP TABLE d.t; ---- exec-sql -RESTORE TABLE d.t FROM LATEST IN 'nodelocal://1/rbr_table_backup/' WITH strip_localities; +RESTORE TABLE d.t FROM LATEST IN 'nodelocal://1/rbr_table_backup/' WITH remove_regions; ---- pq: "crdb_internal_region" is not compatible with type "crdb_internal_region" existing in cluster: "crdb_internal_region" of type "ENUM" is not compatible with type "MULTIREGION_ENUM" @@ -201,7 +201,7 @@ DROP TYPE d.public.crdb_internal_region; ---- exec-sql -RESTORE TABLE d.t FROM LATEST IN 'nodelocal://1/rbr_table_backup/' WITH strip_localities; +RESTORE TABLE d.t FROM LATEST IN 'nodelocal://1/rbr_table_backup/' WITH remove_regions; ---- query-sql diff --git 
a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 7b60c262131e..b511083a0f66 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "ingest_span_configs.go", "merged_subscription.go", "metrics.go", + "replication_execution_details.go", "stream_ingest_manager.go", "stream_ingestion_dist.go", "stream_ingestion_frontier_processor.go", @@ -67,6 +68,7 @@ go_library( "//pkg/util/bulk", "//pkg/util/ctxgroup", "//pkg/util/hlc", + "//pkg/util/humanizeutil", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/protoutil", @@ -92,6 +94,7 @@ go_test( "main_test.go", "merged_subscription_test.go", "rangekey_batcher_test.go", + "replication_execution_details_test.go", "replication_random_client_test.go", "replication_stream_e2e_test.go", "stream_ingestion_dist_test.go", @@ -134,6 +137,7 @@ go_test( "//pkg/security/securitytest", "//pkg/security/username", "//pkg/server", + "//pkg/server/serverpb", "//pkg/settings", "//pkg/settings/cluster", "//pkg/spanconfig", @@ -159,6 +163,7 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util/ctxgroup", "//pkg/util/hlc", + "//pkg/util/httputil", "//pkg/util/leaktest", "//pkg/util/limit", "//pkg/util/log", diff --git a/pkg/ccl/streamingccl/streamingest/datadriven_test.go b/pkg/ccl/streamingccl/streamingest/datadriven_test.go index f77b5160c3f4..2e7a841ecaa3 100644 --- a/pkg/ccl/streamingccl/streamingest/datadriven_test.go +++ b/pkg/ccl/streamingccl/streamingest/datadriven_test.go @@ -100,6 +100,11 @@ func TestDataDriven(t *testing.T) { ctx := context.Background() datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { + // Skip the test if it is a .txt file. This is to allow us to have non-test + // testdata in the same directory as the test files. + if strings.HasSuffix(path, ".txt") { + return + } ds := newDatadrivenTestState() defer ds.cleanup(t) datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { diff --git a/pkg/ccl/streamingccl/streamingest/replication_execution_details.go b/pkg/ccl/streamingccl/streamingest/replication_execution_details.go new file mode 100644 index 000000000000..d28e3774a7f1 --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/replication_execution_details.go @@ -0,0 +1,189 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamingest + +import ( + "bytes" + "context" + "fmt" + "sort" + "text/tabwriter" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/redact" +) + +type frontierExecutionDetails struct { + srcInstanceID base.SQLInstanceID + destInstanceID base.SQLInstanceID + span string + frontierTS string + behindBy redact.SafeString +} + +// constructSpanFrontierExecutionDetails constructs the frontierExecutionDetails +// using the initial partition specs that map spans to the src and dest +// instances, and a snapshot of the current state of the frontier. +// +// The shape of the spans tracked by the frontier can be different from the +// initial partitioned set of spans. To account for this, for each span in the +// initial partition set we want to output all the intersecting sub-spans in the +// frontier along with their timestamps. +func constructSpanFrontierExecutionDetails( + partitionSpecs execinfrapb.StreamIngestionPartitionSpecs, + frontierSpans execinfrapb.FrontierEntries, +) ([]frontierExecutionDetails, error) { + f, err := span.MakeFrontier() + if err != nil { + return nil, err + } + for _, rs := range frontierSpans.ResolvedSpans { + if err := f.AddSpansAt(rs.Timestamp, rs.Span); err != nil { + return nil, err + } + } + + now := timeutil.Now() + res := make([]frontierExecutionDetails, 0) + for _, spec := range partitionSpecs.Specs { + for _, sp := range spec.Spans { + f.SpanEntries(sp, func(r roachpb.Span, timestamp hlc.Timestamp) (done span.OpResult) { + res = append(res, frontierExecutionDetails{ + srcInstanceID: spec.SrcInstanceID, + destInstanceID: spec.DestInstanceID, + span: r.String(), + frontierTS: timestamp.GoTime().String(), + behindBy: humanizeutil.Duration(now.Sub(timestamp.GoTime())), + }) + return span.ContinueMatch + }) + } + + // Sort res on the basis of srcInstanceID, destInstanceID. + sort.Slice(res, func(i, j int) bool { + if res[i].srcInstanceID != res[j].srcInstanceID { + return res[i].srcInstanceID < res[j].srcInstanceID + } + if res[i].destInstanceID != res[j].destInstanceID { + return res[i].destInstanceID < res[j].destInstanceID + } + return res[i].span < res[j].span + }) + } + + return res, nil +} + +// generateSpanFrontierExecutionDetailFile generates and writes a file to the +// job_info table that captures the mapping from: +// +// # Src Instance | Dest Instance | Span | Frontier Timestamp | Behind By +// +// This information is computed from information persisted by the +// stream ingestion resumer and frontier processor. Namely: +// +// - The StreamIngestionPartitionSpec of each partition providing a mapping from +// span to src and dest SQLInstanceID. +// - The snapshot of the frontier tracking how far each span has been replicated +// up to. 
+func generateSpanFrontierExecutionDetailFile( + ctx context.Context, execCfg *sql.ExecutorConfig, ingestionJobID jobspb.JobID, skipBehindBy bool, +) error { + return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + var sb bytes.Buffer + w := tabwriter.NewWriter(&sb, 0, 0, 1, ' ', tabwriter.TabIndent) + + // Read the StreamIngestionPartitionSpecs to get a mapping from spans to + // their source and destination SQL instance IDs. + specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID) + if err != nil { + return err + } + + var partitionSpecs execinfrapb.StreamIngestionPartitionSpecs + if err := protoutil.Unmarshal(specs, &partitionSpecs); err != nil { + return err + } + + // Now, read the latest snapshot of the frontier that tells us what + // timestamp each span has been replicated up to. + frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID) + if err != nil { + return err + } + + var frontierSpans execinfrapb.FrontierEntries + if err := protoutil.Unmarshal(frontierEntries, &frontierSpans); err != nil { + return err + } + executionDetails, err := constructSpanFrontierExecutionDetails(partitionSpecs, frontierSpans) + if err != nil { + return err + } + + header := "Src Instance\tDest Instance\tSpan\tFrontier Timestamp\tBehind By" + if skipBehindBy { + header = "Src Instance\tDest Instance\tSpan\tFrontier Timestamp" + } + fmt.Fprintln(w, header) + for _, ed := range executionDetails { + if skipBehindBy { + fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", + ed.srcInstanceID, ed.destInstanceID, ed.span, ed.frontierTS) + } else { + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", + ed.srcInstanceID, ed.destInstanceID, ed.span, ed.frontierTS, ed.behindBy) + } + } + + filename := fmt.Sprintf("replication-frontier.%s.txt", timeutil.Now().Format("20060102_150405.00")) + if err := w.Flush(); err != nil { + return err + } + return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID) + }) +} + +// persistStreamIngestionPartitionSpecs persists all +// StreamIngestionPartitionSpecs in a serialized form to the job_info table. +// This information is used when the Resumer is requested to construct a +// replication-frontier.txt file. +func persistStreamIngestionPartitionSpecs( + ctx context.Context, + execCfg *sql.ExecutorConfig, + ingestionJobID jobspb.JobID, + streamIngestionSpecs map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec, +) error { + err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + specs := make([]*execinfrapb.StreamIngestionPartitionSpec, 0) + partitionSpecs := execinfrapb.StreamIngestionPartitionSpecs{Specs: specs} + for _, d := range streamIngestionSpecs { + for _, partitionSpec := range d.PartitionSpecs { + partitionSpecs.Specs = append(partitionSpecs.Specs, &partitionSpec) + } + } + specBytes, err := protoutil.Marshal(&partitionSpecs) + if err != nil { + return err + } + return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID) + }) + return err +} diff --git a/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go b/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go new file mode 100644 index 000000000000..4647485a6d32 --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go @@ -0,0 +1,384 @@ +// Copyright 2023 The Cockroach Authors. 
+// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamingest + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "os" + "sort" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/httputil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestFrontierExecutionDetailFile is a unit test for +// constructSpanFrontierExecutionDetails. Refer to the method header for +// details. +func TestConstructFrontierExecutionDetailFile(t *testing.T) { + defer leaktest.AfterTest(t)() + + clearTimestamps := func(executionDetails []frontierExecutionDetails) []frontierExecutionDetails { + res := make([]frontierExecutionDetails, len(executionDetails)) + for i, ed := range executionDetails { + ed.frontierTS = "" + ed.behindBy = "" + res[i] = ed + } + return res + } + + for _, tc := range []struct { + name string + partitionSpecs execinfrapb.StreamIngestionPartitionSpecs + frontierEntries execinfrapb.FrontierEntries + expected []frontierExecutionDetails + }{ + { + name: "matching spans", + partitionSpecs: execinfrapb.StreamIngestionPartitionSpecs{ + Specs: []*execinfrapb.StreamIngestionPartitionSpec{ + { + SrcInstanceID: 1, + DestInstanceID: 2, + Spans: []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + }, + }, + }}, + frontierEntries: execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + Timestamp: hlc.Timestamp{WallTime: 1}, + }, + }}, + expected: []frontierExecutionDetails{ + { + srcInstanceID: 1, + destInstanceID: 2, + span: "{a-b}", + }}, + }, + { + name: "multi-partition", + partitionSpecs: execinfrapb.StreamIngestionPartitionSpecs{ + Specs: []*execinfrapb.StreamIngestionPartitionSpec{ + { + SrcInstanceID: 1, + DestInstanceID: 2, + Spans: []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + }, + }, + { + SrcInstanceID: 1, + DestInstanceID: 3, + Spans: []roachpb.Span{ + {Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + }, + }, + }}, + frontierEntries: execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("a'")}, + Timestamp: hlc.Timestamp{WallTime: 1}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + Timestamp: hlc.Timestamp{WallTime: 2}, + }, + }}, + expected: []frontierExecutionDetails{ + { + srcInstanceID: 1, + destInstanceID: 2, + span: "a{-'}", + }, + { + srcInstanceID: 1, + destInstanceID: 3, + span: "{b-c}", + }, + }, + }, + { + name: "merged frontier", + partitionSpecs: 
execinfrapb.StreamIngestionPartitionSpecs{ + Specs: []*execinfrapb.StreamIngestionPartitionSpec{ + { + SrcInstanceID: 1, + DestInstanceID: 2, + Spans: []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + }, + }, + { + SrcInstanceID: 1, + DestInstanceID: 3, + Spans: []roachpb.Span{ + {Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}, + }, + }, + }}, + frontierEntries: execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}, + Timestamp: hlc.Timestamp{WallTime: 1}, + }, + }}, + expected: []frontierExecutionDetails{ + { + srcInstanceID: 1, + destInstanceID: 2, + span: "{a-b}", + }, + { + srcInstanceID: 1, + destInstanceID: 3, + span: "{b-d}", + }}, + }, + { + name: "no matching spans", + partitionSpecs: execinfrapb.StreamIngestionPartitionSpecs{ + Specs: []*execinfrapb.StreamIngestionPartitionSpec{ + { + SrcInstanceID: 1, + DestInstanceID: 2, + Spans: []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + }, + }, + }, + }, + frontierEntries: execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + Timestamp: hlc.Timestamp{WallTime: 1}, + }, + }}, + expected: []frontierExecutionDetails{}, + }, + { + name: "split frontier", + partitionSpecs: execinfrapb.StreamIngestionPartitionSpecs{ + Specs: []*execinfrapb.StreamIngestionPartitionSpec{ + { + SrcInstanceID: 1, + DestInstanceID: 2, + Spans: []roachpb.Span{{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}}}}}, + frontierEntries: execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + Timestamp: hlc.Timestamp{WallTime: 1}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + Timestamp: hlc.Timestamp{WallTime: 2}, + }, + }}, + expected: []frontierExecutionDetails{ + { + srcInstanceID: 1, + destInstanceID: 2, + span: "{a-b}", + }, + { + srcInstanceID: 1, + destInstanceID: 2, + span: "{b-c}", + }}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + executionDetails, err := constructSpanFrontierExecutionDetails(tc.partitionSpecs, tc.frontierEntries) + require.NoError(t, err) + executionDetails = clearTimestamps(executionDetails) + require.Equal(t, tc.expected, executionDetails) + }) + } +} + +func listExecutionDetails( + t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, +) []string { + t.Helper() + + client, err := s.GetAdminHTTPClient() + require.NoError(t, err) + + url := s.AdminURL().String() + fmt.Sprintf("/_status/list_job_profiler_execution_details/%d", jobID) + req, err := http.NewRequest("GET", url, nil) + require.NoError(t, err) + + req.Header.Set("Content-Type", httputil.ProtoContentType) + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + edResp := serverpb.ListJobProfilerExecutionDetailsResponse{} + require.NoError(t, protoutil.Unmarshal(body, &edResp)) + sort.Slice(edResp.Files, func(i, j int) bool { + return edResp.Files[i] < edResp.Files[j] + }) + return edResp.Files +} + +func checkExecutionDetails( + t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, filename string, +) ([]byte, error) { + t.Helper() + + client, err := s.GetAdminHTTPClient() + if err != nil { + return nil, err + } + + url := s.AdminURL().String() + 
fmt.Sprintf("/_status/job_profiler_execution_details/%d?%s", jobID, filename) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", httputil.ProtoContentType) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + require.Equal(t, http.StatusOK, resp.StatusCode) + + edResp := serverpb.GetJobProfilerExecutionDetailResponse{} + if err := protoutil.Unmarshal(body, &edResp); err != nil { + return nil, err + } + + r := bytes.NewReader(edResp.Data) + data, err := io.ReadAll(r) + if err != nil { + return data, err + } + if len(data) == 0 { + return data, errors.New("no data returned") + } + return data, nil +} + +func TestEndToEndFrontierExecutionDetailFile(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + + // First, let's persist some partitions specs. + streamIngestionsSpecs := map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec{ + 1: { + PartitionSpecs: map[string]execinfrapb.StreamIngestionPartitionSpec{ + "1": { + SrcInstanceID: 2, + DestInstanceID: 1, + Spans: []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + }, + }, + }, + }, + 2: { + PartitionSpecs: map[string]execinfrapb.StreamIngestionPartitionSpec{ + "1": { + SrcInstanceID: 1, + DestInstanceID: 2, + Spans: []roachpb.Span{ + {Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + }, + }, + }, + }, + 3: { + PartitionSpecs: map[string]execinfrapb.StreamIngestionPartitionSpec{ + "1": { + SrcInstanceID: 3, + DestInstanceID: 3, + Spans: []roachpb.Span{ + {Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, + }, + }, + }, + }, + } + + srv := serverutils.StartServerOnly(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(ctx) + ts := srv.ApplicationLayer() + execCfg := ts.ExecutorConfig().(sql.ExecutorConfig) + + ingestionJobID := jobspb.JobID(123) + require.NoError(t, persistStreamIngestionPartitionSpecs(ctx, &execCfg, + ingestionJobID, streamIngestionsSpecs)) + + // Now, let's persist some frontier entries. 
+ frontierEntries := execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + Timestamp: hlc.Timestamp{WallTime: 1}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + Timestamp: hlc.Timestamp{WallTime: 2}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("d'")}, + Timestamp: hlc.Timestamp{WallTime: 2}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("d'"), EndKey: roachpb.Key("e")}, + Timestamp: hlc.Timestamp{WallTime: 0}, + }, + }} + + frontierBytes, err := protoutil.Marshal(&frontierEntries) + require.NoError(t, err) + require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, ingestionJobID) + })) + require.NoError(t, generateSpanFrontierExecutionDetailFile(ctx, &execCfg, ingestionJobID, true /* skipBehindBy */)) + files := listExecutionDetails(t, srv, ingestionJobID) + require.Len(t, files, 1) + data, err := checkExecutionDetails(t, srv, ingestionJobID, files[0]) + require.NoError(t, err) + require.NotEmpty(t, data) + + expectedData, err := os.ReadFile(datapathutils.TestDataPath(t, "expected_replication_frontier.txt")) + require.NoError(t, err) + require.Equal(t, expectedData, data) +} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go index c5f0be2b64d8..5c79614db213 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go @@ -55,6 +55,10 @@ var replanFrequency = settings.RegisterDurationSetting( settings.PositiveDuration, ) +// replicationPartitionInfoFilename is the filename at which the replication job +// resumer writes its partition specs. +const replicationPartitionInfoFilename = "~replication-partition-specs.binpb" + func startDistIngestion( ctx context.Context, execCtx sql.JobExecContext, resumer *streamIngestionResumer, ) error { @@ -172,7 +176,6 @@ func startDistIngestion( } return ingestor.ingestSpanConfigs(ctx, details.SourceTenantName) } - execInitialPlan := func(ctx context.Context) error { defer func() { stopReplanner() @@ -316,6 +319,9 @@ func (p *replicationFlowPlanner) constructPlanGenerator( if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.AfterReplicationFlowPlan != nil { knobs.AfterReplicationFlowPlan(streamIngestionSpecs, streamIngestionFrontierSpec) } + if err := persistStreamIngestionPartitionSpecs(ctx, execCtx.ExecCfg(), ingestionJobID, streamIngestionSpecs); err != nil { + return nil, nil, err + } // Setup a one-stage plan with one proc per input spec. corePlacement := make([]physicalplan.ProcessorCorePlacement, len(streamIngestionSpecs)) @@ -565,6 +571,8 @@ func constructStreamIngestionPlanSpecs( SubscriptionToken: string(partition.SubscriptionToken), Address: string(partition.SrcAddr), Spans: partition.Spans, + SrcInstanceID: base.SQLInstanceID(partition.SrcInstanceID), + DestInstanceID: destID, } streamIngestionSpecs[destID].PartitionSpecs[partition.ID] = partSpec trackedSpans = append(trackedSpans, partition.Spans...) 
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 1b46e1f3d85a..d532a9ec4d36 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -46,8 +46,25 @@ var JobCheckpointFrequency = settings.RegisterDurationSetting( settings.NonNegativeDuration, ) +// DumpFrontierEntries controls the frequency at which we persist the entries in +// the frontier to the `system.job_info` table. +// +// TODO(adityamaru): This timer should be removed once each job is aware of whether +// it is profiling or not. +var DumpFrontierEntries = settings.RegisterDurationSetting( + settings.TenantWritable, + "stream_replication.dump_frontier_entries_frequency", + "controls the frequency with which the frontier entries are persisted; if 0, disabled", + 10*time.Minute, + settings.NonNegativeDuration, +) + const streamIngestionFrontierProcName = `ingestfntr` +// frontierEntriesFilename is the name of the file at which the stream ingestion +// frontier periodically dumps its state. +const frontierEntriesFilename = "~replication-frontier-entries.binpb" + type streamIngestionFrontier struct { execinfra.ProcessorBase @@ -78,6 +95,7 @@ type streamIngestionFrontier struct { persistedReplicatedTime hlc.Timestamp lastPartitionUpdate time.Time + lastFrontierDump time.Time partitionProgress map[string]jobspb.StreamIngestionProgress_PartitionProgress } @@ -305,6 +323,11 @@ func (sf *streamIngestionFrontier) Next() ( break } + if err := sf.maybePersistFrontierEntries(); err != nil { + log.Errorf(sf.Ctx(), "failed to persist frontier entries: %+v", err) + } + + // Send back a row to the job so that it can update the progress. select { case <-sf.Ctx().Done(): sf.MoveToDraining(sf.Ctx().Err()) @@ -489,3 +512,38 @@ func (sf *streamIngestionFrontier) updateLagMetric() { sf.metrics.FrontierLagNanos.Update(timeutil.Since(sf.persistedReplicatedTime.GoTime()).Nanoseconds()) } } + +// maybePersistFrontierEntries periodically persists the current state of the +// frontier to the `system.job_info` table. This information is used to hydrate +// the execution details that can be requested for the C2C ingestion job. Note, +// we always persist the entries to the same info key and so we never have more +// than one row describing the state of the frontier at a given point in time. 
+func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error { + dumpFreq := DumpFrontierEntries.Get(&sf.FlowCtx.Cfg.Settings.SV) + if dumpFreq == 0 || timeutil.Since(sf.lastFrontierDump) < dumpFreq { + return nil + } + ctx := sf.Ctx() + f := sf.frontier + jobID := jobspb.JobID(sf.spec.JobID) + + frontierEntries := &execinfrapb.FrontierEntries{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)} + f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) { + frontierEntries.ResolvedSpans = append(frontierEntries.ResolvedSpans, jobspb.ResolvedSpan{Span: sp, Timestamp: ts}) + return span.ContinueMatch + }) + + frontierBytes, err := protoutil.Marshal(frontierEntries) + if err != nil { + return err + } + + if err = sf.FlowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, jobID) + }); err != nil { + return err + } + + sf.lastFrontierDump = timeutil.Now() + return nil +} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 725155267292..51f8a7c1f1c0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -536,8 +536,18 @@ func (s *streamIngestionResumer) CollectProfile(ctx context.Context, execCtx int defer s.mu.Unlock() aggStatsCopy = s.mu.perNodeAggregatorStats.DeepCopy() }() - return bulkutil.FlushTracingAggregatorStats(ctx, s.job.ID(), - p.ExecCfg().InternalDB, aggStatsCopy) + + var combinedErr error + if err := bulkutil.FlushTracingAggregatorStats(ctx, s.job.ID(), + p.ExecCfg().InternalDB, aggStatsCopy); err != nil { + combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, "failed to flush aggregator stats")) + } + if err := generateSpanFrontierExecutionDetailFile(ctx, p.ExecCfg(), + s.job.ID(), false /* skipBehindBy */); err != nil { + combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, "failed to generate span frontier execution details")) + } + + return combinedErr } func closeAndLog(ctx context.Context, d streamclient.Dialer) { diff --git a/pkg/ccl/streamingccl/streamingest/testdata/expected_replication_frontier.txt b/pkg/ccl/streamingccl/streamingest/testdata/expected_replication_frontier.txt new file mode 100644 index 000000000000..ce2cb4529165 --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/testdata/expected_replication_frontier.txt @@ -0,0 +1,5 @@ +Src Instance Dest Instance Span Frontier Timestamp +1 2 {b-c} 1970-01-01 00:00:00.000000002 +0000 UTC +2 1 {a-b} 1970-01-01 00:00:00.000000001 +0000 UTC +3 3 d{-'} 1970-01-01 00:00:00.000000002 +0000 UTC +3 3 {d'-e} 1970-01-01 00:00:00 +0000 UTC diff --git a/pkg/cmd/dev/BUILD.bazel b/pkg/cmd/dev/BUILD.bazel index f70f31d0b2c3..34683ffcdc34 100644 --- a/pkg/cmd/dev/BUILD.bazel +++ b/pkg/cmd/dev/BUILD.bazel @@ -16,7 +16,6 @@ go_library( "go.go", "lint.go", "main.go", - "merge_test_xmls.go", "roachprod_stress.go", "test.go", "test_binaries.go", diff --git a/pkg/cmd/dev/dev.go b/pkg/cmd/dev/dev.go index d443124dc587..2fcc8c5ceebf 100644 --- a/pkg/cmd/dev/dev.go +++ b/pkg/cmd/dev/dev.go @@ -128,7 +128,6 @@ Typical usage: makeDoctorCmd(ret.doctor), makeGenerateCmd(ret.generate), makeGoCmd(ret.gocmd), - makeMergeTestXMLsCmd(ret.mergeTestXMLs), makeTestLogicCmd(ret.testlogic), makeLintCmd(ret.lint), makeTestCmd(ret.test), diff --git a/pkg/cmd/dev/merge_test_xmls.go b/pkg/cmd/dev/merge_test_xmls.go deleted file mode 100644 index 
79ecedc28b10..000000000000 --- a/pkg/cmd/dev/merge_test_xmls.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package main - -import ( - "encoding/xml" - "os" - - bazelutil "github.com/cockroachdb/cockroach/pkg/build/util" - "github.com/spf13/cobra" -) - -func makeMergeTestXMLsCmd(runE func(cmd *cobra.Command, args []string) error) *cobra.Command { - mergeTestXMLsCommand := &cobra.Command{ - Use: "merge-test-xmls XML1 [XML2...]", - Short: "Merge the given test XML's (utility command)", - Long: "Merge the given test XML's (utility command)", - Args: cobra.MinimumNArgs(1), - RunE: runE, - } - mergeTestXMLsCommand.Hidden = true - return mergeTestXMLsCommand -} - -func (d *dev) mergeTestXMLs(cmd *cobra.Command, xmls []string) error { - var suites []bazelutil.TestSuites - for _, file := range xmls { - suitesToAdd := bazelutil.TestSuites{} - input, err := os.ReadFile(file) - if err != nil { - return err - } - err = xml.Unmarshal(input, &suitesToAdd) - if err != nil { - return err - } - suites = append(suites, suitesToAdd) - } - return bazelutil.MergeTestXMLs(suites, os.Stdout) -} diff --git a/pkg/cmd/roachtest/tests/versionupgrade.go b/pkg/cmd/roachtest/tests/versionupgrade.go index 74d947f15db5..edd0d94c51b3 100644 --- a/pkg/cmd/roachtest/tests/versionupgrade.go +++ b/pkg/cmd/roachtest/tests/versionupgrade.go @@ -304,7 +304,7 @@ func uploadAndStart(nodes option.NodeListOption, v string) versionStep { func binaryUpgradeStep(nodes option.NodeListOption, newVersion string) versionStep { return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { if err := clusterupgrade.RestartNodesWithNewBinary( - ctx, t, t.L(), u.c, nodes, option.DefaultStartOpts(), newVersion, + ctx, t, t.L(), u.c, nodes, option.DefaultStartOptsNoBackups(), newVersion, ); err != nil { t.Fatal(err) } diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index a9bc42d7acee..a65b996f0b69 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -489,8 +489,8 @@ message RestoreDetails { // online restore. repeated roachpb.Span download_spans = 32 [(gogoproto.nullable) = false]; - // Strips localities - bool StripLocalities = 33; + // Removes regions. + bool RemoveRegions = 33; // NEXT ID: 34. 
} diff --git a/pkg/kv/kvserver/gc/BUILD.bazel b/pkg/kv/kvserver/gc/BUILD.bazel index 5492da85bf77..ef1af7326751 100644 --- a/pkg/kv/kvserver/gc/BUILD.bazel +++ b/pkg/kv/kvserver/gc/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/kv/kvpb", "//pkg/kv/kvserver/abortspan", "//pkg/kv/kvserver/benignerror", + "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/rditer", "//pkg/roachpb", "//pkg/settings", @@ -24,7 +25,6 @@ go_library( "//pkg/util/bufalloc", "//pkg/util/hlc", "//pkg/util/log", - "//pkg/util/protoutil", "//pkg/util/timeutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/kv/kvserver/gc/gc.go b/pkg/kv/kvserver/gc/gc.go index 486490ec4cda..1847c0e5e5d2 100644 --- a/pkg/kv/kvserver/gc/gc.go +++ b/pkg/kv/kvserver/gc/gc.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/benignerror" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -38,7 +39,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -350,9 +350,9 @@ func Run( Threshold: newThreshold, } - fastPath, err := processReplicatedKeyRange(ctx, desc, snap, now, newThreshold, options.IntentAgeThreshold, - populateBatcherOptions(options), - gcer, + // Process all replicated locks first and resolve any that have been around + // for longer than the IntentAgeThreshold. + err := processReplicatedLocks(ctx, desc, snap, now, options.IntentAgeThreshold, intentBatcherOptions{ maxIntentsPerIntentCleanupBatch: options.MaxIntentsPerIntentCleanupBatch, maxIntentKeyBytesPerIntentCleanupBatch: options.MaxIntentKeyBytesPerIntentCleanupBatch, @@ -365,6 +365,14 @@ func Run( } return Info{}, err } + fastPath, err := processReplicatedKeyRange(ctx, desc, snap, newThreshold, + populateBatcherOptions(options), gcer, &info) + if err != nil { + if errors.Is(err, pebble.ErrSnapshotExcised) { + err = benignerror.NewStoreBenign(err) + } + return Info{}, err + } err = processReplicatedRangeTombstones(ctx, desc, snap, fastPath, now, newThreshold, gcer, &info) if err != nil { if errors.Is(err, pebble.ErrSnapshotExcised) { @@ -420,19 +428,15 @@ func populateBatcherOptions(options RunOptions) gcKeyBatcherThresholds { // remove it. // // The logic iterates all versions of all keys in the range from oldest to -// newest. Expired intents are written into the txnMap and intentKeyMap. -// Returns true if clear range was used to remove all user data. +// newest. Intents are not handled by this function; they're simply skipped +// over. Returns true if clear range was used to remove user data. func processReplicatedKeyRange( ctx context.Context, desc *roachpb.RangeDescriptor, snap storage.Reader, - now hlc.Timestamp, threshold hlc.Timestamp, - intentAgeThreshold time.Duration, batcherThresholds gcKeyBatcherThresholds, gcer PureGCer, - options intentBatcherOptions, - cleanupIntentsFn CleanupIntentsFunc, info *Info, ) (bool, error) { // Perform fast path check prior to performing GC. 
Fast path only collects @@ -458,40 +462,11 @@ func processReplicatedKeyRange( } } - // Compute intent expiration (intent age at which we attempt to resolve). - intentExp := now.Add(-intentAgeThreshold.Nanoseconds(), 0) - return excludeUserKeySpan, rditer.IterateMVCCReplicaKeySpans(desc, snap, rditer.IterateOptions{ CombineRangesAndPoints: true, Reverse: true, ExcludeUserKeySpan: excludeUserKeySpan, }, func(iterator storage.MVCCIterator, span roachpb.Span, keyType storage.IterKeyType) error { - intentBatcher := newIntentBatcher(cleanupIntentsFn, options, info) - - // handleIntent will deserialize transaction info and if intent is older than - // threshold enqueue it to batcher, otherwise ignore it. - handleIntent := func(keyValue *mvccKeyValue) error { - meta := &enginepb.MVCCMetadata{} - if err := protoutil.Unmarshal(keyValue.metaValue, meta); err != nil { - log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", keyValue.key, err) - return nil - } - if meta.Txn != nil { - // Keep track of intent to resolve if older than the intent - // expiration threshold. - if meta.Timestamp.ToTimestamp().Less(intentExp) { - info.IntentsConsidered++ - if err := intentBatcher.addAndMaybeFlushIntents(ctx, keyValue.key.Key, meta); err != nil { - if errors.Is(err, ctx.Err()) { - return err - } - log.Warningf(ctx, "failed to cleanup intents batch: %v", err) - } - } - } - return nil - } - // Iterate all versions of all keys from oldest to newest. If a version is an // intent it will have the highest timestamp of any versions and will be // followed by a metadata entry. @@ -527,18 +502,17 @@ func processReplicatedKeyRange( switch { case s.curIsNotValue(): - // Skip over non mvcc data and intents. + // Skip over non mvcc data. err = b.foundNonGCableData(ctx, s.cur, true /* isNewestPoint */) case s.curIsIntent(): + // Skip over intents; they cannot be GC-ed. We simply ignore them -- + // processReplicatedLocks will resolve them, if necessary. err = b.foundNonGCableData(ctx, s.cur, true /* isNewestPoint */) if err != nil { return err } - if err = handleIntent(s.next); err != nil { - return err - } - // For intents, we force step over the intent metadata after provisional - // value is found. + // Force step over the intent metadata as well to move on to the next + // key. it.step() default: if isGarbage(threshold, s.cur, s.next, s.curIsNewest(), s.firstRangeTombstoneTsAtOrBelowGC) { @@ -552,20 +526,98 @@ func processReplicatedKeyRange( } } - err := b.flushLastBatch(ctx) + return b.flushLastBatch(ctx) + }) +} + +// processReplicatedLocks identifies extant replicated locks which have been +// around longer than the supplied intentAgeThreshold and resolves them. +func processReplicatedLocks( + ctx context.Context, + desc *roachpb.RangeDescriptor, + reader storage.Reader, + now hlc.Timestamp, + // TODO(arul): rename to lockAgeThreshold + intentAgeThreshold time.Duration, + options intentBatcherOptions, + cleanupIntentsFn CleanupIntentsFunc, + info *Info, +) error { + // Compute intent expiration (intent age at which we attempt to resolve). 
+ intentExp := now.Add(-intentAgeThreshold.Nanoseconds(), 0) + intentBatcher := newIntentBatcher(cleanupIntentsFn, options, info) + + process := func(ltStartKey, ltEndKey roachpb.Key) error { + opts := storage.LockTableIteratorOptions{ + LowerBound: ltStartKey, + UpperBound: ltEndKey, + MatchMinStr: lock.Intent, + } + iter, err := storage.NewLockTableIterator(reader, opts) if err != nil { return err } + defer iter.Close() - // We need to send out last intent cleanup batch. - if err := intentBatcher.maybeFlushPendingIntents(ctx); err != nil { - if errors.Is(err, ctx.Err()) { + var ok bool + for ok, err = iter.SeekEngineKeyGE(storage.EngineKey{Key: ltStartKey}); ok; ok, err = iter.NextEngineKey() { + if err != nil { + return err + } + var meta enginepb.MVCCMetadata + err = iter.ValueProto(&meta) + if err != nil { return err } - log.Warningf(ctx, "failed to cleanup intents batch: %v", err) + if meta.Txn == nil { + return errors.AssertionFailedf("intent without transaction") + } + // Keep track of intent to resolve if older than the intent + // expiration threshold. + if meta.Timestamp.ToTimestamp().Less(intentExp) { + info.IntentsConsidered++ + key, err := iter.EngineKey() + if err != nil { + return err + } + ltKey, err := key.ToLockTableKey() + if err != nil { + return err + } + if ltKey.Strength != lock.Intent { + return errors.AssertionFailedf("unexpected strength for LockTableKey %s", ltKey.Strength) + } + if err := intentBatcher.addAndMaybeFlushIntents(ctx, ltKey.Key, &meta); err != nil { + if errors.Is(err, ctx.Err()) { + return err + } + log.Warningf(ctx, "failed to cleanup intents batch: %v", err) + } + } } return nil + } + + // We want to find/resolve replicated locks over both local and global + // keys. That's what the call to Select below will give us. + ltSpans := rditer.Select(desc.RangeID, rditer.SelectOpts{ + ReplicatedBySpan: desc.RSpan(), + ReplicatedSpansFilter: rditer.ReplicatedSpansLocksOnly, }) + for _, sp := range ltSpans { + if err := process(sp.Key, sp.EndKey); err != nil { + return err + } + } + + // We need to send out last intent cleanup batch, if present. + if err := intentBatcher.maybeFlushPendingIntents(ctx); err != nil { + if errors.Is(err, ctx.Err()) { + return err + } + log.Warningf(ctx, "failed to cleanup intents batch: %v", err) + } + return nil } // gcBatchCounters contain statistics about garbage that is collected for the diff --git a/pkg/kv/kvserver/gc/gc_test.go b/pkg/kv/kvserver/gc/gc_test.go index ad53ec92b8b1..02c9f9ccbff4 100644 --- a/pkg/kv/kvserver/gc/gc_test.go +++ b/pkg/kv/kvserver/gc/gc_test.go @@ -112,10 +112,11 @@ func TestBatchingInlineGCer(t *testing.T) { require.Zero(t, m.size) } -// TestIntentAgeThresholdSetting verifies that the GC intent resolution threshold can be -// adjusted. It uses short and long threshold to verify that intents inserted between two -// thresholds are not considered for resolution when threshold is high (1st attempt) and -// considered when threshold is low (2nd attempt). +// TestIntentAgeThresholdSetting verifies that the GC intent resolution +// threshold can be adjusted. It uses short and long threshold to verify that +// intents inserted between two thresholds are not considered for resolution +// when threshold is high (1st attempt) and considered when threshold is low +// (2nd attempt). func TestIntentAgeThresholdSetting(t *testing.T) { defer leaktest.AfterTest(t)() @@ -131,12 +132,15 @@ func TestIntentAgeThresholdSetting(t *testing.T) { // Prepare test intents in MVCC. 
key := []byte("a") + localKey := keys.MakeRangeKeyPrefix(key) value := roachpb.Value{RawBytes: []byte("0123456789")} intentHlc := hlc.Timestamp{ WallTime: intentTs.Nanoseconds(), } txn := roachpb.MakeTransaction("txn", key, isolation.Serializable, roachpb.NormalUserPriority, intentHlc, 1000, 0, 0) + // Write two intents -- one for a global key, and another for a local key. require.NoError(t, storage.MVCCPut(ctx, eng, key, intentHlc, value, storage.MVCCWriteOptions{Txn: &txn})) + require.NoError(t, storage.MVCCPut(ctx, eng, localKey, intentHlc, value, storage.MVCCWriteOptions{Txn: &txn})) require.NoError(t, eng.Flush()) // Prepare test fixtures for GC run. @@ -162,6 +166,7 @@ func TestIntentAgeThresholdSetting(t *testing.T) { require.NoError(t, err, "GC Run shouldn't fail") assert.Zero(t, info.IntentsConsidered, "Expected no intents considered by GC with default threshold") + require.Zero(t, len(gcer.intents)) info, err = Run(ctx, &desc, snap, nowTs, nowTs, RunOptions{ @@ -170,8 +175,9 @@ func TestIntentAgeThresholdSetting(t *testing.T) { }, gcTTL, &gcer, gcer.resolveIntents, gcer.resolveIntentsAsync) require.NoError(t, err, "GC Run shouldn't fail") - assert.Equal(t, 1, info.IntentsConsidered, - "Expected 1 intents considered by GC with short threshold") + assert.Equal(t, 2, info.IntentsConsidered, + "Expected 2 intents considered by GC with short threshold") + require.Equal(t, 2, len(gcer.intents)) } func TestIntentCleanupBatching(t *testing.T) { diff --git a/pkg/kv/kvserver/rditer/select.go b/pkg/kv/kvserver/rditer/select.go index 3d1fb96873ff..f255307cf799 100644 --- a/pkg/kv/kvserver/rditer/select.go +++ b/pkg/kv/kvserver/rditer/select.go @@ -15,20 +15,30 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" ) +// ReplicatedSpansFilter is used to declare filters when selecting replicated +// spans. +// +// TODO(arul): we could consider using a bitset here instead. Note that a lot of +// the fields here are mutually exclusive (e.g. ReplicatedSpansExcludeLocks and +// ReplicatedSpansLocksOnly), so we'd need some sort of validation here. type ReplicatedSpansFilter int const ( // ReplicatedSpansAll includes all replicated spans, including user keys, // range descriptors, and lock keys. ReplicatedSpansAll ReplicatedSpansFilter = iota - // ReplicatedSpansExcludeUser includes all replicated spans except for user keys. + // ReplicatedSpansExcludeUser includes all replicated spans except for user + // keys. ReplicatedSpansExcludeUser // ReplicatedSpansUserOnly includes just user keys, and no other replicated // spans. ReplicatedSpansUserOnly - // ReplicatedSpansExcludeLockTable includes all replicated spans except for the + // ReplicatedSpansExcludeLocks includes all replicated spans except for the // lock table. ReplicatedSpansExcludeLocks + // ReplicatedSpansLocksOnly includes just spans for the lock table, and no + // other replicated spans. + ReplicatedSpansLocksOnly ) // SelectOpts configures which spans for a Replica to return from Select. @@ -70,18 +80,20 @@ func Select(rangeID roachpb.RangeID, opts SelectOpts) []roachpb.Span { } if !opts.ReplicatedBySpan.Equal(roachpb.RSpan{}) { - // r1 "really" only starts at LocalMax. But because we use a StartKey of RKeyMin - // for r1, we actually do anchor range descriptors (and their locks and txn records) - // at RKeyMin as well. On the other hand, the "user key space" cannot start at RKeyMin - // because then it encompasses the special-cased prefix \x02... (/Local/). 
- // So awkwardly for key-based local keyspace we must not call KeySpan, for - // user keys we have to. + // r1 "really" only starts at LocalMax. But because we use a StartKey of + // RKeyMin for r1, we actually do anchor range descriptors (and their locks + // and txn records) at RKeyMin as well. On the other hand, the "user key + // space" cannot start at RKeyMin because then it encompasses the + // special-cased prefix \x02... (/Local/). So awkwardly for key-based local + // keyspace we must not call KeySpan, for user keys we have to. // // See also the comment on KeySpan. in := opts.ReplicatedBySpan adjustedIn := in.KeySpan() if opts.ReplicatedSpansFilter != ReplicatedSpansUserOnly { - sl = append(sl, makeRangeLocalKeySpan(in)) + if opts.ReplicatedSpansFilter != ReplicatedSpansLocksOnly { + sl = append(sl, makeRangeLocalKeySpan(in)) + } // Lock table. if opts.ReplicatedSpansFilter != ReplicatedSpansExcludeLocks { @@ -89,7 +101,8 @@ func Select(rangeID roachpb.RangeID, opts SelectOpts) []roachpb.Span { // is a range local key that can have a replicated lock acquired on it. startRangeLocal, _ := keys.LockTableSingleKey(keys.MakeRangeKeyPrefix(in.Key), nil) endRangeLocal, _ := keys.LockTableSingleKey(keys.MakeRangeKeyPrefix(in.EndKey), nil) - // Need adjusted start key to avoid overlapping with the local lock span right above. + // Need adjusted start key to avoid overlapping with the local lock span + // right above. startGlobal, _ := keys.LockTableSingleKey(adjustedIn.Key.AsRawKey(), nil) endGlobal, _ := keys.LockTableSingleKey(adjustedIn.EndKey.AsRawKey(), nil) sl = append(sl, roachpb.Span{ @@ -101,8 +114,10 @@ func Select(rangeID roachpb.RangeID, opts SelectOpts) []roachpb.Span { }) } } - if opts.ReplicatedSpansFilter != ReplicatedSpansExcludeUser { - // Adjusted span because r1's "normal" keyspace starts only at LocalMax, not RKeyMin. + if opts.ReplicatedSpansFilter != ReplicatedSpansExcludeUser && + opts.ReplicatedSpansFilter != ReplicatedSpansLocksOnly { + // Adjusted span because r1's "normal" keyspace starts only at LocalMax, + // not RKeyMin. 
sl = append(sl, adjustedIn.AsRawSpanWithNoLocals()) } } diff --git a/pkg/kv/kvserver/rditer/select_test.go b/pkg/kv/kvserver/rditer/select_test.go index c9cda164b7f6..080c543aea2b 100644 --- a/pkg/kv/kvserver/rditer/select_test.go +++ b/pkg/kv/kvserver/rditer/select_test.go @@ -76,6 +76,14 @@ func TestSelect(t *testing.T) { }, filter: ReplicatedSpansExcludeLocks, }, + { + name: "r2_locksonly", + sp: roachpb.RSpan{ + Key: roachpb.RKey("a"), + EndKey: roachpb.RKey("c"), + }, + filter: ReplicatedSpansLocksOnly, + }, { name: "r3", sp: roachpb.RSpan{ diff --git a/pkg/kv/kvserver/rditer/testdata/TestSelect/r2_locksonly b/pkg/kv/kvserver/rditer/testdata/TestSelect/r2_locksonly new file mode 100644 index 000000000000..44ed12b9daf0 --- /dev/null +++ b/pkg/kv/kvserver/rditer/testdata/TestSelect/r2_locksonly @@ -0,0 +1,18 @@ +echo +---- +Select({ReplicatedBySpan:{a-c} ReplicatedSpansFilter:4 ReplicatedByRangeID:false UnreplicatedByRangeID:false}): + /Local/Lock/Local/Range"{a"-c"} + /Local/Lock"{a"-c"} +Select({ReplicatedBySpan:{a-c} ReplicatedSpansFilter:4 ReplicatedByRangeID:false UnreplicatedByRangeID:true}): + /Local/RangeID/123/{u""-v""} + /Local/Lock/Local/Range"{a"-c"} + /Local/Lock"{a"-c"} +Select({ReplicatedBySpan:{a-c} ReplicatedSpansFilter:4 ReplicatedByRangeID:true UnreplicatedByRangeID:false}): + /Local/RangeID/123/{r""-s""} + /Local/Lock/Local/Range"{a"-c"} + /Local/Lock"{a"-c"} +Select({ReplicatedBySpan:{a-c} ReplicatedSpansFilter:4 ReplicatedByRangeID:true UnreplicatedByRangeID:true}): + /Local/RangeID/123/{r""-s""} + /Local/RangeID/123/{u""-v""} + /Local/Lock/Local/Range"{a"-c"} + /Local/Lock"{a"-c"} diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 2850c2e5ac8d..159f24e0d727 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -190,9 +190,33 @@ message IngestStoppedSpec { // to connect to it. message StreamIngestionPartitionSpec { optional string partition_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "PartitionID"]; + optional string subscription_token = 2 [(gogoproto.nullable) = false]; + optional string address = 3 [(gogoproto.nullable) = false]; + repeated roachpb.Span spans = 4 [(gogoproto.nullable) = false]; + + optional int32 src_instance_id = 5 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "SrcInstanceID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID"]; + + optional int32 dest_instance_id = 6 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "DestInstanceID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID"]; +} + +// StreamIngestionPartitionSpecs contains all the partition specs that are part +// of the DistSQL plan. +message StreamIngestionPartitionSpecs { + repeated StreamIngestionPartitionSpec specs = 3; +} + +// FrontierEntries is a snapshot of a span frontier. +message FrontierEntries { + repeated jobs.jobspb.ResolvedSpan resolved_spans = 1 [(gogoproto.nullable) = false]; } message StreamIngestionDataSpec { diff --git a/pkg/sql/logictest/testdata/logic_test/scrub b/pkg/sql/logictest/testdata/logic_test/scrub index 7d43dbead5ca..1569af811484 100644 --- a/pkg/sql/logictest/testdata/logic_test/scrub +++ b/pkg/sql/logictest/testdata/logic_test/scrub @@ -4,7 +4,6 @@ statement error pgcode 42P01 relation "t" does not exist EXPERIMENTAL SCRUB TABLE t ------ # TODO(mjibson): remove FAMILY definition after #41002 is fixed. 
diff --git a/pkg/sql/logictest/testdata/logic_test/scrub b/pkg/sql/logictest/testdata/logic_test/scrub
index 7d43dbead5ca..1569af811484 100644
--- a/pkg/sql/logictest/testdata/logic_test/scrub
+++ b/pkg/sql/logictest/testdata/logic_test/scrub
@@ -4,7 +4,6 @@ statement error pgcode 42P01 relation "t" does not exist
 EXPERIMENTAL SCRUB TABLE t
------

 # TODO(mjibson): remove FAMILY definition after #41002 is fixed.
 statement ok
@@ -24,29 +23,28 @@ INSERT INTO t VALUES (1, 'hello'), (2, 'help'), (0, 'heeee')

 query TTTTTTTT
 EXPERIMENTAL SCRUB TABLE t
------
+----

 statement error not implemented
 EXPERIMENTAL SCRUB TABLE t WITH OPTIONS PHYSICAL

 query TTTTTTTT
 EXPERIMENTAL SCRUB TABLE t WITH OPTIONS INDEX ALL
-------
+----

 statement error not implemented
 EXPERIMENTAL SCRUB TABLE t WITH OPTIONS PHYSICAL, INDEX (name_idx)
------

 statement error specified indexes to check that do not exist on table "t": not_an_index, also_not
 EXPERIMENTAL SCRUB TABLE t WITH OPTIONS INDEX (not_an_index, also_not, name_idx)

 query TTTTTTTT
 EXPERIMENTAL SCRUB TABLE t WITH OPTIONS CONSTRAINT ALL
------
+----

 query TTTTTTTT
 EXPERIMENTAL SCRUB TABLE t WITH OPTIONS CONSTRAINT (abc)
------
+----

 statement error pq: constraint "xyz" of relation "t" does not exist
 EXPERIMENTAL SCRUB TABLE t WITH OPTIONS CONSTRAINT (xyz)
@@ -63,7 +61,7 @@ EXPERIMENTAL SCRUB TABLE v1

 query TTTTTTTT
 EXPERIMENTAL SCRUB DATABASE test
------
+----

 # make sure there are no errors when values in the index are NULL
@@ -80,7 +78,7 @@ INSERT INTO test.xyz (x, y) VALUES (8, 2), (9, 2);

 query TTTTTTBT
 EXPERIMENTAL SCRUB TABLE xyz WITH OPTIONS INDEX ALL
------
+----

 # Test that scrub checks work when a table has an implicit rowid primary key.
@@ -110,7 +108,7 @@ INSERT INTO test.fk_parent VALUES (1,1), (2,1), (3,NULL)

 query TTTTTTTT
 EXPERIMENTAL SCRUB TABLE test.fk_parent WITH OPTIONS CONSTRAINT (fkey)
------
+----

 # Test AS OF SYSTEM TIME
diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y
index 0f4a79f2af11..5140c99c5919 100644
--- a/pkg/sql/parser/sql.y
+++ b/pkg/sql/parser/sql.y
@@ -987,7 +987,7 @@ func (u *sqlSymUnion) beginTransaction() *tree.BeginTransaction {
 %token RANGE RANGES READ REAL REASON REASSIGN RECURSIVE RECURRING REDACT REF REFERENCES REFRESH
 %token REGCLASS REGION REGIONAL REGIONS REGNAMESPACE REGPROC REGPROCEDURE REGROLE REGTYPE REINDEX
-%token RELATIVE RELOCATE REMOVE_PATH RENAME REPEATABLE REPLACE REPLICATION
+%token RELATIVE RELOCATE REMOVE_PATH REMOVE_REGIONS RENAME REPEATABLE REPLACE REPLICATION
 %token RELEASE RESET RESTART RESTORE RESTRICT RESTRICTED RESUME RETENTION RETURNING RETURN RETURNS RETRY REVISION_HISTORY
 %token REVOKE RIGHT ROLE ROLES ROLLBACK ROLLUP ROUTINES ROW ROWS RSHIFT RULE RUNNING
@@ -995,7 +995,7 @@ func (u *sqlSymUnion) beginTransaction() *tree.BeginTransaction {
 %token SEARCH SECOND SECONDARY SECURITY SELECT SEQUENCE SEQUENCES
 %token SERIALIZABLE SERVER SERVICE SESSION SESSIONS SESSION_USER SET SETOF SETS SETTING SETTINGS
 %token SHARE SHARED SHOW SIMILAR SIMPLE SIZE SKIP SKIP_LOCALITIES_CHECK SKIP_MISSING_FOREIGN_KEYS
-%token SKIP_MISSING_SEQUENCES SKIP_MISSING_SEQUENCE_OWNERS SKIP_MISSING_VIEWS SKIP_MISSING_UDFS STRIP_LOCALITIES SMALLINT SMALLSERIAL
+%token SKIP_MISSING_SEQUENCES SKIP_MISSING_SEQUENCE_OWNERS SKIP_MISSING_VIEWS SKIP_MISSING_UDFS SMALLINT SMALLSERIAL
 %token SNAPSHOT SOME SPLIT SQL SQLLOGIN
 %token STABLE START STATE STATISTICS STATUS STDIN STDOUT STOP STREAM STRICT STRING STORAGE STORE STORED STORING SUBSTRING SUPER
 %token SUPPORT SURVIVE SURVIVAL SYMMETRIC SYNTAX SYSTEM SQRT SUBSCRIPTION STATEMENTS
@@ -3845,9 +3845,9 @@ restore_options:
   {
     $$.val = &tree.RestoreOptions{ExperimentalOnline: true}
   }
-| STRIP_LOCALITIES
+| REMOVE_REGIONS
   {
-    $$.val = &tree.RestoreOptions{StripLocalities: true, SkipLocalitiesCheck: true}
+    $$.val = &tree.RestoreOptions{RemoveRegions: true, SkipLocalitiesCheck: true}
   }

 virtual_cluster_opt:
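The REMOVE_REGIONS production above builds a RestoreOptions value with both RemoveRegions and SkipLocalitiesCheck set, which is why the parser testdata further down normalizes WITH remove_regions to WITH OPTIONS (skip_localities_check, remove_regions). A small sketch of that, using only the tree types visible in this diff; the expected output is inferred from the Format code, not from a run:

// Sketch only: mirrors the value the REMOVE_REGIONS production builds above.
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

func main() {
	// This is the RestoreOptions value produced by `WITH remove_regions`:
	// the single keyword turns on both flags.
	opts := &tree.RestoreOptions{RemoveRegions: true, SkipLocalitiesCheck: true}

	// Formatting the options back out should print the normalized form seen
	// in the parser testdata: "skip_localities_check, remove_regions"
	// (wrapped in WITH OPTIONS (...) by the enclosing RESTORE statement).
	fmt.Println(tree.AsString(opts))
}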
@@ -16891,6 +16891,7 @@ unreserved_keyword:
 | RELATIVE
 | RELEASE
 | RELOCATE
+| REMOVE_REGIONS
 | RENAME
 | REPEATABLE
 | REPLACE
@@ -16953,7 +16954,6 @@ unreserved_keyword:
 | SKIP_MISSING_SEQUENCE_OWNERS
 | SKIP_MISSING_VIEWS
 | SKIP_MISSING_UDFS
-| STRIP_LOCALITIES
 | SNAPSHOT
 | SPLIT
 | SQL
@@ -17446,6 +17446,7 @@ bare_label_keywords:
 | RELATIVE
 | RELEASE
 | RELOCATE
+| REMOVE_REGIONS
 | RENAME
 | REPEATABLE
 | REPLACE
@@ -17512,7 +17513,6 @@ bare_label_keywords:
 | SKIP_MISSING_SEQUENCE_OWNERS
 | SKIP_MISSING_UDFS
 | SKIP_MISSING_VIEWS
-| STRIP_LOCALITIES
 | SMALLINT
 | SNAPSHOT
 | SOME
diff --git a/pkg/sql/parser/testdata/backup_restore b/pkg/sql/parser/testdata/backup_restore
index a397c0b113c9..26cd6b65f7b3 100644
--- a/pkg/sql/parser/testdata/backup_restore
+++ b/pkg/sql/parser/testdata/backup_restore
@@ -824,12 +824,12 @@ RESTORE TABLE foo FROM '_' WITH OPTIONS (skip_missing_foreign_keys, skip_missing
 RESTORE TABLE _ FROM 'bar' WITH OPTIONS (skip_missing_foreign_keys, skip_missing_sequences, detached) -- identifiers removed

 parse
-RESTORE TABLE foo FROM 'bar' WITH strip_localities
+RESTORE TABLE foo FROM 'bar' WITH remove_regions
 ----
-RESTORE TABLE foo FROM 'bar' WITH OPTIONS (skip_localities_check, strip_localities) -- normalized!
-RESTORE TABLE (foo) FROM ('bar') WITH OPTIONS (skip_localities_check, strip_localities) -- fully parenthesized
-RESTORE TABLE foo FROM '_' WITH OPTIONS (skip_localities_check, strip_localities) -- literals removed
-RESTORE TABLE _ FROM 'bar' WITH OPTIONS (skip_localities_check, strip_localities) -- identifiers removed
+RESTORE TABLE foo FROM 'bar' WITH OPTIONS (skip_localities_check, remove_regions) -- normalized!
+RESTORE TABLE (foo) FROM ('bar') WITH OPTIONS (skip_localities_check, remove_regions) -- fully parenthesized
+RESTORE TABLE foo FROM '_' WITH OPTIONS (skip_localities_check, remove_regions) -- literals removed
+RESTORE TABLE _ FROM 'bar' WITH OPTIONS (skip_localities_check, remove_regions) -- identifiers removed

 parse
 BACKUP INTO 'bar' WITH include_all_virtual_clusters = $1, detached
diff --git a/pkg/sql/sem/tree/backup.go b/pkg/sql/sem/tree/backup.go
index e963e09f062c..6b1441d2eaaa 100644
--- a/pkg/sql/sem/tree/backup.go
+++ b/pkg/sql/sem/tree/backup.go
@@ -149,7 +149,7 @@ type RestoreOptions struct {
 	UnsafeRestoreIncompatibleVersion bool
 	ExecutionLocality                Expr
 	ExperimentalOnline               bool
-	StripLocalities                  bool
+	RemoveRegions                    bool
 }

 var _ NodeFormatter = &RestoreOptions{}
@@ -536,9 +536,9 @@ func (o *RestoreOptions) Format(ctx *FmtCtx) {
 		ctx.WriteString("experimental deferred copy")
 	}

-	if o.StripLocalities {
+	if o.RemoveRegions {
 		maybeAddSep()
-		ctx.WriteString("strip_localities")
+		ctx.WriteString("remove_regions")
 	}
 }

@@ -612,8 +612,8 @@ func (o *RestoreOptions) CombineWith(other *RestoreOptions) error {
 	}

 	if o.SkipLocalitiesCheck {
-		// If StripLocalities is true, SkipLocalitiesCheck should also be true
-		if other.SkipLocalitiesCheck && !other.StripLocalities {
+		// If RemoveRegions is true, SkipLocalitiesCheck should also be true
+		if other.SkipLocalitiesCheck && !other.RemoveRegions {
 			return errors.New("skip_localities_check specified multiple times")
 		}
 	} else {
@@ -695,12 +695,12 @@ func (o *RestoreOptions) CombineWith(other *RestoreOptions) error {
 		o.ExperimentalOnline = other.ExperimentalOnline
 	}

-	if o.StripLocalities {
-		if other.StripLocalities {
-			return errors.New("strip_localities specified multiple times")
+	if o.RemoveRegions {
+		if other.RemoveRegions {
+			return errors.New("remove_regions specified multiple times")
 		}
 	} else {
-		o.StripLocalities = other.StripLocalities
+		o.RemoveRegions = other.RemoveRegions
 	}

 	return nil
@@ -730,7 +730,7 @@ func (o RestoreOptions) IsDefault() bool {
 		o.UnsafeRestoreIncompatibleVersion == options.UnsafeRestoreIncompatibleVersion &&
 		o.ExecutionLocality == options.ExecutionLocality &&
 		o.ExperimentalOnline == options.ExperimentalOnline &&
-		o.StripLocalities == options.StripLocalities
+		o.RemoveRegions == options.RemoveRegions
 }

 // BackupTargetList represents a list of targets.
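The CombineWith change above is what lets the implied skip_localities_check coexist with an explicit one: a duplicate skip_localities_check is only rejected when it did not come in via remove_regions, while remove_regions itself may still be specified only once. A hedged sketch of that behavior follows; the expected outputs are inferred from the code in this diff, not from running it.

// Sketch only: exercises RestoreOptions.CombineWith as changed above.
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

func main() {
	// `WITH skip_localities_check` combined with `WITH remove_regions`:
	// the implied skip_localities_check does not count as a duplicate.
	a := &tree.RestoreOptions{SkipLocalitiesCheck: true}
	b := &tree.RestoreOptions{RemoveRegions: true, SkipLocalitiesCheck: true}
	fmt.Println(a.CombineWith(b)) // expected: <nil>

	// Specifying remove_regions twice is still an error.
	c := &tree.RestoreOptions{RemoveRegions: true, SkipLocalitiesCheck: true}
	d := &tree.RestoreOptions{RemoveRegions: true, SkipLocalitiesCheck: true}
	fmt.Println(c.CombineWith(d)) // expected: remove_regions specified multiple times
}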