From 26acc2510b566ff8d40fdb2c60d0ec1207e6b826 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 7 Mar 2024 21:23:00 +0000 Subject: [PATCH 01/10] sql: remove default_target_cluster.check_service.enabled Release note (enterprise change): default_target_cluster can now be set to any tenant name by default, including a tenant yet to be created or have service started. Epic: none. --- pkg/multitenant/tenant_config.go | 11 ---------- pkg/settings/registry.go | 23 ++++++++++---------- pkg/sql/logictest/testdata/logic_test/tenant | 21 ------------------ pkg/sql/set_cluster_setting.go | 13 ----------- pkg/sql/tenant_update.go | 18 --------------- 5 files changed, 12 insertions(+), 74 deletions(-) diff --git a/pkg/multitenant/tenant_config.go b/pkg/multitenant/tenant_config.go index 62795c08fdbb..51160d084a75 100644 --- a/pkg/multitenant/tenant_config.go +++ b/pkg/multitenant/tenant_config.go @@ -32,17 +32,6 @@ var DefaultTenantSelect = settings.RegisterStringSetting( settings.WithName(DefaultClusterSelectSettingName), ) -// VerifyTenantService determines whether there should be an advisory -// interlock between changes to the tenant service and changes to the -// above cluster setting. -var VerifyTenantService = settings.RegisterBoolSetting( - settings.SystemOnly, - "server.controller.default_tenant.check_service.enabled", - "verify that the service mode is coherently set with the value of "+DefaultClusterSelectSettingName, - true, - settings.WithName(DefaultClusterSelectSettingName+".check_service.enabled"), -) - // WaitForClusterStartTimeout is the amount of time the tenant // controller will wait for the default virtual cluster to have an // active SQL server. diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 76fc373cb001..35a694b259bc 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -229,17 +229,18 @@ var retiredSettings = map[InternalKey]struct{}{ "bulkio.restore.remove_regions.enabled": {}, // removed as of 24.1 - "storage.mvcc.range_tombstones.enabled": {}, - "changefeed.balance_range_distribution.enable": {}, - "changefeed.mux_rangefeed.enabled": {}, - "kv.rangefeed.catchup_scan_concurrency": {}, - "kv.rangefeed.scheduler.enabled": {}, - "physical_replication.producer.mux_rangefeeds.enabled": {}, - "kv.rangefeed.use_dedicated_connection_class.enabled": {}, - "sql.trace.session_eventlog.enabled": {}, - "sql.show_ranges_deprecated_behavior.enabled": {}, - "sql.drop_virtual_cluster.enabled": {}, - "cross_cluster_replication.enabled": {}, + "storage.mvcc.range_tombstones.enabled": {}, + "changefeed.balance_range_distribution.enable": {}, + "changefeed.mux_rangefeed.enabled": {}, + "kv.rangefeed.catchup_scan_concurrency": {}, + "kv.rangefeed.scheduler.enabled": {}, + "physical_replication.producer.mux_rangefeeds.enabled": {}, + "kv.rangefeed.use_dedicated_connection_class.enabled": {}, + "sql.trace.session_eventlog.enabled": {}, + "sql.show_ranges_deprecated_behavior.enabled": {}, + "sql.drop_virtual_cluster.enabled": {}, + "cross_cluster_replication.enabled": {}, + "server.controller.default_tenant.check_service.enabled": {}, } // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults diff --git a/pkg/sql/logictest/testdata/logic_test/tenant b/pkg/sql/logictest/testdata/logic_test/tenant index bd6c62948904..944a6b9bdc6e 100644 --- a/pkg/sql/logictest/testdata/logic_test/tenant +++ b/pkg/sql/logictest/testdata/logic_test/tenant @@ -514,21 +514,9 @@ subtest regression_105115 statement ok CREATE TENANT noservice -statement error 
shared service not enabled for tenant "noservice" -SET CLUSTER SETTING server.controller.default_target_cluster = noservice - -statement ok -SET CLUSTER SETTING server.controller.default_target_cluster.check_service.enabled = false - statement ok SET CLUSTER SETTING server.controller.default_target_cluster = noservice -statement ok -RESET CLUSTER SETTING server.controller.default_target_cluster.check_service.enabled - -statement ok -RESET CLUSTER SETTING server.controller.default_target_cluster - statement ok DROP TENANT noservice; CREATE TENANT withservice; @@ -539,19 +527,10 @@ ALTER TENANT withservice START SERVICE SHARED statement ok SET CLUSTER SETTING server.controller.default_target_cluster = withservice -statement error cannot stop service while tenant is selected as default -ALTER TENANT withservice STOP SERVICE - -statement ok -SET CLUSTER SETTING server.controller.default_target_cluster.check_service.enabled = false - statement ok ALTER TENANT withservice STOP SERVICE # clean up -statement ok -RESET CLUSTER SETTING server.controller.default_target_cluster.check_service.enabled - statement ok RESET CLUSTER SETTING server.controller.default_target_cluster diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index 8834f695b947..6a6a6d5f5e5e 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -22,8 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/docs" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/multitenant" - "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" @@ -358,17 +356,6 @@ func (n *setClusterSettingNode) startExec(params runParams) error { // Report tracked cluster settings via telemetry. // TODO(justin): implement a more general mechanism for tracking these. switch n.name { - case multitenant.DefaultClusterSelectSettingName: - if multitenant.VerifyTenantService.Get(&n.st.SV) && expectedEncodedValue != "" { - tr, err := GetTenantRecordByName(params.ctx, n.st, params.p.InternalSQLTxn(), roachpb.TenantName(expectedEncodedValue)) - if err != nil { - return errors.Wrapf(err, "failed to lookup tenant %q", expectedEncodedValue) - } - if tr.ServiceMode != mtinfopb.ServiceModeShared { - return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, - "shared service not enabled for tenant %q", expectedEncodedValue) - } - } case catpb.AutoStatsEnabledSettingName: switch expectedEncodedValue { case "true": diff --git a/pkg/sql/tenant_update.go b/pkg/sql/tenant_update.go index aa8886950c9c..f3b6597b9fc5 100644 --- a/pkg/sql/tenant_update.go +++ b/pkg/sql/tenant_update.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -110,23 +109,6 @@ func validateTenantInfo( info.ServiceMode, info.DataState) } - // Sanity check. Note that this interlock is not a guarantee that - // the cluster setting will never be set to an invalid tenant. There - // is a race condition between changing the cluster setting and the - // check here. 
Generally, other subsystems should always tolerate - // when the cluster setting is set to a tenant without service (or - // even one that doesn't exist). - if multitenant.VerifyTenantService.Get(&settings.SV) && - info.ServiceMode == mtinfopb.ServiceModeNone && - info.Name != "" && - multitenant.DefaultTenantSelect.Get(&settings.SV) == string(info.Name) { - return errors.WithHintf( - pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, - "cannot stop service while tenant is selected as default"), - "Update the cluster setting %q to a different value.", - multitenant.DefaultClusterSelectSettingName) - } - return nil } From 2be110d2b516c9710d855b33f1bc6d654ace330b Mon Sep 17 00:00:00 2001 From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com> Date: Tue, 12 Mar 2024 10:46:40 -0400 Subject: [PATCH 02/10] roachtest: move helper into utils.go The mean over slice function is useful for other raochtests as well. Release note: None --- .../tests/admission_control_intent_resolution.go | 16 ---------------- pkg/cmd/roachtest/tests/util.go | 16 ++++++++++++++++ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go b/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go index a74aaa3f0a96..16de58b648aa 100644 --- a/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go +++ b/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go @@ -174,19 +174,3 @@ func registerIntentResolutionOverload(r registry.Registry) { }, }) } - -// Returns the mean over the last n samples. If n > len(items), returns the mean -// over the entire items slice. -func getMeanOverLastN(n int, items []float64) float64 { - count := n - if len(items) < n { - count = len(items) - } - sum := float64(0) - i := 0 - for i < count { - sum += items[len(items)-1-i] - i++ - } - return sum / float64(count) -} diff --git a/pkg/cmd/roachtest/tests/util.go b/pkg/cmd/roachtest/tests/util.go index 85c5b9102978..d65ed33c3564 100644 --- a/pkg/cmd/roachtest/tests/util.go +++ b/pkg/cmd/roachtest/tests/util.go @@ -232,3 +232,19 @@ func maybeUseMemoryBudget(t test.Test, budget int) option.StartOpts { } return startOpts } + +// Returns the mean over the last n samples. If n > len(items), returns the mean +// over the entire items slice. +func getMeanOverLastN(n int, items []float64) float64 { + count := n + if len(items) < n { + count = len(items) + } + sum := float64(0) + i := 0 + for i < count { + sum += items[len(items)-1-i] + i++ + } + return sum / float64(count) +} From b3ead235cfd9d6a26e788a067add719281547a13 Mon Sep 17 00:00:00 2001 From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com> Date: Tue, 12 Mar 2024 10:50:31 -0400 Subject: [PATCH 03/10] roachtest: admission-control/elastic-io deflake Similar to https://github.com/cockroachdb/cockroach/pull/114446, we now take the mean over the last two minutes for determining high L0 sublevel count. Fixes #119838. 
Release note: None --- .../tests/admission_control_elastic_io.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/cmd/roachtest/tests/admission_control_elastic_io.go b/pkg/cmd/roachtest/tests/admission_control_elastic_io.go index cdf0a98fb9ae..447ec5390ca2 100644 --- a/pkg/cmd/roachtest/tests/admission_control_elastic_io.go +++ b/pkg/cmd/roachtest/tests/admission_control_elastic_io.go @@ -122,16 +122,24 @@ func registerElasticIO(r registry.Registry) { // not working, the threshold of 7 will be easily breached, since // regular tokens allow sub-levels to exceed 10. const subLevelThreshold = 7 + const sampleCountForL0Sublevel = 12 + var l0SublevelCount []float64 // Sleep initially for stability to be achieved, before measuring. time.Sleep(5 * time.Minute) for { - time.Sleep(30 * time.Second) + time.Sleep(10 * time.Second) val, err := getMetricVal(subLevelMetric) if err != nil { continue } - if val > subLevelThreshold { - t.Fatalf("sub-level count %f exceeded threshold", val) + l0SublevelCount = append(l0SublevelCount, val) + // We want to use the mean of the last 2m of data to avoid short-lived + // spikes causing failures. + if len(l0SublevelCount) >= sampleCountForL0Sublevel { + latestSampleMeanL0Sublevels := getMeanOverLastN(sampleCountForL0Sublevel, l0SublevelCount) + if latestSampleMeanL0Sublevels > subLevelThreshold { + t.Fatalf("sub-level mean %f over last %d iterations exceeded threshold", latestSampleMeanL0Sublevels, sampleCountForL0Sublevel) + } } if timeutil.Now().After(endTime) { return nil From 5ae54173fe1dd4a1080d14ac4071da2224655013 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 12 Mar 2024 09:17:48 -0700 Subject: [PATCH 04/10] backfill: finish the tracing span after closing the account We recently fixed an issue where we forgot to stop the index backfill merger monitor, but we had a minor bug in that fix - we captured the context that contains the tracing span that is finished before the account is closed leading to "use after finish" assertions. This is now fixed. Release note: None --- pkg/sql/backfill/mvcc_index_merger.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/sql/backfill/mvcc_index_merger.go b/pkg/sql/backfill/mvcc_index_merger.go index 81ee80a3a87b..591fcca37447 100644 --- a/pkg/sql/backfill/mvcc_index_merger.go +++ b/pkg/sql/backfill/mvcc_index_merger.go @@ -103,6 +103,10 @@ const indexBackfillMergeProgressReportInterval = 10 * time.Second // Run runs the processor. func (ibm *IndexBackfillMerger) Run(ctx context.Context, output execinfra.RowReceiver) { + opName := "IndexBackfillMerger" + ctx = logtags.AddTag(ctx, opName, int(ibm.spec.Table.ID)) + ctx, span := execinfra.ProcessorSpan(ctx, ibm.flowCtx, opName, ibm.processorID) + defer span.Finish() // This method blocks until all worker goroutines exit, so it's safe to // close memory monitoring infra in defers. 
mergerMon := execinfra.NewMonitor(ctx, ibm.flowCtx.Cfg.BackfillerMonitor, "index-backfiller-merger-mon") @@ -113,10 +117,6 @@ func (ibm *IndexBackfillMerger) Run(ctx context.Context, output execinfra.RowRec defer ibm.muBoundAccount.Unlock() ibm.muBoundAccount.boundAccount.Close(ctx) }() - opName := "IndexBackfillMerger" - ctx = logtags.AddTag(ctx, opName, int(ibm.spec.Table.ID)) - ctx, span := execinfra.ProcessorSpan(ctx, ibm.flowCtx, opName, ibm.processorID) - defer span.Finish() defer output.ProducerDone() defer execinfra.SendTraceData(ctx, ibm.flowCtx, output) From d0bb7e47c6649dce2d8efa0c26b999ed5d161fe6 Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Tue, 12 Mar 2024 12:22:42 -0500 Subject: [PATCH 05/10] cmd: link on the `large` pool The `default` pool seems to be too small to perform linking efficiently. This should speed things up. Epic: CRDB-8308 Release note: None --- pkg/cmd/cockroach-oss/BUILD.bazel | 1 + pkg/cmd/cockroach-short/BUILD.bazel | 1 + pkg/cmd/cockroach-sql/BUILD.bazel | 1 + pkg/cmd/cockroach/BUILD.bazel | 1 + pkg/cmd/roachprod/BUILD.bazel | 1 + 5 files changed, 5 insertions(+) diff --git a/pkg/cmd/cockroach-oss/BUILD.bazel b/pkg/cmd/cockroach-oss/BUILD.bazel index 755eca7271ab..731a7b5a6f75 100644 --- a/pkg/cmd/cockroach-oss/BUILD.bazel +++ b/pkg/cmd/cockroach-oss/BUILD.bazel @@ -15,6 +15,7 @@ go_library( go_binary( name = "cockroach-oss", embed = [":cockroach-oss_lib"], + exec_properties = {"Pool": "large"}, visibility = ["//visibility:public"], ) diff --git a/pkg/cmd/cockroach-short/BUILD.bazel b/pkg/cmd/cockroach-short/BUILD.bazel index 919d53a63866..2f89c15412a2 100644 --- a/pkg/cmd/cockroach-short/BUILD.bazel +++ b/pkg/cmd/cockroach-short/BUILD.bazel @@ -14,5 +14,6 @@ go_library( go_binary( name = "cockroach-short", embed = [":cockroach-short_lib"], + exec_properties = {"Pool": "large"}, visibility = ["//visibility:public"], ) diff --git a/pkg/cmd/cockroach-sql/BUILD.bazel b/pkg/cmd/cockroach-sql/BUILD.bazel index 94a2d2db1497..1808ee752cfa 100644 --- a/pkg/cmd/cockroach-sql/BUILD.bazel +++ b/pkg/cmd/cockroach-sql/BUILD.bazel @@ -26,5 +26,6 @@ go_library( go_binary( name = "cockroach-sql", embed = [":cockroach-sql_lib"], + exec_properties = {"Pool": "large"}, visibility = ["//visibility:public"], ) diff --git a/pkg/cmd/cockroach/BUILD.bazel b/pkg/cmd/cockroach/BUILD.bazel index 74d8fba934aa..b6da4a42daf9 100644 --- a/pkg/cmd/cockroach/BUILD.bazel +++ b/pkg/cmd/cockroach/BUILD.bazel @@ -25,5 +25,6 @@ disallowed_imports_test( go_binary( name = "cockroach", embed = [":cockroach_lib"], + exec_properties = {"Pool": "large"}, visibility = ["//visibility:public"], ) diff --git a/pkg/cmd/roachprod/BUILD.bazel b/pkg/cmd/roachprod/BUILD.bazel index 5440091cad0a..539eeccab280 100644 --- a/pkg/cmd/roachprod/BUILD.bazel +++ b/pkg/cmd/roachprod/BUILD.bazel @@ -35,5 +35,6 @@ go_library( go_binary( name = "roachprod", embed = [":roachprod_lib"], + exec_properties = {"Pool": "large"}, visibility = ["//visibility:public"], ) From 5ca24e31324e68614f206cb9b1fb5fe6e9d4ca28 Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Tue, 12 Mar 2024 12:54:38 -0500 Subject: [PATCH 06/10] logictest: skip some tests under `race` These tests specifically are prone to failing/timing out under `race`. 
Epic: CRDB-8308 Release note: None --- pkg/sql/logictest/testdata/logic_test/lookup_join | 2 ++ pkg/sql/logictest/testdata/logic_test/stats | 2 ++ pkg/sql/logictest/testdata/logic_test/upsert | 2 ++ pkg/sql/logictest/testdata/logic_test/vectorize_unsupported | 2 ++ pkg/sql/logictest/testdata/logic_test/window | 2 ++ 5 files changed, 10 insertions(+) diff --git a/pkg/sql/logictest/testdata/logic_test/lookup_join b/pkg/sql/logictest/testdata/logic_test/lookup_join index 6a2cf518ac4f..6b95e90bd70c 100644 --- a/pkg/sql/logictest/testdata/logic_test/lookup_join +++ b/pkg/sql/logictest/testdata/logic_test/lookup_join @@ -1,3 +1,5 @@ +skip under race + statement ok CREATE TABLE abc (a INT, b INT, c INT, PRIMARY KEY (a, c)); INSERT INTO abc VALUES (1, 1, 2), (2, 1, 1), (2, NULL, 2) diff --git a/pkg/sql/logictest/testdata/logic_test/stats b/pkg/sql/logictest/testdata/logic_test/stats index 6d1fb6385b3b..51bf5bfbbce6 100644 --- a/pkg/sql/logictest/testdata/logic_test/stats +++ b/pkg/sql/logictest/testdata/logic_test/stats @@ -1,5 +1,7 @@ # LogicTest: !fakedist-disk +skip under race + # Note that we disable the "forced disk spilling" config because the histograms # are dropped if the stats collection reaches the memory budget limit. diff --git a/pkg/sql/logictest/testdata/logic_test/upsert b/pkg/sql/logictest/testdata/logic_test/upsert index 34567404fc27..8c318e2a7ed5 100644 --- a/pkg/sql/logictest/testdata/logic_test/upsert +++ b/pkg/sql/logictest/testdata/logic_test/upsert @@ -1,3 +1,5 @@ +skip under race + subtest strict statement ok diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_unsupported b/pkg/sql/logictest/testdata/logic_test/vectorize_unsupported index d654a808e453..048b9285e9c5 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize_unsupported +++ b/pkg/sql/logictest/testdata/logic_test/vectorize_unsupported @@ -1,3 +1,5 @@ +skip under race + statement ok CREATE TABLE a (a INT, b INT, c INT4, PRIMARY KEY (a, b)); INSERT INTO a SELECT g//2, g, g FROM generate_series(0,2000) g(g) diff --git a/pkg/sql/logictest/testdata/logic_test/window b/pkg/sql/logictest/testdata/logic_test/window index 035a490879a3..6a8b3e318f62 100644 --- a/pkg/sql/logictest/testdata/logic_test/window +++ b/pkg/sql/logictest/testdata/logic_test/window @@ -1,3 +1,5 @@ +skip under race + statement ok CREATE TABLE kv ( -- don't add column "a" From de41ff6e59b621eb14b4184c0b1d8cfb488b8edf Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 12 Mar 2024 14:05:07 -0400 Subject: [PATCH 07/10] roachtest: bump max ranges threshold in splits/load/ycsb/e/nodes=3/obj=cpu Fixes #120163. Avoids rare test flakes. Release note: None --- pkg/cmd/roachtest/tests/split.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/tests/split.go b/pkg/cmd/roachtest/tests/split.go index 58eaa13e3163..88d54a80747c 100644 --- a/pkg/cmd/roachtest/tests/split.go +++ b/pkg/cmd/roachtest/tests/split.go @@ -409,7 +409,7 @@ func registerLoadSplits(r registry.Registry) { // YCSB/E has a zipfian distribution with 95% scans (limit 1k) and 5% // inserts. 
minimumRanges: 5, - maximumRanges: 15, + maximumRanges: 18, initialRangeCount: 2, load: ycsbSplitLoad{ workload: "e", From 1201b174a1598464951cfc6748c245853945ef6b Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Tue, 12 Mar 2024 14:11:52 -0400 Subject: [PATCH 08/10] go.mod: bump Pebble to 51faab0a3555 Changes: * [`51faab0a`](https://github.com/cockroachdb/pebble/commit/51faab0a) tool: add DirectoryLock option * [`ec69e9a2`](https://github.com/cockroachdb/pebble/commit/ec69e9a2) ingest test: fix merge skew * [`635c6003`](https://github.com/cockroachdb/pebble/commit/635c6003) manifest: add VersionEdit tests with virtual tables * [`cb660884`](https://github.com/cockroachdb/pebble/commit/cb660884) manifest: improve VersionEdit stringification * [`31b37248`](https://github.com/cockroachdb/pebble/commit/31b37248) ingest_test: support reopening in ingest tests * [`64ebec94`](https://github.com/cockroachdb/pebble/commit/64ebec94) ingest_test: set correct sizes for external ingests * [`4bf09d5e`](https://github.com/cockroachdb/pebble/commit/4bf09d5e) manifest: improve VersionEdit tests * [`a034560d`](https://github.com/cockroachdb/pebble/commit/a034560d) manifest: add a helper for DebugString parsing * [`623524f1`](https://github.com/cockroachdb/pebble/commit/623524f1) manifest: use Lx for levels in version String/DebugString Release note: none. Epic: none. --- DEPS.bzl | 6 +++--- build/bazelutil/distdir_files.bzl | 2 +- go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 4127d5b9443a..e69eac899587 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -1693,10 +1693,10 @@ def go_deps(): patches = [ "@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch", ], - sha256 = "f68528557224c2af2fd0b46199602f982fd44f813029040a1f3ceaf134633dc0", - strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20240308204553-8df4320c24e4", + sha256 = "bffe4ef26087a4e25f9deaad87ed1ea9849d3ea6032badce7cacb919d6614cc6", + strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20240312180812-51faab0a3555", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240308204553-8df4320c24e4.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240312180812-51faab0a3555.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 278af1edeb38..05e3e99fe114 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -330,7 +330,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.19.0.zip": "c4d516bcfe8c07b6fc09b8a9a07a95065b36c2855627cb3514e40c98f872b69e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20230118201751-21c54148d20b.zip": "ca7776f47e5fecb4c495490a679036bfc29d95bd7625290cfdb9abb0baf97476", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/metamorphic/com_github_cockroachdb_metamorphic-v0.0.0-20231108215700-4ba948b56895.zip": "28c8cf42192951b69378cf537be5a9a43f2aeb35542908cc4fe5f689505853ea", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240308204553-8df4320c24e4.zip": 
"f68528557224c2af2fd0b46199602f982fd44f813029040a1f3ceaf134633dc0", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240312180812-51faab0a3555.zip": "bffe4ef26087a4e25f9deaad87ed1ea9849d3ea6032badce7cacb919d6614cc6", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.5.zip": "11b30528eb0dafc8bc1a5ba39d81277c257cbe6946a7564402f588357c164560", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/stress/com_github_cockroachdb_stress-v0.0.0-20220803192808-1806698b1b7b.zip": "3fda531795c600daf25532a4f98be2a1335cd1e5e182c72789bca79f5f69fcc1", diff --git a/go.mod b/go.mod index 86199c4bfdc2..9546168b17d2 100644 --- a/go.mod +++ b/go.mod @@ -124,7 +124,7 @@ require ( github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 github.com/cockroachdb/gostdlib v1.19.0 github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b - github.com/cockroachdb/pebble v0.0.0-20240308204553-8df4320c24e4 + github.com/cockroachdb/pebble v0.0.0-20240312180812-51faab0a3555 github.com/cockroachdb/redact v1.1.5 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b diff --git a/go.sum b/go.sum index a31b22d8cb41..852e31e87f2f 100644 --- a/go.sum +++ b/go.sum @@ -508,8 +508,8 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA= -github.com/cockroachdb/pebble v0.0.0-20240308204553-8df4320c24e4 h1:yuJAmwkJTQjB5YyoNkmXlK9/2YR+jWizfE7crErqGhU= -github.com/cockroachdb/pebble v0.0.0-20240308204553-8df4320c24e4/go.mod h1:g0agBmtwky6biPBw0MO+GkiYRv9krOTZgpPw2rfha8c= +github.com/cockroachdb/pebble v0.0.0-20240312180812-51faab0a3555 h1:NRPlms/+HfNgTPMrvSTU/bKDDps4/6vSvPnogZ4HzYw= +github.com/cockroachdb/pebble v0.0.0-20240312180812-51faab0a3555/go.mod h1:g0agBmtwky6biPBw0MO+GkiYRv9krOTZgpPw2rfha8c= github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= From c097c1981a43c293d72e076851c8a7a78fd0628c Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Thu, 28 Jul 2022 15:03:09 -0400 Subject: [PATCH 09/10] storage: add ImportEpoch field to MVCCValueHeader This patch adds an ImportEpoch field to an MVCCValue's MVCCValueHeader, which allows KV clients (namely the sst_batcher in an IMPORT INTO) to write the importing table's ImportEpoch to the metadata of each ingesting MVCCValue. Unlike the MVCCValueHeader.LocalTimestamp field, the ImportEpoch field should be exported to other clusters (e.g. via ExportRequests from BACKUP/RESTORE and streaming). 
Consequently, this PR relaxes the invariant that the MVCCValueHeader field must be stripped in an ExportRequest and must be empty in an AddSSTable Request. Now, ExportRequest emits MVCCValueHeaders with ImportEpoch set if it was set in the original value and AddSSTable only requires the LocalTimestamp to be empty. Epic: none Release note: none Co-authored-by: Steven Danna --- pkg/kv/kvserver/batcheval/cmd_add_sstable.go | 7 +- .../batcheval/cmd_add_sstable_test.go | 4 +- pkg/storage/enginepb/mvcc3.proto | 6 ++ pkg/storage/enginepb/mvcc3_test.go | 1 + pkg/storage/enginepb/mvcc3_valueheader.go | 1 + pkg/storage/mvcc.go | 7 +- pkg/storage/mvcc_value.go | 25 ++++- pkg/storage/mvcc_value_test.go | 94 +++++++++++++++---- .../headerFull_bytes | 3 + .../TestEncodeDecodeMVCCValue/headerFull_int | 3 + .../headerFull_tombstone | 3 + .../headerJobIDOnly_bytes | 3 + .../headerJobIDOnly_int | 3 + .../headerJobIDOnly_tombstone | 3 + 14 files changed, 138 insertions(+), 25 deletions(-) create mode 100644 pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes create mode 100644 pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int create mode 100644 pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone create mode 100644 pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_bytes create mode 100644 pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_int create mode 100644 pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_tombstone diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index 5aceb0e9a83e..5701ec0577be 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -550,7 +550,7 @@ func EvalAddSSTable( // * Only SST set operations (not explicitly verified). // * No intents or unversioned values. // * If sstTimestamp is set, all MVCC timestamps equal it. -// * MVCCValueHeader is empty. +// * The LocalTimestamp in the MVCCValueHeader is empty. // * Given MVCC stats match the SST contents. 
func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.MVCCStats) error { @@ -584,8 +584,9 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M return errors.NewAssertionErrorWithWrappedErrf(err, "SST contains invalid value for key %s", key) } - if value.MVCCValueHeader != (enginepb.MVCCValueHeader{}) { - return errors.AssertionFailedf("SST contains non-empty MVCC value header for key %s", key) + if !value.MVCCValueHeader.LocalTimestamp.IsEmpty() { + return errors.AssertionFailedf("SST contains non-empty Local Timestamp in the MVCC value"+ + " header for key %s", key) } } diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 0dbf41fae425..d871f9d8b157 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -144,7 +144,7 @@ func TestEvalAddSSTable(t *testing.T) { sst: kvs{pointKVWithLocalTS("a", 2, 1, "a2")}, expect: kvs{pointKVWithLocalTS("a", 2, 1, "a2")}, expectStatsEst: true, - expectErrRace: `SST contains non-empty MVCC value header for key "a"/2.000000000,0`, + expectErrRace: `SST contains non-empty Local Timestamp in the MVCC value header for key "a"/2.000000000,0`, }, "blind rejects local timestamp on range key under race only": { // unfortunately, for performance sst: kvs{rangeKVWithLocalTS("a", "d", 2, 1, "")}, @@ -297,7 +297,7 @@ func TestEvalAddSSTable(t *testing.T) { sst: kvs{pointKVWithLocalTS("a", 2, 1, "a2")}, expect: kvs{pointKVWithLocalTS("a", 10, 1, "a2")}, expectStatsEst: true, - expectErrRace: `SST contains non-empty MVCC value header for key "a"/2.000000000,0`, + expectErrRace: `SST contains non-empty Local Timestamp in the MVCC value header for key "a"/2.000000000,0`, }, "SSTTimestampToRequestTimestamp with DisallowConflicts causes estimated stats with range key masking": { reqTS: 5, diff --git a/pkg/storage/enginepb/mvcc3.proto b/pkg/storage/enginepb/mvcc3.proto index a17cf5f6ba80..0c3baaacbbc7 100644 --- a/pkg/storage/enginepb/mvcc3.proto +++ b/pkg/storage/enginepb/mvcc3.proto @@ -198,6 +198,10 @@ message MVCCValueHeader { // not be available in changefeeds. This allows higher levels of the system to // control which writes are exported. bool omit_in_rangefeeds = 3; + + // ImportEpoch identifies the number of times a user has called IMPORT + // INTO on the table this key belongs to when the table was not empty. + uint32 import_epoch = 4; } // MVCCValueHeaderPure is not to be used directly. It's generated only for use of @@ -207,6 +211,7 @@ message MVCCValueHeaderPure { (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"]; bool omit_in_rangefeeds = 3; + uint32 import_epoch = 4; } // MVCCValueHeaderCrdbTest is not to be used directly. It's generated only for use of // its marshaling methods by MVCCValueHeader. See the comment there. 
@@ -219,6 +224,7 @@ message MVCCValueHeaderCrdbTest { util.hlc.Timestamp local_timestamp = 1 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"]; bool omit_in_rangefeeds = 3; + uint32 import_epoch = 4; } // MVCCStatsDelta is convertible to MVCCStats, but uses signed variable width diff --git a/pkg/storage/enginepb/mvcc3_test.go b/pkg/storage/enginepb/mvcc3_test.go index 565fc3f37b3d..16413279006c 100644 --- a/pkg/storage/enginepb/mvcc3_test.go +++ b/pkg/storage/enginepb/mvcc3_test.go @@ -31,6 +31,7 @@ func populatedMVCCValueHeader() MVCCValueHeader { allFieldsSet := MVCCValueHeader{ LocalTimestamp: hlc.ClockTimestamp{WallTime: 1, Logical: 1}, OmitInRangefeeds: true, + ImportEpoch: 1, } allFieldsSet.KVNemesisSeq.Set(123) return allFieldsSet diff --git a/pkg/storage/enginepb/mvcc3_valueheader.go b/pkg/storage/enginepb/mvcc3_valueheader.go index 17e9faa07a82..81f50ff5d2ab 100644 --- a/pkg/storage/enginepb/mvcc3_valueheader.go +++ b/pkg/storage/enginepb/mvcc3_valueheader.go @@ -22,6 +22,7 @@ func (h *MVCCValueHeader) pure() MVCCValueHeaderPure { return MVCCValueHeaderPure{ LocalTimestamp: h.LocalTimestamp, OmitInRangefeeds: h.OmitInRangefeeds, + ImportEpoch: h.ImportEpoch, } } diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index f6a4e09d9467..5ecf5fa6b705 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -7884,9 +7884,10 @@ func mvccExportToWriter( return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "decoding mvcc value %s", unsafeKey) } - // Export only the inner roachpb.Value, not the MVCCValue header. - unsafeValue = mvccValue.Value.RawBytes - + unsafeValue, err = EncodeMVCCValueForExport(mvccValue) + if err != nil { + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "repackaging imported mvcc value %s", unsafeKey) + } // Skip tombstone records when start time is zero (non-incremental) // and we are not exporting all versions. skip = skipTombstones && mvccValue.IsTombstone() diff --git a/pkg/storage/mvcc_value.go b/pkg/storage/mvcc_value.go index 85c288999f35..27bb99fad239 100644 --- a/pkg/storage/mvcc_value.go +++ b/pkg/storage/mvcc_value.go @@ -12,6 +12,8 @@ package storage import ( "encoding/binary" + "fmt" + "strings" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -118,15 +120,36 @@ func (v MVCCValue) String() string { // SafeFormat implements the redact.SafeFormatter interface. func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) { if v.MVCCValueHeader != (enginepb.MVCCValueHeader{}) { + fields := make([]string, 0) w.Printf("{") if !v.LocalTimestamp.IsEmpty() { - w.Printf("localTs=%s", v.LocalTimestamp) + fields = append(fields, fmt.Sprintf("localTs=%s", v.LocalTimestamp)) } + if v.ImportEpoch != 0 { + fields = append(fields, fmt.Sprintf("importEpoch=%v", v.ImportEpoch)) + } + w.Print(strings.Join(fields, ", ")) w.Printf("}") } w.Print(v.Value.PrettyPrint()) } +// EncodeMVCCValueForExport strips fields from the MVCCValueHeader that +// should not get exported out of the cluster. +// +//gcassert:inline +func EncodeMVCCValueForExport(mvccValue MVCCValue) ([]byte, error) { + // Consider a fast path, where only the roachpb.Value gets exported. + // Currently, this only occurs if the value was not imported. + if mvccValue.ImportEpoch == 0 { + return mvccValue.Value.RawBytes, nil + } + + // Manually strip off any non-exportable fields, and re-encode the mvcc value. 
+ mvccValue.MVCCValueHeader.LocalTimestamp = hlc.ClockTimestamp{} + return EncodeMVCCValue(mvccValue) +} + // When running a metamorphic build, disable the simple MVCC value encoding to // prevent code from assuming that the MVCCValue encoding is identical to the // roachpb.Value encoding. diff --git a/pkg/storage/mvcc_value_test.go b/pkg/storage/mvcc_value_test.go index 9c721574a8a8..6a2084390a3b 100644 --- a/pkg/storage/mvcc_value_test.go +++ b/pkg/storage/mvcc_value_test.go @@ -80,26 +80,72 @@ func TestMVCCValueGetLocalTimestamp(t *testing.T) { } } +func TestEncodeMVCCValueForExport(t *testing.T) { + defer leaktest.AfterTest(t)() + var strVal, intVal roachpb.Value + strVal.SetString("foo") + intVal.SetInt(17) + + var importEpoch uint32 = 3 + tsHeader := enginepb.MVCCValueHeader{LocalTimestamp: hlc.ClockTimestamp{WallTime: 9}} + + valHeaderFull := tsHeader + valHeaderFull.ImportEpoch = importEpoch + + jobIDHeader := enginepb.MVCCValueHeader{ImportEpoch: importEpoch} + + testcases := map[string]struct { + val MVCCValue + expect MVCCValue + }{ + "noHeader": {val: MVCCValue{Value: intVal}, expect: MVCCValue{Value: intVal}}, + "tsHeader": {val: MVCCValue{MVCCValueHeader: tsHeader, Value: intVal}, expect: MVCCValue{Value: intVal}}, + "jobIDOnly": {val: MVCCValue{MVCCValueHeader: jobIDHeader, Value: intVal}, expect: MVCCValue{MVCCValueHeader: jobIDHeader, Value: intVal}}, + "fullHeader": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}, expect: MVCCValue{MVCCValueHeader: jobIDHeader, Value: intVal}}, + } + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + encodedVal, err := EncodeMVCCValueForExport(tc.val) + require.NoError(t, err) + strippedMVCCVal, err := DecodeMVCCValue(encodedVal) + require.NoError(t, err) + require.Equal(t, tc.expect, strippedMVCCVal) + }) + } + +} func TestMVCCValueFormat(t *testing.T) { defer leaktest.AfterTest(t)() var strVal, intVal roachpb.Value strVal.SetString("foo") intVal.SetInt(17) + var importEpoch uint32 = 3 valHeader := enginepb.MVCCValueHeader{} valHeader.LocalTimestamp = hlc.ClockTimestamp{WallTime: 9} + valHeaderFull := valHeader + valHeaderFull.ImportEpoch = importEpoch + + valHeaderWithJobIDOnly := enginepb.MVCCValueHeader{ImportEpoch: importEpoch} + testcases := map[string]struct { val MVCCValue expect string }{ - "tombstone": {val: MVCCValue{}, expect: "/"}, - "bytes": {val: MVCCValue{Value: strVal}, expect: "/BYTES/foo"}, - "int": {val: MVCCValue{Value: intVal}, expect: "/INT/17"}, - "header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: "{localTs=0.000000009,0}/"}, - "header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: "{localTs=0.000000009,0}/BYTES/foo"}, - "header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: "{localTs=0.000000009,0}/INT/17"}, + "tombstone": {val: MVCCValue{}, expect: "/"}, + "bytes": {val: MVCCValue{Value: strVal}, expect: "/BYTES/foo"}, + "int": {val: MVCCValue{Value: intVal}, expect: "/INT/17"}, + "header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: "{localTs=0.000000009,0}/"}, + "header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: "{localTs=0.000000009,0}/BYTES/foo"}, + "header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: "{localTs=0.000000009,0}/INT/17"}, + "headerJobIDOnly+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly}, expect: "{importEpoch=3}/"}, + "headerJobIDOnly+bytes": {val: MVCCValue{MVCCValueHeader: 
valHeaderWithJobIDOnly, Value: strVal}, expect: "{importEpoch=3}/BYTES/foo"}, + "headerJobIDOnly+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: intVal}, expect: "{importEpoch=3}/INT/17"}, + "headerFull+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderFull}, expect: "{localTs=0.000000009,0, importEpoch=3}/"}, + "headerFull+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: strVal}, expect: "{localTs=0.000000009,0, importEpoch=3}/BYTES/foo"}, + "headerFull+int": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}, expect: "{localTs=0.000000009,0, importEpoch=3}/INT/17"}, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { @@ -115,19 +161,31 @@ func TestEncodeDecodeMVCCValue(t *testing.T) { var strVal, intVal roachpb.Value strVal.SetString("foo") intVal.SetInt(17) + var importEpoch uint32 = 3 valHeader := enginepb.MVCCValueHeader{} valHeader.LocalTimestamp = hlc.ClockTimestamp{WallTime: 9} + valHeaderFull := valHeader + valHeaderFull.ImportEpoch = importEpoch + + valHeaderWithJobIDOnly := enginepb.MVCCValueHeader{ImportEpoch: importEpoch} + testcases := map[string]struct { val MVCCValue }{ - "tombstone": {val: MVCCValue{}}, - "bytes": {val: MVCCValue{Value: strVal}}, - "int": {val: MVCCValue{Value: intVal}}, - "header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}}, - "header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}}, - "header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}}, + "tombstone": {val: MVCCValue{}}, + "bytes": {val: MVCCValue{Value: strVal}}, + "int": {val: MVCCValue{Value: intVal}}, + "header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}}, + "header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}}, + "header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}}, + "headerJobIDOnly+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly}}, + "headerJobIDOnly+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: strVal}}, + "headerJobIDOnly+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: intVal}}, + "headerFull+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderFull}}, + "headerFull+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: strVal}}, + "headerFull+int": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}}, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) for name, tc := range testcases { @@ -183,15 +241,19 @@ func mvccValueBenchmarkConfigs() ( values map[string]roachpb.Value, ) { headers = map[string]enginepb.MVCCValueHeader{ - "empty": {}, - "local walltime": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545}}, - "local walltime+logical": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545, Logical: 4096}}, - "omit in rangefeeds": {OmitInRangefeeds: true}, + "empty": {}, + "jobID": {ImportEpoch: 3}, + "local walltime": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545}}, + "local walltime+logical": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545, Logical: 4096}}, + "omit in rangefeeds": {OmitInRangefeeds: true}, + "local walltime+jobID": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545}, ImportEpoch: 3}, + "local walltime+logical+jobID": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545, Logical: 4096}, ImportEpoch: 3}, } if testing.Short() { // Reduce the number of configurations in short mode. 
delete(headers, "local walltime") delete(headers, "omit in rangefeeds") + delete(headers, "local walltime+jobID") } values = map[string]roachpb.Value{ "tombstone": {}, diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes new file mode 100644 index 000000000000..639f29abdbb1 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000006650a02080920030000000003666f6f diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int new file mode 100644 index 000000000000..aeeffb2ef055 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000006650a0208092003000000000122 diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone new file mode 100644 index 000000000000..767d07819092 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000006650a0208092003 diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_bytes b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_bytes new file mode 100644 index 000000000000..82d66cbef4ad --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_bytes @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000004650a0020030000000003666f6f diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_int b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_int new file mode 100644 index 000000000000..5b3bb6afacbd --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_int @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000004650a002003000000000122 diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_tombstone b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_tombstone new file mode 100644 index 000000000000..2f360470fbc7 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerJobIDOnly_tombstone @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000004650a002003 From 2d4b60173a62ba7eab2ca0bd0a59bb5226b30ed1 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Thu, 28 Jul 2022 15:26:08 -0400 Subject: [PATCH 10/10] kv/bulk: write ImportEpoch to each MVCCValue during IMPORT This patch makes IMPORT INTO on a non-empty table write the table's ImportEpoch to each ingested MVCC Value, via the SSTBatcher. In a future PR, the ImportEpoch will be used rollback an IMPORT INTO in some cases. This additional information will allow IMPORTing tables to be backed up and restored. As part of this change we now also assume we might see an MVCCValue during restore. * Version Gating Previously, callers could (and did) assume that the values present in the SSTs returned by export request could be interpreted directly as roachpb.Value objects using code like: roachpb.Value{RawBytes: valBytes} For MVCCValueHeaders to be exported by ExportRequest all callers need to be updated: 1. ExportRequest on system.descriptors in sql/catalog/lease 2. ExportRequest on system.descriptors in ccl/changefeedccl/schemafeed 3. ExportRequest used by `FINGERPRINT` 4. ExportRequest used by old binaries in a mixed-version cluster. 
(1) and (2) will be easy to update and likely don't matter in practice moment as those tables do not include values with exportable value headers at the moment. (3) will be easy to update, but we still need an option to exclude value headers (a) until value headers are included in rangefeeds and (b) so long as we want to compare fingerprints with 23.2 versions. (4) is impossible to update so if we want BACKUP/RESTORE to round-trip in mixed version cluster we must version gate including them in backups until the cluster is on a single version. To account for this we only increment ImportEpoch during IMPORTs that start on 24.1 or greater and we only request MVCCValueHeaders on BACKUPs that start on 24.1 or greater. The first condition is important to ensure that we can later detect a table that can be fully rolled back using the new rollback method. Note that this also marks a hard backward incompatibility for backup artifacts. Backups for 24.1 cannot be restored on 23.2 or older. This was already the case by policy. 23.2 backups should still work fine on 24.1 since all roachpb.Value's should properly decode as MVCCValue's. Informs #76722 Release note: None Co-authored-by: Steven Danna --- pkg/ccl/backupccl/backup_job.go | 1 + pkg/ccl/backupccl/backup_processor.go | 14 +-- .../backupccl/backup_processor_planning.go | 45 +++---- pkg/ccl/backupccl/restore_data_processor.go | 15 ++- pkg/kv/bulk/buffering_adder.go | 20 ++- pkg/kv/bulk/sst_batcher.go | 18 ++- pkg/kv/bulk/sst_batcher_test.go | 77 +++++++++++- pkg/kv/kvpb/api.proto | 9 +- pkg/kv/kvserver/batcheval/cmd_export.go | 1 + pkg/kv/kvserver/kvserverbase/bulk_adder.go | 9 ++ pkg/sql/execinfrapb/processors_bulk_io.proto | 9 +- pkg/sql/importer/BUILD.bazel | 3 + pkg/sql/importer/import_job.go | 27 +++- pkg/sql/importer/import_mvcc_test.go | 116 ++++++++++++++++++ pkg/sql/importer/import_processor.go | 12 ++ pkg/sql/importer/import_processor_test.go | 6 +- pkg/storage/mvcc.go | 18 ++- pkg/storage/mvcc_value.go | 40 ++++-- pkg/storage/mvcc_value_test.go | 67 ++++++++++ 19 files changed, 451 insertions(+), 56 deletions(-) create mode 100644 pkg/sql/importer/import_mvcc_test.go diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index bb20648c59db..6ef7bc31db92 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -244,6 +244,7 @@ func backup( backupManifest.StartTime, backupManifest.EndTime, backupManifest.ElidedPrefix, + backupManifest.ClusterVersion.AtLeast(clusterversion.V24_1.Version()), ) if err != nil { return roachpb.RowCount{}, 0, err diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index f44c604b91a1..7eba8ba4dc70 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -487,14 +487,14 @@ func runBackupProcessor( if !span.firstKeyTS.IsEmpty() { splitMidKey = true } - req := &kvpb.ExportRequest{ - RequestHeader: kvpb.RequestHeaderFromSpan(span.span), - ResumeKeyTS: span.firstKeyTS, - StartTime: span.start, - MVCCFilter: spec.MVCCFilter, - TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), - SplitMidKey: splitMidKey, + RequestHeader: kvpb.RequestHeaderFromSpan(span.span), + ResumeKeyTS: span.firstKeyTS, + StartTime: span.start, + MVCCFilter: spec.MVCCFilter, + TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), + SplitMidKey: splitMidKey, + IncludeMVCCValueHeader: spec.IncludeMVCCValueHeader, } // If we're doing re-attempts but are not yet in 
the priority regime, diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 9edcccb37f59..91b48f9b005b 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -46,6 +46,7 @@ func distBackupPlanSpecs( mvccFilter kvpb.MVCCFilter, startTime, endTime hlc.Timestamp, elide execinfrapb.ElidePrefix, + includeValueHeader bool, ) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) { var span *tracing.Span ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs") @@ -98,17 +99,18 @@ func distBackupPlanSpecs( sqlInstanceIDToSpec := make(map[base.SQLInstanceID]*execinfrapb.BackupDataSpec) for _, partition := range spanPartitions { spec := &execinfrapb.BackupDataSpec{ - JobID: jobID, - Spans: partition.Spans, - DefaultURI: defaultURI, - URIsByLocalityKV: urisByLocalityKV, - MVCCFilter: mvccFilter, - Encryption: fileEncryption, - PKIDs: pkIDs, - BackupStartTime: startTime, - BackupEndTime: endTime, - UserProto: user.EncodeProto(), - ElidePrefix: elide, + JobID: jobID, + Spans: partition.Spans, + DefaultURI: defaultURI, + URIsByLocalityKV: urisByLocalityKV, + MVCCFilter: mvccFilter, + Encryption: fileEncryption, + PKIDs: pkIDs, + BackupStartTime: startTime, + BackupEndTime: endTime, + UserProto: user.EncodeProto(), + ElidePrefix: elide, + IncludeMVCCValueHeader: includeValueHeader, } sqlInstanceIDToSpec[partition.SQLInstanceID] = spec } @@ -121,16 +123,17 @@ func distBackupPlanSpecs( // which is not the leaseholder for any of the spans, but is for an // introduced span. spec := &execinfrapb.BackupDataSpec{ - JobID: jobID, - IntroducedSpans: partition.Spans, - DefaultURI: defaultURI, - URIsByLocalityKV: urisByLocalityKV, - MVCCFilter: mvccFilter, - Encryption: fileEncryption, - PKIDs: pkIDs, - BackupStartTime: startTime, - BackupEndTime: endTime, - UserProto: user.EncodeProto(), + JobID: jobID, + IntroducedSpans: partition.Spans, + DefaultURI: defaultURI, + URIsByLocalityKV: urisByLocalityKV, + MVCCFilter: mvccFilter, + Encryption: fileEncryption, + PKIDs: pkIDs, + BackupStartTime: startTime, + BackupEndTime: endTime, + UserProto: user.EncodeProto(), + IncludeMVCCValueHeader: includeValueHeader, } sqlInstanceIDToSpec[partition.SQLInstanceID] = spec } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index e2ecf7d3ea4e..baa7d7a19ffa 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" @@ -554,7 +553,10 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( return summary, err } valueScratch = append(valueScratch[:0], v...) 
- value := roachpb.Value{RawBytes: valueScratch} + value, err := storage.DecodeValueFromMVCCValue(valueScratch) + if err != nil { + return summary, err + } key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime) @@ -581,7 +583,14 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( if verbose { log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint()) } - if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil { + + // Using valueScratch here assumes that + // DecodeValueFromMVCCValue, ClearChecksum, and + // InitChecksum don't copy/reallocate the slice they + // were given. We expect that value.ClearChecksum and + // value.InitChecksum calls above have modified + // valueScratch. + if err := batcher.AddMVCCKey(ctx, key, valueScratch); err != nil { return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint()) } } diff --git a/pkg/kv/bulk/buffering_adder.go b/pkg/kv/bulk/buffering_adder.go index 3de6446cdb60..ef585396b84a 100644 --- a/pkg/kv/bulk/buffering_adder.go +++ b/pkg/kv/bulk/buffering_adder.go @@ -63,6 +63,12 @@ type BufferingAdder struct { // name of the BufferingAdder for the purpose of logging only. name string + // importEpoch specifies the ImportEpoch of the table the BufferingAdder + // is ingesting data as part of an IMPORT INTO job. If specified, the Bulk + // Adder's SSTBatcher will write the import epoch to each versioned value's + // metadata. + importEpoch uint32 + bulkMon *mon.BytesMonitor memAcc mon.BoundAccount @@ -96,7 +102,8 @@ func MakeBulkAdder( } b := &BufferingAdder{ - name: opts.Name, + name: opts.Name, + importEpoch: opts.ImportEpoch, sink: SSTBatcher{ name: opts.Name, db: db, @@ -303,8 +310,15 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error { for i := range b.curBuf.entries { mvccKey.Key = b.curBuf.Key(i) - if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil { - return err + if b.importEpoch != 0 { + if err := b.sink.AddMVCCKeyWithImportEpoch(ctx, mvccKey, b.curBuf.Value(i), + b.importEpoch); err != nil { + return err + } + } else { + if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil { + return err + } } } if err := b.sink.Flush(ctx); err != nil { diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 51dd1dc4bb23..e19ec6b05a03 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -329,6 +329,21 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) { b.mu.onFlush = onFlush } +func (b *SSTBatcher) AddMVCCKeyWithImportEpoch( + ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32, +) error { + mvccVal, err := storage.DecodeMVCCValue(value) + if err != nil { + return err + } + mvccVal.MVCCValueHeader.ImportEpoch = importEpoch + encVal, err := storage.EncodeMVCCValue(mvccVal) + if err != nil { + return err + } + return b.AddMVCCKey(ctx, key, encVal) +} + // AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed). // This is only for callers that want to control the timestamp on individual // keys -- like RESTORE where we want the restored data to look like the backup. @@ -389,8 +404,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value if !b.disallowShadowingBelow.IsEmpty() { b.updateMVCCStats(key, value) } - - return b.sstWriter.Put(key, value) + return b.sstWriter.PutRawMVCC(key, value) } // Reset clears all state in the batcher and prepares it for reuse. 
diff --git a/pkg/kv/bulk/sst_batcher_test.go b/pkg/kv/bulk/sst_batcher_test.go index 4912a4bbe3e3..85082fdfddd0 100644 --- a/pkg/kv/bulk/sst_batcher_test.go +++ b/pkg/kv/bulk/sst_batcher_test.go @@ -308,7 +308,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) { mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil) reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000) b, err := bulk.MakeBulkAdder( - ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs, + ctx, kvDB, mockCache, s.ClusterSettings(), ts, + kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs, ) require.NoError(t, err) @@ -361,3 +362,77 @@ func runTestImport(t *testing.T, batchSizeValue int64) { }) } } + +var DummyImportEpoch uint32 = 3 + +func TestImportEpochIngestion(t *testing.T) { + defer leaktest.AfterTest(t)() + + defer log.Scope(t).Close(t) + ctx := context.Background() + + mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil) + reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000) + s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + b, err := bulk.MakeTestingSSTBatcher(ctx, kvDB, s.ClusterSettings(), + false, true, mem.MakeConcurrentBoundAccount(), reqs) + require.NoError(t, err) + defer b.Close(ctx) + + startKey := storageutils.PointKey("a", 1) + endKey := storageutils.PointKey("b", 1) + value := storageutils.StringValueRaw("myHumbleValue") + mvccValue, err := storage.DecodeMVCCValue(value) + require.NoError(t, err) + + require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, startKey, value, DummyImportEpoch)) + require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, endKey, value, DummyImportEpoch)) + require.NoError(t, b.Flush(ctx)) + + // Check that ingested key contains the dummy job ID + req := &kvpb.ExportRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: startKey.Key, + EndKey: endKey.Key, + }, + MVCCFilter: kvpb.MVCCFilter_All, + StartTime: hlc.Timestamp{}, + IncludeMVCCValueHeader: true, + } + + header := kvpb.Header{Timestamp: s.Clock().Now()} + resp, roachErr := kv.SendWrappedWith(ctx, + kvDB.NonTransactionalSender(), header, req) + require.NoError(t, roachErr.GoError()) + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: startKey.Key, + UpperBound: endKey.Key, + } + + checkedJobId := false + for _, file := range resp.(*kvpb.ExportResponse).Files { + it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts) + require.NoError(t, err) + defer it.Close() + for it.SeekGE(storage.NilKey); ; it.Next() { + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + break + } + rawVal, err := it.UnsafeValue() + require.NoError(t, err) + val, err := storage.DecodeMVCCValue(rawVal) + require.NoError(t, err) + require.Equal(t, startKey, it.UnsafeKey()) + require.Equal(t, mvccValue.Value, val.Value) + require.Equal(t, DummyImportEpoch, val.ImportEpoch) + require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp) + checkedJobId = true + } + } + require.Equal(t, true, checkedJobId) +} diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 75a5b8031ede..4fcedd9f5dab 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -1831,7 +1831,14 @@ message ExportRequest { FingerprintOptions fingerprint_options = 15 [(gogoproto.nullable) = false]; - // Next ID: 16 + // IncludeMVCCValueHeader controls whether the MVCCValueHeader is + // included in exported bytes. 
Callers should only set this when all
+  // readers of the returned SST are prepared to parse a full
+  // MVCCValue. Even when set, only fields appropriate for export are
+  // included. See storage.EncodeMVCCValueForExport for details.
+  bool include_mvcc_value_header = 16 [(gogoproto.customname) = "IncludeMVCCValueHeader"];
+
+  // Next ID: 17
 }

 message FingerprintOptions {
diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go
index 6a1705004a6d..dea1d60e7487 100644
--- a/pkg/kv/kvserver/batcheval/cmd_export.go
+++ b/pkg/kv/kvserver/batcheval/cmd_export.go
@@ -197,6 +197,7 @@ func evalExport(
 		TargetLockConflictBytes: targetLockConflictBytes,
 		StopMidKey:              args.SplitMidKey,
 		ScanStats:               cArgs.ScanStats,
+		IncludeMVCCValueHeader:  args.IncludeMVCCValueHeader,
 	}
 	var summary kvpb.BulkOpSummary
 	var resumeInfo storage.ExportRequestResumeInfo
diff --git a/pkg/kv/kvserver/kvserverbase/bulk_adder.go b/pkg/kv/kvserver/kvserverbase/bulk_adder.go
index 62323091a093..601eb1a3bc26 100644
--- a/pkg/kv/kvserver/kvserverbase/bulk_adder.go
+++ b/pkg/kv/kvserver/kvserverbase/bulk_adder.go
@@ -69,6 +69,15 @@ type BulkAdderOptions struct {
 	// the first buffer to pick split points in the hope it is a representative
 	// sample of the overall input.
 	InitialSplitsIfUnordered int
+
+	// ImportEpoch specifies the ImportEpoch of the table the BulkAdder
+	// is ingesting data into as part of an IMPORT INTO job. If specified, the Bulk
+	// Adder's SSTBatcher will write the import epoch to each versioned value's
+	// metadata.
+	//
+	// Callers should check that the cluster is at or above
+	// version 24.1 before setting this option.
+	ImportEpoch uint32
 }

 // BulkAdderFactory describes a factory function for BulkAdders.
diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto
index 5073bc3d99ce..ad8eb0bd48c6 100644
--- a/pkg/sql/execinfrapb/processors_bulk_io.proto
+++ b/pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -318,7 +318,14 @@ message BackupDataSpec {
   optional string user_proto = 10 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"];
   optional ElidePrefix elide_prefix = 12 [(gogoproto.nullable) = false];
-  // NEXTID: 13.
+
+  // IncludeMVCCValueHeader indicates whether the backup should be
+  // created with MVCCValueHeaders in the exported data. This should
+  // only be set on backups starting on cluster version 24.1 or
+  // greater.
+  optional bool include_mvcc_value_header = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IncludeMVCCValueHeader"];
+
+  // NEXTID: 14.
} message RestoreFileSpec { diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index 323e5d8acb2e..6a8008f0e59a 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -148,6 +148,7 @@ go_test( "exportparquet_test.go", "import_csv_mark_redaction_test.go", "import_into_test.go", + "import_mvcc_test.go", "import_processor_test.go", "import_stmt_test.go", "main_test.go", @@ -187,6 +188,7 @@ go_test( "//pkg/keys", "//pkg/keyvisualizer", "//pkg/kv", + "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvpb", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", @@ -229,6 +231,7 @@ go_test( "//pkg/sql/stats", "//pkg/sql/tests", "//pkg/sql/types", + "//pkg/storage", "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/jobutils", diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index 30fedbe7936a..f7e8cda2259c 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -292,6 +292,14 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { } } + if len(details.Tables) > 1 { + for _, tab := range details.Tables { + if !tab.IsNew { + return errors.AssertionFailedf("all tables in multi-table import must be new") + } + } + } + procsPerNode := int(processorsPerNode.Get(&p.ExecCfg().Settings.SV)) res, err := ingestWithRetry(ctx, p, r.job, tables, typeDescs, files, format, details.Walltime, @@ -400,9 +408,11 @@ func (r *importResumer) prepareTablesForIngestion( var err error var newTableDescs []jobspb.ImportDetails_Table var desc *descpb.TableDescriptor + + useImportEpochs := p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V24_1) for i, table := range details.Tables { if !table.IsNew { - desc, err = prepareExistingTablesForIngestion(ctx, txn, descsCol, table.Desc) + desc, err = prepareExistingTablesForIngestion(ctx, txn, descsCol, table.Desc, useImportEpochs) if err != nil { return importDetails, err } @@ -480,7 +490,11 @@ func (r *importResumer) prepareTablesForIngestion( // prepareExistingTablesForIngestion prepares descriptors for existing tables // being imported into. func prepareExistingTablesForIngestion( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, desc *descpb.TableDescriptor, + ctx context.Context, + txn *kv.Txn, + descsCol *descs.Collection, + desc *descpb.TableDescriptor, + useImportEpochs bool, ) (*descpb.TableDescriptor, error) { if len(desc.Mutations) > 0 { return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String()) @@ -500,7 +514,14 @@ func prepareExistingTablesForIngestion( // Take the table offline for import. // TODO(dt): audit everywhere we get table descs (leases or otherwise) to // ensure that filtering by state handles IMPORTING correctly. - importing.OfflineForImport() + + // We only use the new OfflineForImport on 24.1, which bumps + // the ImportEpoch, if we are completely on 24.1. + if useImportEpochs { + importing.OfflineForImport() + } else { + importing.SetOffline(tabledesc.OfflineReasonImporting) + } // TODO(dt): de-validate all the FKs. if err := descsCol.WriteDesc( diff --git a/pkg/sql/importer/import_mvcc_test.go b/pkg/sql/importer/import_mvcc_test.go new file mode 100644 index 000000000000..f1225b10576a --- /dev/null +++ b/pkg/sql/importer/import_mvcc_test.go @@ -0,0 +1,116 @@ +// Copyright 2022 The Cockroach Authors. 
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package importer_test
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/stretchr/testify/require"
+)
+
+// TestMVCCValueHeaderImportEpoch tests that the import epoch is properly
+// stored in the MVCCValueHeader of an imported key's MVCCValue.
+func TestMVCCValueHeaderImportEpoch(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	server, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
+	s := server.ApplicationLayer()
+	defer server.Stopper().Stop(ctx)
+	sqlDB := sqlutils.MakeSQLRunner(db)
+
+	sqlDB.Exec(t, `CREATE DATABASE d`)
+
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method == "GET" {
+			fmt.Fprint(w, "1")
+		}
+	}))
+	defer srv.Close()
+
+	// Create a table where the first row (in sort order) comes from an IMPORT
+	// while the second comes from an INSERT.
+	sqlDB.Exec(t, `CREATE TABLE d.t (a INT8)`)
+	sqlDB.Exec(t, `INSERT INTO d.t VALUES ('2')`)
+	sqlDB.Exec(t, `IMPORT INTO d.t CSV DATA ($1)`, srv.URL)
+
+	// Conduct an export request to iterate over the keys in the table.
+	var tableID uint32
+	sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`,
+		"t").Scan(&tableID)
+
+	startKey := s.Codec().TablePrefix(tableID)
+	endKey := startKey.PrefixEnd()
+
+	req := &kvpb.ExportRequest{
+		RequestHeader: kvpb.RequestHeader{
+			Key:    startKey,
+			EndKey: endKey,
+		},
+		MVCCFilter:             kvpb.MVCCFilter_All,
+		StartTime:              hlc.Timestamp{},
+		IncludeMVCCValueHeader: true,
+	}
+
+	header := kvpb.Header{Timestamp: s.Clock().Now()}
+	resp, roachErr := kv.SendWrappedWith(ctx,
+		s.DistSenderI().(*kvcoord.DistSender), header, req)
+	require.NoError(t, roachErr.GoError())
+
+	iterOpts := storage.IterOptions{
+		KeyTypes:   storage.IterKeyTypePointsOnly,
+		LowerBound: startKey,
+		UpperBound: endKey,
+	}
+
+	// Ensure there are 2 keys in the span, and only the first one contains ImportEpoch metadata.
+	keyCount := 0
+	for _, file := range resp.(*kvpb.ExportResponse).Files {
+		it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts)
+		require.NoError(t, err)
+		defer it.Close()
+		for it.SeekGE(storage.NilKey); ; it.Next() {
+			ok, err := it.Valid()
+			require.NoError(t, err)
+			if !ok {
+				break
+			}
+			rawVal, err := it.UnsafeValue()
+			require.NoError(t, err)
+			val, err := storage.DecodeMVCCValue(rawVal)
+			require.NoError(t, err)
+			if keyCount == 0 {
+				require.NotEqual(t, uint32(0), val.ImportEpoch)
+			} else if keyCount == 1 {
+				require.Equal(t, uint32(0), val.ImportEpoch)
+			} else {
+				t.Fatal("more than 2 keys in the table")
+			}
+			require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp)
+			keyCount++
+		}
+	}
+}
diff --git a/pkg/sql/importer/import_processor.go b/pkg/sql/importer/import_processor.go
index ef79797eac16..f49441738cad 100644
--- a/pkg/sql/importer/import_processor.go
+++ b/pkg/sql/importer/import_processor.go
@@ -389,6 +389,16 @@ func ingestKvs(
 	// will hog memory as it tries to grow more aggressively.
minBufferSize, maxBufferSize := importBufferConfigSizes(flowCtx.Cfg.Settings, true /* isPKAdder */) + + var bulkAdderImportEpoch uint32 + for _, v := range spec.Tables { + if bulkAdderImportEpoch == 0 { + bulkAdderImportEpoch = v.Desc.ImportEpoch + } else if bulkAdderImportEpoch != v.Desc.ImportEpoch { + return nil, errors.AssertionFailedf("inconsistent import epoch on multi-table import") + } + } + pkIndexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB.KV(), writeTS, kvserverbase.BulkAdderOptions{ Name: pkAdderName, DisallowShadowingBelow: writeTS, @@ -397,6 +407,7 @@ func ingestKvs( MaxBufferSize: maxBufferSize, InitialSplitsIfUnordered: int(spec.InitialSplits), WriteAtBatchTimestamp: true, + ImportEpoch: bulkAdderImportEpoch, }) if err != nil { return nil, err @@ -413,6 +424,7 @@ func ingestKvs( MaxBufferSize: maxBufferSize, InitialSplitsIfUnordered: int(spec.InitialSplits), WriteAtBatchTimestamp: true, + ImportEpoch: bulkAdderImportEpoch, }) if err != nil { return nil, err diff --git a/pkg/sql/importer/import_processor_test.go b/pkg/sql/importer/import_processor_test.go index 3a959327ae6f..a1ecc42be252 100644 --- a/pkg/sql/importer/import_processor_test.go +++ b/pkg/sql/importer/import_processor_test.go @@ -243,7 +243,7 @@ func TestImportIgnoresProcessedFiles(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( @@ -368,7 +368,7 @@ func TestImportHonorsResumePosition(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( @@ -502,7 +502,7 @@ func TestImportHandlesDuplicateKVs(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 5ecf5fa6b705..ab63da1d7116 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -7884,9 +7884,13 @@ func mvccExportToWriter( return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "decoding mvcc value %s", unsafeKey) } - unsafeValue, err = EncodeMVCCValueForExport(mvccValue) - if err != nil { - return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "repackaging imported mvcc value %s", unsafeKey) + if opts.IncludeMVCCValueHeader { + unsafeValue, err = EncodeMVCCValueForExport(mvccValue) + if err != nil { + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "repackaging imported mvcc value %s", unsafeKey) + } + } else { + unsafeValue = mvccValue.Value.RawBytes } // Skip tombstone records when start time is zero (non-incremental) // and we are not exporting all versions. @@ -8053,6 +8057,14 @@ type MVCCExportOptions struct { // FingerprintOptions controls how fingerprints are generated // when using MVCCExportFingerprint. FingerprintOptions MVCCExportFingerprintOptions + + // IncludeMVCCValueHeader controls whether we include + // MVCCValueHeaders in the exported data. When true, the + // portions of the header appropriate for export are included + // in the encoded values. 
Callers should be ready to decode
+	// full MVCCValues in this case.
+	IncludeMVCCValueHeader bool
+
 	// ScanStats, if set, is updated with iterator stats upon export success of
 	// failure. Non-iterator stats i.e., {NumGets,NumReverseScans} are left
 	// unchanged, and NumScans is incremented by 1.
diff --git a/pkg/storage/mvcc_value.go b/pkg/storage/mvcc_value.go
index 27bb99fad239..2b8ea7f426f4 100644
--- a/pkg/storage/mvcc_value.go
+++ b/pkg/storage/mvcc_value.go
@@ -134,19 +134,17 @@ func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) {
 	w.Print(v.Value.PrettyPrint())
 }

-// EncodeMVCCValueForExport strips fields from the MVCCValueHeader that
-// should not get exported out of the cluster.
-//
-//gcassert:inline
+// EncodeMVCCValueForExport encodes fields from the MVCCValueHeader
+// that are appropriate for export out of the cluster.
 func EncodeMVCCValueForExport(mvccValue MVCCValue) ([]byte, error) {
-	// Consider a fast path, where only the roachpb.Value gets exported.
-	// Currently, this only occurs if the value was not imported.
 	if mvccValue.ImportEpoch == 0 {
 		return mvccValue.Value.RawBytes, nil
 	}

-	// Manually strip off any non-exportable fields, and re-encode the mvcc value.
-	mvccValue.MVCCValueHeader.LocalTimestamp = hlc.ClockTimestamp{}
+	// We only export ImportEpoch.
+	mvccValue.MVCCValueHeader = enginepb.MVCCValueHeader{
+		ImportEpoch: mvccValue.ImportEpoch,
+	}
 	return EncodeMVCCValue(mvccValue)
 }

@@ -234,6 +232,32 @@ func DecodeMVCCValue(buf []byte) (MVCCValue, error) {
 	return decodeExtendedMVCCValue(buf)
 }

+// DecodeValueFromMVCCValue decodes an MVCCValue and returns the
+// roachpb.Value portion without parsing the MVCCValueHeader.
+//
+// NB: Callers may assume that this function does not copy or re-allocate
+// the underlying byte slice.
+func DecodeValueFromMVCCValue(buf []byte) (roachpb.Value, error) {
+	if len(buf) == 0 {
+		// Tombstone with no header.
+		return roachpb.Value{}, nil
+	}
+	if len(buf) <= tagPos {
+		return roachpb.Value{}, errMVCCValueMissingTag
+	}
+	if buf[tagPos] != extendedEncodingSentinel {
+		return roachpb.Value{RawBytes: buf}, nil
+	}
+
+	// Extended encoding
+	headerLen := binary.BigEndian.Uint32(buf)
+	headerSize := extendedPreludeSize + headerLen
+	if len(buf) < int(headerSize) {
+		return roachpb.Value{}, errMVCCValueMissingHeader
+	}
+	return roachpb.Value{RawBytes: buf[headerSize:]}, nil
+}
+
 // DecodeMVCCValueAndErr is a helper that can be called using the ([]byte,
 // error) pair returned from the iterator UnsafeValue(), Value() methods.
func DecodeMVCCValueAndErr(buf []byte, err error) (MVCCValue, error) { diff --git a/pkg/storage/mvcc_value_test.go b/pkg/storage/mvcc_value_test.go index 6a2084390a3b..b5bfd6e4151a 100644 --- a/pkg/storage/mvcc_value_test.go +++ b/pkg/storage/mvcc_value_test.go @@ -211,6 +211,24 @@ func TestEncodeDecodeMVCCValue(t *testing.T) { return buf.String() })) + t.Run("DeocdeValueFromMVCCValue/"+name, func(t *testing.T) { + enc, err := EncodeMVCCValue(tc.val) + require.NoError(t, err) + assert.Equal(t, encodedMVCCValueSize(tc.val), len(enc)) + + dec, err := DecodeValueFromMVCCValue(enc) + require.NoError(t, err) + + if len(dec.RawBytes) == 0 { + dec.RawBytes = nil // normalize + } + + require.Equal(t, tc.val.Value, dec) + require.Equal(t, tc.val.IsTombstone(), len(dec.RawBytes) == 0) + isTombstone, err := EncodedMVCCValueIsTombstone(enc) + require.NoError(t, err) + require.Equal(t, tc.val.IsTombstone(), isTombstone) + }) } } @@ -233,6 +251,14 @@ func TestDecodeMVCCValueErrors(t *testing.T) { require.Equal(t, tc.expect, err) require.False(t, isTombstone) }) + t.Run("DecodeValueFromMVCCValue/"+name, func(t *testing.T) { + dec, err := DecodeValueFromMVCCValue(tc.enc) + require.Equal(t, tc.expect, err) + require.Zero(t, dec) + isTombstone, err := EncodedMVCCValueIsTombstone(tc.enc) + require.Equal(t, tc.expect, err) + require.False(t, isTombstone) + }) } } @@ -283,6 +309,26 @@ func BenchmarkEncodeMVCCValue(b *testing.B) { } } +func BenchmarkEncodeMVCCValueForExport(b *testing.B) { + DisableMetamorphicSimpleValueEncoding(b) + headers, values := mvccValueBenchmarkConfigs() + for hDesc, h := range headers { + for vDesc, v := range values { + name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) + mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + res, err := EncodeMVCCValueForExport(mvccValue) + if err != nil { // for performance + require.NoError(b, err) + } + _ = res + } + }) + } + } +} + func BenchmarkDecodeMVCCValue(b *testing.B) { headers, values := mvccValueBenchmarkConfigs() for hDesc, h := range headers { @@ -316,6 +362,27 @@ func BenchmarkDecodeMVCCValue(b *testing.B) { } } +func BenchmarkDecodeValueFromMVCCValue(b *testing.B) { + headers, values := mvccValueBenchmarkConfigs() + for hDesc, h := range headers { + for vDesc, v := range values { + name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) + mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + buf, err := EncodeMVCCValue(mvccValue) + require.NoError(b, err) + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + res, err := DecodeValueFromMVCCValue(buf) + if err != nil { // for performance + require.NoError(b, err) + } + _ = res + } + }) + } + } +} + func BenchmarkMVCCValueIsTombstone(b *testing.B) { headers, values := mvccValueBenchmarkConfigs() for hDesc, h := range headers {