From 8878715577f4b8669da2cdaa57f6a0070b784998 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 9 Feb 2022 00:00:18 -0500 Subject: [PATCH 1/2] sql,kvserver: stop gossiping the system config There were a pair of tests that didn't feel worth carrying along. Follow up commit to remove the cluster setting. Release note: None --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/ccl/changefeedccl/BUILD.bazel | 1 + .../changefeedccl/helpers_tenant_shim_test.go | 4 + pkg/ccl/kvccl/kvtenantccl/connector.go | 17 +- pkg/ccl/kvccl/kvtenantccl/connector_test.go | 8 +- .../logic_test/regional_by_row_query_behavior | 2 +- .../spanconfigcomparedccl/BUILD.bazel | 2 +- .../spanconfigcomparedccl/datadriven_test.go | 33 +- pkg/cli/debug.go | 2 +- pkg/clusterversion/cockroach_versions.go | 9 +- pkg/clusterversion/key_string.go | 5 +- pkg/config/provider.go | 28 +- pkg/gossip/gossip.go | 16 +- pkg/gossip/infostore_test.go | 2 +- pkg/gossip/keys.go | 12 +- pkg/kv/kvclient/kvtenant/connector.go | 6 - pkg/kv/kvserver/BUILD.bazel | 2 - .../kvserver/batcheval/cmd_end_transaction.go | 6 +- pkg/kv/kvserver/client_lease_test.go | 39 ++- pkg/kv/kvserver/client_rangefeed_test.go | 3 - pkg/kv/kvserver/client_split_test.go | 295 ++++-------------- pkg/kv/kvserver/gossip_test.go | 37 ++- pkg/kv/kvserver/helpers_test.go | 2 +- pkg/kv/kvserver/kvserverbase/base.go | 15 +- pkg/kv/kvserver/mvcc_gc_queue_test.go | 4 +- pkg/kv/kvserver/queue_test.go | 2 +- pkg/kv/kvserver/replica_evaluate.go | 13 +- pkg/kv/kvserver/replica_gossip.go | 15 +- pkg/kv/kvserver/replica_test.go | 33 +- pkg/kv/kvserver/reports/BUILD.bazel | 2 +- pkg/kv/kvserver/reports/reporter_test.go | 59 ++-- pkg/kv/kvserver/split_queue_test.go | 2 +- pkg/kv/kvserver/store.go | 48 ++- pkg/kv/kvserver/store_test.go | 5 +- pkg/kv/txn.go | 6 +- pkg/kv/txn_test.go | 4 +- pkg/migration/migrations/descriptor_utils.go | 3 - .../migrations/descriptor_utils_test.go | 10 +- pkg/server/node.go | 116 +++---- pkg/server/server.go | 8 +- pkg/server/server_test.go | 32 +- pkg/server/status_test.go | 4 +- pkg/server/systemconfigwatcher/cache.go | 46 ++- .../test_system_config_watcher.go | 2 +- pkg/server/testserver.go | 10 + pkg/sql/catalog/descs/txn.go | 10 +- pkg/sql/catalog/lease/helpers_test.go | 7 +- pkg/sql/conn_executor_internal_test.go | 13 +- pkg/sql/crdb_internal_test.go | 3 - pkg/sql/drop_test.go | 3 - pkg/sql/gcjob/BUILD.bazel | 1 + pkg/sql/gcjob/descriptor_utils.go | 8 +- pkg/sql/gcjob/gcjobnotifier/notifier.go | 4 +- pkg/sql/gcjob/gcjobnotifier/notifier_test.go | 4 +- pkg/sql/opt/exec/execbuilder/BUILD.bazel | 1 + pkg/sql/opt/exec/execbuilder/relational.go | 8 +- .../testdata/show_trace_nonmetamorphic | 6 + pkg/sql/plan.go | 8 +- pkg/sql/schema_changer.go | 7 +- pkg/sql/schema_changer_test.go | 7 +- pkg/sql/set_cluster_setting.go | 12 +- pkg/sql/tests/BUILD.bazel | 1 + pkg/sql/tests/end_txn_trigger.go | 9 +- pkg/sql/zone_config_test.go | 19 +- pkg/startupmigrations/BUILD.bazel | 1 - pkg/startupmigrations/migrations_test.go | 3 +- pkg/testutils/localtestcluster/BUILD.bazel | 1 + .../localtestcluster/local_test_cluster.go | 7 + pkg/testutils/serverutils/BUILD.bazel | 1 + pkg/testutils/serverutils/test_tenant_shim.go | 4 + 71 files changed, 592 insertions(+), 530 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 247d149019a1..f52a039e3c52 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -176,4 +176,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-64 set the active cluster version in the format '.' +version version 21.2-66 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index f66b257b25f3..c4e1bc36ceb0 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -188,6 +188,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-64set the active cluster version in the format '.' +versionversion21.2-66set the active cluster version in the format '.' diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index e6cecb3b7a72..23806ec37a01 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -162,6 +162,7 @@ go_test( "//pkg/cloud", "//pkg/cloud/impl:cloudimpl", "//pkg/clusterversion", + "//pkg/config", "//pkg/gossip", "//pkg/jobs", "//pkg/jobs/jobspb", diff --git a/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go b/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go index 9c2830f96cfa..b7bec84525e4 100644 --- a/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go +++ b/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go @@ -12,6 +12,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -115,3 +116,6 @@ func (t *testServerShim) Engines() []storage.Engine { panic(unsup func (t *testServerShim) MetricsRecorder() *status.MetricsRecorder { panic(unsupportedShimMethod) } func (t *testServerShim) CollectionFactory() interface{} { panic(unsupportedShimMethod) } func (t *testServerShim) SpanConfigKVSubscriber() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SystemConfigProvider() config.SystemConfigProvider { + panic(unsupportedShimMethod) +} diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index 4dfb461f1d17..6faff2a48a82 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -77,7 +77,7 @@ type Connector struct { client *client nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor systemConfig *config.SystemConfig - systemConfigChannels []chan<- struct{} + systemConfigChannels map[chan<- struct{}]struct{} } settingsMu struct { @@ -140,6 +140,7 @@ func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector { } c.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor) + c.mu.systemConfigChannels = make(map[chan<- struct{}]struct{}) c.settingsMu.allTenantOverrides = make(map[string]settings.EncodedValue) c.settingsMu.specificOverrides = make(map[string]settings.EncodedValue) return c @@ -250,7 +251,7 @@ var gossipSubsHandlers = map[string]func(*Connector, context.Context, string, ro // Subscribe to all *NodeDescriptor updates. gossip.MakePrefixPattern(gossip.KeyNodeIDPrefix): (*Connector).updateNodeAddress, // Subscribe to a filtered view of *SystemConfig updates. - gossip.KeySystemConfig: (*Connector).updateSystemConfig, + gossip.KeyDeprecatedSystemConfig: (*Connector).updateSystemConfig, } var gossipSubsPatterns = func() []string { @@ -322,7 +323,7 @@ func (c *Connector) updateSystemConfig(ctx context.Context, key string, content c.mu.Lock() defer c.mu.Unlock() c.mu.systemConfig = cfg - for _, c := range c.mu.systemConfigChannels { + for c := range c.mu.systemConfigChannels { select { case c <- struct{}{}: default: @@ -342,20 +343,24 @@ func (c *Connector) GetSystemConfig() *config.SystemConfig { // RegisterSystemConfigChannel implements the config.SystemConfigProvider // interface. -func (c *Connector) RegisterSystemConfigChannel() <-chan struct{} { +func (c *Connector) RegisterSystemConfigChannel() (_ <-chan struct{}, unregister func()) { // Create channel that receives new system config notifications. The channel // has a size of 1 to prevent connector from having to block on it. ch := make(chan struct{}, 1) c.mu.Lock() defer c.mu.Unlock() - c.mu.systemConfigChannels = append(c.mu.systemConfigChannels, ch) + c.mu.systemConfigChannels[ch] = struct{}{} // Notify the channel right away if we have a config. if c.mu.systemConfig != nil { ch <- struct{}{} } - return ch + return ch, func() { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.mu.systemConfigChannels, ch) + } } // RangeLookup implements the kvcoord.RangeDescriptorDB interface. diff --git a/pkg/ccl/kvccl/kvtenantccl/connector_test.go b/pkg/ccl/kvccl/kvtenantccl/connector_test.go index 1135c420f23d..1b692d6ecdb9 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector_test.go @@ -140,9 +140,9 @@ func gossipEventForSystemConfig(cfg *config.SystemConfigEntries) *roachpb.Gossip panic(err) } return &roachpb.GossipSubscriptionEvent{ - Key: gossip.KeySystemConfig, + Key: gossip.KeyDeprecatedSystemConfig, Content: roachpb.MakeValueFromBytesAndTimestamp(val, hlc.Timestamp{}), - PatternMatched: gossip.KeySystemConfig, + PatternMatched: gossip.KeyDeprecatedSystemConfig, } } @@ -252,7 +252,7 @@ func TestConnectorGossipSubscription(t *testing.T) { // Test config.SystemConfigProvider impl. Should not have a SystemConfig yet. sysCfg := c.GetSystemConfig() require.Nil(t, sysCfg) - sysCfgC := c.RegisterSystemConfigChannel() + sysCfgC, _ := c.RegisterSystemConfigChannel() require.Len(t, sysCfgC, 0) // Return first SystemConfig response. @@ -282,7 +282,7 @@ func TestConnectorGossipSubscription(t *testing.T) { require.Equal(t, sysCfgEntriesUp.Values, sysCfg.Values) // A newly registered SystemConfig channel will be immediately notified. - sysCfgC2 := c.RegisterSystemConfigChannel() + sysCfgC2, _ := c.RegisterSystemConfigChannel() require.Len(t, sysCfgC2, 1) } diff --git a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior index f42af1411b8f..fcfb0030d2ce 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior +++ b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior @@ -2234,7 +2234,7 @@ ALTER TABLE t65064 INJECT STATISTICS '[ } ]'; -query T +query T retry SELECT * FROM [EXPLAIN SELECT * FROM t65064 WHERE username = 'kharris'] OFFSET 2 ---- ยท diff --git a/pkg/ccl/spanconfigccl/spanconfigcomparedccl/BUILD.bazel b/pkg/ccl/spanconfigccl/spanconfigcomparedccl/BUILD.bazel index 9e07d775d852..277f58640a79 100644 --- a/pkg/ccl/spanconfigccl/spanconfigcomparedccl/BUILD.bazel +++ b/pkg/ccl/spanconfigccl/spanconfigcomparedccl/BUILD.bazel @@ -11,12 +11,12 @@ go_test( "//pkg/base", "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/utilccl", - "//pkg/gossip", "//pkg/jobs", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/server/systemconfigwatcher", "//pkg/spanconfig", "//pkg/spanconfig/spanconfigtestutils", "//pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster", diff --git a/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go index ca11df504192..f5e48e2dfe60 100644 --- a/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go @@ -20,9 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster" @@ -106,14 +106,15 @@ func TestDataDriven(t *testing.T) { { tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) - tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) } spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil /* ptsKnobs */) defer spanConfigTestCluster.Cleanup() kvSubscriber := tc.Server(0).SpanConfigKVSubscriber().(spanconfig.KVSubscriber) - underlyingGossip := tc.Server(0).GossipI().(*gossip.Gossip) + systemConfig := tc.Server(0).SystemConfigProvider().(*systemconfigwatcher.Cache) systemTenant := spanConfigTestCluster.InitializeTenant(ctx, roachpb.SystemTenantID) datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { @@ -161,15 +162,19 @@ func TestDataDriven(t *testing.T) { // (i) reconciliation processes; // (ii) tenant initializations (where seed span configs are // installed). - now := systemTenant.Clock().Now() - testutils.SucceedsSoon(t, func() error { - lastUpdated := kvSubscriber.LastUpdated() - if lastUpdated.Less(now) { - return errors.Newf("kvsubscriber last updated timestamp (%s) lagging barrier timestamp (%s)", - lastUpdated.GoTime(), now.GoTime()) - } - return nil - }) + checkLastUpdated := func(t *testing.T, n string, c interface{ LastUpdated() hlc.Timestamp }) { + now := systemTenant.Clock().Now() + testutils.SucceedsSoon(t, func() error { + lastUpdated := c.LastUpdated() + if lastUpdated.Less(now) { + return errors.Newf("%s last updated timestamp (%s) lagging barrier timestamp (%s)", + n, lastUpdated.GoTime(), now.GoTime()) + } + return nil + }) + } + checkLastUpdated(t, "kvsubscriber", kvSubscriber) + checkLastUpdated(t, "systemconfigwatcher", systemConfig) // As for the gossiped system config span, because we're using a // single node cluster there's no additional timestamp @@ -221,7 +226,7 @@ func TestDataDriven(t *testing.T) { var reader spanconfig.StoreReader if version == "legacy" { - reader = underlyingGossip.GetSystemConfig() + reader = systemConfig.GetSystemConfig() } else { reader = kvSubscriber } @@ -230,7 +235,7 @@ func TestDataDriven(t *testing.T) { return spanconfigtestutils.MaybeLimitAndOffset(t, d, "...", data) case "diff": - var before, after spanconfig.StoreReader = underlyingGossip.GetSystemConfig(), kvSubscriber + var before, after spanconfig.StoreReader = systemConfig.GetSystemConfig(), kvSubscriber diff, err := difflib.GetUnifiedDiffString(difflib.UnifiedDiff{ A: difflib.SplitLines(spanconfigtestutils.GetSplitPoints(ctx, t, before).String()), B: difflib.SplitLines(spanconfigtestutils.GetSplitPoints(ctx, t, after).String()), diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 838cd025648a..9e9c74f52ea3 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -904,7 +904,7 @@ func parseGossipValues(gossipInfo *gossip.InfoStatus) (string, error) { return "", errors.Wrapf(err, "failed to parse value for key %q", key) } output = append(output, fmt.Sprintf("%q: %v", key, clusterID)) - } else if key == gossip.KeySystemConfig { + } else if key == gossip.KeyDeprecatedSystemConfig { if debugCtx.printSystemConfig { var config config.SystemConfigEntries if err := protoutil.Unmarshal(bytes, &config); err != nil { diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index aa9d306c8956..bb22e5b3b23f 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -279,7 +279,10 @@ const ( // engine running at the required format major version, as do all other nodes // in the cluster. EnablePebbleFormatVersionBlockProperties - + // DisableSystemConfigGossipTrigger is a follow-up to EnableSpanConfigStore + // to disable the data propagation mechanism it and the entire spanconfig + // infrastructure obviates. + DisableSystemConfigGossipTrigger // ************************************************* // Step (1): Add new versions here. // Do not add new versions to a patch release. @@ -443,6 +446,10 @@ var versionsSingleton = keyedVersions{ Key: EnablePebbleFormatVersionBlockProperties, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 64}, }, + { + Key: DisableSystemConfigGossipTrigger, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 66}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 1e967f9eb977..b48bc5a4f3d7 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -41,11 +41,12 @@ func _() { _ = x[DontProposeWriteTimestampForLeaseTransfers-30] _ = x[TenantSettingsTable-31] _ = x[EnablePebbleFormatVersionBlockProperties-32] + _ = x[DisableSystemConfigGossipTrigger-33] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockProperties" +const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTrigger" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 742, 778, 820, 839, 879} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 742, 778, 820, 839, 879, 911} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/config/provider.go b/pkg/config/provider.go index 774ac3bdcfda..48dedc81a684 100644 --- a/pkg/config/provider.go +++ b/pkg/config/provider.go @@ -21,20 +21,32 @@ type SystemConfigProvider interface { // the system config. It is notified after registration (if a system config // is already set), and whenever a new system config is successfully // unmarshaled. - RegisterSystemConfigChannel() <-chan struct{} + RegisterSystemConfigChannel() (_ <-chan struct{}, unregister func()) } -// EmptySystemConfigProvider is an implementation of SystemConfigProvider that -// never provides a system config. -type EmptySystemConfigProvider struct{} +// ConstantSystemConfigProvider is an implementation of SystemConfigProvider which +// always returns the same value. +type ConstantSystemConfigProvider struct { + cfg *SystemConfig +} + +// NewConstantSystemConfigProvider constructs a SystemConfigProvider which +// always returns the same value. +func NewConstantSystemConfigProvider(cfg *SystemConfig) *ConstantSystemConfigProvider { + p := &ConstantSystemConfigProvider{cfg: cfg} + return p +} // GetSystemConfig implements the SystemConfigProvider interface. -func (EmptySystemConfigProvider) GetSystemConfig() *SystemConfig { - return nil +func (c *ConstantSystemConfigProvider) GetSystemConfig() *SystemConfig { + return c.cfg } // RegisterSystemConfigChannel implements the SystemConfigProvider interface. -func (EmptySystemConfigProvider) RegisterSystemConfigChannel() <-chan struct{} { +func (c *ConstantSystemConfigProvider) RegisterSystemConfigChannel() ( + _ <-chan struct{}, + unregister func(), +) { // The system config will never be updated, so return a nil channel. - return nil + return nil, func() {} } diff --git a/pkg/gossip/gossip.go b/pkg/gossip/gossip.go index 492b4b434183..97c7f117d8c0 100644 --- a/pkg/gossip/gossip.go +++ b/pkg/gossip/gossip.go @@ -329,7 +329,7 @@ func New( g.mu.Lock() // Add ourselves as a SystemConfig watcher. - g.mu.is.registerCallback(KeySystemConfig, g.updateSystemConfig) + g.mu.is.registerCallback(KeyDeprecatedSystemConfig, g.updateSystemConfig) // Add ourselves as a node descriptor watcher. g.mu.is.registerCallback(MakePrefixPattern(KeyNodeIDPrefix), g.updateNodeAddress) g.mu.is.registerCallback(MakePrefixPattern(KeyStorePrefix), g.updateStoreMap) @@ -1145,18 +1145,22 @@ func (g *Gossip) RegisterCallback(pattern string, method Callback, opts ...Callb } } -// GetSystemConfig returns the local unmarshaled version of the system config. +// DeprecatedGetSystemConfig returns the local unmarshaled version of the system config. // Returns nil if the system config hasn't been set yet. -func (g *Gossip) GetSystemConfig() *config.SystemConfig { +// +// TODO(ajwerner): Remove this in 22.2. +func (g *Gossip) DeprecatedGetSystemConfig() *config.SystemConfig { g.systemConfigMu.RLock() defer g.systemConfigMu.RUnlock() return g.systemConfig } -// RegisterSystemConfigChannel registers a channel to signify updates for the +// DeprecatedRegisterSystemConfigChannel registers a channel to signify updates for the // system config. It is notified after registration (if a system config is // already set), and whenever a new system config is successfully unmarshaled. -func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} { +// +// TODO(ajwerner): Remove this in 22.2. +func (g *Gossip) DeprecatedRegisterSystemConfigChannel() <-chan struct{} { // Create channel that receives new system config notifications. // The channel has a size of 1 to prevent gossip from having to block on it. c := make(chan struct{}, 1) @@ -1177,7 +1181,7 @@ func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} { // channel. func (g *Gossip) updateSystemConfig(key string, content roachpb.Value) { ctx := g.AnnotateCtx(context.TODO()) - if key != KeySystemConfig { + if key != KeyDeprecatedSystemConfig { log.Fatalf(ctx, "wrong key received on SystemConfig callback: %s", key) } cfg := config.NewSystemConfig(g.defaultZoneConfig) diff --git a/pkg/gossip/infostore_test.go b/pkg/gossip/infostore_test.go index 3bac80041949..a682cb0e9234 100644 --- a/pkg/gossip/infostore_test.go +++ b/pkg/gossip/infostore_test.go @@ -321,7 +321,7 @@ func TestInfoStoreMostDistant(t *testing.T) { scInfo := is.newInfo(nil, time.Second) scInfo.Hops = 100 scInfo.NodeID = nodes[0] - if err := is.addInfo(KeySystemConfig, scInfo); err != nil { + if err := is.addInfo(KeyDeprecatedSystemConfig, scInfo); err != nil { t.Fatal(err) } diff --git a/pkg/gossip/keys.go b/pkg/gossip/keys.go index ac0d6fc82979..337f5807ddea 100644 --- a/pkg/gossip/keys.go +++ b/pkg/gossip/keys.go @@ -63,10 +63,18 @@ const ( // bi-level key addressing scheme. The value is a roachpb.RangeDescriptor. KeyFirstRangeDescriptor = "first-range" - // KeySystemConfig is the gossip key for the system DB span. + // KeyDeprecatedSystemConfig is the gossip key for the system DB span. // The value if a config.SystemConfig which holds all key/value // pairs in the system DB span. - KeySystemConfig = "system-db" + // + // This key is used in the 21.2<->22.1 mixed version state. It is not used + // in 22.1. However, it was written without a TTL, so there no guarantee + // that it will actually be removed from the gossip network. + // + // TODO(ajwerner): Write a migration to remove the data, or release a + // a version which drops the key entirely, and then, in a subsequent + // release, delete this key. + KeyDeprecatedSystemConfig = "system-db" // KeyDistSQLNodeVersionKeyPrefix is key prefix for each node's DistSQL // version. diff --git a/pkg/kv/kvclient/kvtenant/connector.go b/pkg/kv/kvclient/kvtenant/connector.go index 84180bf5cb52..71756dea4c67 100644 --- a/pkg/kv/kvclient/kvtenant/connector.go +++ b/pkg/kv/kvclient/kvtenant/connector.go @@ -56,12 +56,6 @@ type Connector interface { // (e.g. is the Range being requested owned by the requesting tenant?). rangecache.RangeDescriptorDB - // SystemConfigProvider provides a filtered view of the SystemConfig - // containing only information applicable to secondary tenants. This - // obviates the need for SQL-only tenant processes to join the cluster-wide - // gossip network. - config.SystemConfigProvider - // RegionsServer provides access to a tenant's available regions. This is // necessary for region validation for zone configurations and multi-region // primitives. diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 15ff5dda957c..67cb86bbbdaf 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -358,13 +358,11 @@ go_test( "//pkg/spanconfig", "//pkg/spanconfig/spanconfigstore", "//pkg/sql", - "//pkg/sql/catalog", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/dbdesc", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/systemschema", - "//pkg/sql/catalog/tabledesc", "//pkg/sql/rowenc/keyside", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index 8b3202395d39..bc52ee111f09 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" @@ -428,7 +429,10 @@ func EndTxn( // potentially gossip now that we've removed an intent. This is important // to deal with cases where previously committed values were not gossipped // due to an outstanding intent. - if cArgs.EvalCtx.ContainsKey(keys.SystemConfigSpan.Key) { + if cArgs.EvalCtx.ContainsKey(keys.SystemConfigSpan.Key) && + !cArgs.EvalCtx.ClusterSettings().Version.IsActive( + ctx, clusterversion.DisableSystemConfigGossipTrigger, + ) { txnResult.Local.MaybeGossipSystemConfigIfHaveFailure = true } } diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index 3c5de70ce569..64ada8ab54e1 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" @@ -96,19 +97,28 @@ func TestStoreRangeLease(t *testing.T) { } // TestStoreGossipSystemData verifies that the system-config and node-liveness -// data is gossiped at startup. +// data is gossiped at startup in the mixed version state. +// +// TODO(ajwerner): Delete this test in 22.2. func TestStoreGossipSystemData(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) zcfg := zonepb.DefaultZoneConfig() + version := clusterversion.ByKey(clusterversion.DisableSystemConfigGossipTrigger - 1) + settings := cluster.MakeTestingClusterSettingsWithVersions( + version, version, false, /* initializeVersion */ + ) serverArgs := base.TestServerArgs{ + Settings: settings, Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ DisableMergeQueue: true, }, Server: &server.TestingKnobs{ - DefaultZoneConfigOverride: &zcfg, + DefaultZoneConfigOverride: &zcfg, + BinaryVersionOverride: version, + DisableAutomaticVersionUpgrade: make(chan struct{}), }, }, } @@ -128,7 +138,7 @@ func TestStoreGossipSystemData(t *testing.T) { } getSystemConfig := func(s *kvserver.Store) *config.SystemConfig { - systemConfig := s.Gossip().GetSystemConfig() + systemConfig := s.Gossip().DeprecatedGetSystemConfig() return systemConfig } getNodeLiveness := func(s *kvserver.Store) livenesspb.Liveness { @@ -159,7 +169,9 @@ func TestStoreGossipSystemData(t *testing.T) { // re-gossiped on lease transfer even if it hasn't changed. This helps prevent // situations where a previous leaseholder can restart and not receive the // system config because it was the original source of it within the gossip -// network. +// network. This test only applies in the mixed version state. +// +// TODO(ajwerner): Remove this test in 22.2. func TestGossipSystemConfigOnLeaseChange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -168,6 +180,19 @@ func TestGossipSystemConfigOnLeaseChange(t *testing.T) { tc := testcluster.StartTestCluster(t, numStores, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + }, + Server: &server.TestingKnobs{ + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.DisableSystemConfigGossipTrigger - 1, + ), + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, + }, + }, }, ) defer tc.Stopper().Stop(context.Background()) @@ -177,7 +202,7 @@ func TestGossipSystemConfigOnLeaseChange(t *testing.T) { initialStoreIdx := -1 for i := range tc.Servers { - if tc.GetFirstStoreFromServer(t, i).Gossip().InfoOriginatedHere(gossip.KeySystemConfig) { + if tc.GetFirstStoreFromServer(t, i).Gossip().InfoOriginatedHere(gossip.KeyDeprecatedSystemConfig) { initialStoreIdx = i } } @@ -190,10 +215,10 @@ func TestGossipSystemConfigOnLeaseChange(t *testing.T) { t.Fatalf("Unexpected error %v", err) } testutils.SucceedsSoon(t, func() error { - if tc.GetFirstStoreFromServer(t, initialStoreIdx).Gossip().InfoOriginatedHere(gossip.KeySystemConfig) { + if tc.GetFirstStoreFromServer(t, initialStoreIdx).Gossip().InfoOriginatedHere(gossip.KeyDeprecatedSystemConfig) { return errors.New("system config still most recently gossiped by original leaseholder") } - if !tc.GetFirstStoreFromServer(t, newStoreIdx).Gossip().InfoOriginatedHere(gossip.KeySystemConfig) { + if !tc.GetFirstStoreFromServer(t, newStoreIdx).Gossip().InfoOriginatedHere(gossip.KeyDeprecatedSystemConfig) { return errors.New("system config not most recently gossiped by new leaseholder") } return nil diff --git a/pkg/kv/kvserver/client_rangefeed_test.go b/pkg/kv/kvserver/client_rangefeed_test.go index 6632d579b0df..d521e5f25e12 100644 --- a/pkg/kv/kvserver/client_rangefeed_test.go +++ b/pkg/kv/kvserver/client_rangefeed_test.go @@ -85,9 +85,6 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) { junkDescriptor := dbdesc.NewInitial( junkDescriptorID, "junk", security.AdminRoleName()) require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { - return err - } return txn.Put(ctx, junkDescriptorKey, junkDescriptor.DescriptorProto()) })) after := db.Clock().Now() diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 5ffcc503089d..1847bf028ae5 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -38,17 +38,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/ts" "github.com/cockroachdb/cockroach/pkg/ts/tspb" @@ -252,72 +249,6 @@ func TestStoreSplitAbortSpan(t *testing.T) { } } -// TestStoreRangeSplitAtTablePrefix verifies a range can be split at -// UserTableDataMin and still gossip the SystemConfig properly. -func TestStoreRangeSplitAtTablePrefix(t *testing.T) { - defer leaktest.AfterTest(t)() - skip.WithIssue(t, 59091, "flaky test") - defer log.Scope(t).Close(t) - - ctx := context.Background() - serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Store: &kvserver.StoreTestingKnobs{ - DisableMergeQueue: true, - DisableSplitQueue: true, - }, - }, - }) - s := serv.(*server.TestServer) - defer s.Stopper().Stop(ctx) - store, err := s.Stores().GetStore(s.GetFirstStoreID()) - require.NoError(t, err) - - key := bootstrap.TestingUserTableDataMin() - args := adminSplitArgs(key) - if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil { - t.Fatalf("%q: split unexpected error: %s", key, pErr) - } - - var desc descpb.Descriptor - descBytes, err := protoutil.Marshal(&desc) - if err != nil { - t.Fatal(err) - } - - // Update SystemConfig to trigger gossip. - if err := store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { - return err - } - // We don't care about the values, just the keys. - k := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, descpb.ID(bootstrap.TestingUserDescID(0))) - return txn.Put(ctx, k, &desc) - }); err != nil { - t.Fatal(err) - } - - successChan := make(chan struct{}, 1) - store.Gossip().RegisterCallback(gossip.KeySystemConfig, func(_ string, content roachpb.Value) { - contentBytes, err := content.GetBytes() - if err != nil { - t.Fatal(err) - } - if bytes.Contains(contentBytes, descBytes) { - select { - case successChan <- struct{}{}: - default: - } - } - }) - - select { - case <-time.After(time.Second): - t.Errorf("expected a schema gossip containing %q, but did not see one", descBytes) - case <-successChan: - } -} - // TestStoreRangeSplitInsideRow verifies an attempt to split a range inside of // a table row will cause a split at a boundary between rows. func TestStoreRangeSplitInsideRow(t *testing.T) { @@ -1017,9 +948,7 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ - // This test was written with the SystemConfigSpan in mind. - DisableSpanConfigs: true, + serv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ DisableMergeQueue: true, @@ -1030,22 +959,18 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) { defer s.Stopper().Stop(ctx) store, err := s.Stores().GetStore(s.GetFirstStoreID()) require.NoError(t, err) - config.TestingSetupZoneConfigHook(s.Stopper()) - const maxBytes = 1 << 16 - // Set max bytes. - descID := bootstrap.TestingUserDescID(0) - zoneConfig := zonepb.DefaultZoneConfig() - zoneConfig.RangeMaxBytes = proto.Int64(maxBytes) - config.TestingSetZoneConfig(config.SystemTenantObjectID(descID), zoneConfig) - - // Trigger gossip callback. - if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, &config.SystemConfigEntries{}, 0); err != nil { - t.Fatal(err) - } + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) + tdb.Exec(t, "CREATE TABLE t ()") + var descID uint32 + tdb.QueryRow(t, "SELECT 't'::regclass::int").Scan(&descID) + const maxBytes, minBytes = 1 << 16, 1 << 14 + tdb.Exec(t, "ALTER TABLE t CONFIGURE ZONE USING range_max_bytes = $1, range_min_bytes = $2", + maxBytes, minBytes) tableBoundary := keys.SystemSQLCodec.TablePrefix(descID) - { var repl *kvserver.Replica @@ -1056,14 +981,13 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) { if actualRSpan := repl.Desc().RSpan(); !actualRSpan.Equal(expectedRSpan) { return errors.Errorf("expected range %s to span %s", repl, expectedRSpan) } + // Check range's max bytes settings. + if actualMaxBytes := repl.GetMaxBytes(); actualMaxBytes != maxBytes { + return errors.Errorf("range %s max bytes mismatch, got: %d, expected: %d", repl, actualMaxBytes, maxBytes) + } return nil }) - // Check range's max bytes settings. - if actualMaxBytes := repl.GetMaxBytes(); actualMaxBytes != maxBytes { - t.Fatalf("range %s max bytes mismatch, got: %d, expected: %d", repl, actualMaxBytes, maxBytes) - } - // Look in the range after prefix we're writing to. fillRange(t, store, repl.RangeID, tableBoundary, maxBytes, false /* singleKey */) } @@ -1088,9 +1012,7 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ - // This test was written with the system config span in mind. - DisableSpanConfigs: true, + serv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ DisableMergeQueue: true, @@ -1101,25 +1023,35 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) { defer s.Stopper().Stop(ctx) store, err := s.Stores().GetStore(s.GetFirstStoreID()) require.NoError(t, err) - config.TestingSetupZoneConfigHook(s.Stopper()) - origRng := store.LookupReplica(roachpb.RKeyMin) - - // Set max bytes. - const maxBytes = 1 << 16 - descID := bootstrap.TestingUserDescID(0) - zoneConfig := zonepb.DefaultZoneConfig() - zoneConfig.RangeMaxBytes = proto.Int64(maxBytes) - config.TestingSetZoneConfig(config.SystemTenantObjectID(descID), zoneConfig) + // Find the last range. + var max roachpb.RKey + var origRng *roachpb.RangeDescriptor + store.VisitReplicas(func(replica *kvserver.Replica) (wantMore bool) { + if rd := replica.Desc(); max == nil || max.Less(rd.StartKey) { + origRng = rd + max = rd.StartKey + } + return true + }) - // Trigger gossip callback. - if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, &config.SystemConfigEntries{}, 0); err != nil { - t.Fatal(err) - } + // Create a new table and configure its size. + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) + tdb.Exec(t, "CREATE TABLE t ()") + var descID uint32 + tdb.QueryRow(t, "SELECT 't'::regclass::int").Scan(&descID) + const maxBytes, minBytes = 1 << 16, 1 << 14 + tdb.Exec(t, "ALTER TABLE t CONFIGURE ZONE USING range_max_bytes = $1, range_min_bytes = $2", + maxBytes, minBytes) // Verify that the range is split and the new range has the correct max bytes. testutils.SucceedsSoon(t, func() error { newRng := store.LookupReplica(roachpb.RKey(keys.SystemSQLCodec.TablePrefix(descID))) + if newRng == nil { + return errors.Errorf("expected new range created by split") + } if newRng.RangeID == origRng.RangeID { return errors.Errorf("expected new range created by split") } @@ -1311,132 +1243,6 @@ func TestStoreRangeSplitBackpressureWrites(t *testing.T) { } } -// TestStoreRangeSystemSplits verifies that splits are based on the contents of -// the system.descriptor table. -func TestStoreRangeSystemSplits(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - // Intentionally leave the merge queue enabled. This indirectly tests that the - // merge queue respects these split points. - ctx := context.Background() - s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ - // This test was written with the system config span in mind. - DisableSpanConfigs: true, - }) - defer s.Stopper().Stop(ctx) - - userTableMax := bootstrap.TestingUserDescID(4) - var exceptions map[int]struct{} - schema := bootstrap.MakeMetadataSchema( - keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), - ) - // Write table descriptors for the tables in the metadata schema as well as - // five dummy user tables. This does two things: - // - descriptor IDs are used to determine split keys - // - the write triggers a SystemConfig update and gossip - // We should end up with splits at each user table prefix. - if err := s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { - return err - } - descTablePrefix := keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID) - kvs, _ /* splits */ := schema.GetInitialValues() - for _, akv := range kvs { - if !bytes.HasPrefix(akv.Key, descTablePrefix) { - continue - } - if err := txn.Put(ctx, akv.Key, &akv.Value); err != nil { - return err - } - } - for i := bootstrap.TestingUserDescID(0); i <= userTableMax; i++ { - // We don't care about the value, just the key. - id := descpb.ID(i) - key := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, id) - desc := tabledesc.NewBuilder(&descpb.TableDescriptor{ID: id}).BuildImmutable() - if err := txn.Put(ctx, key, desc.DescriptorProto()); err != nil { - return err - } - } - return nil - }); err != nil { - t.Fatal(err) - } - - verifySplitsAtTablePrefixes := func() { - t.Helper() - // We expect splits at each of the user tables and at a few fixed system - // range boundaries, but not at system config table boundaries. - expKeys := []roachpb.Key{ - testutils.MakeKey(keys.Meta2Prefix, keys.NodeLivenessPrefix), - testutils.MakeKey(keys.Meta2Prefix, keys.NodeLivenessKeyMax), - testutils.MakeKey(keys.Meta2Prefix, keys.TimeseriesPrefix), - testutils.MakeKey(keys.Meta2Prefix, keys.TimeseriesPrefix.PrefixEnd()), - testutils.MakeKey(keys.Meta2Prefix, keys.TableDataMin), - } - ids := catalog.MakeDescriptorIDSet(schema.DescriptorIDs()...) - - for i := 0; i < keys.MaxSystemConfigDescID; i++ { - ids.Remove(descpb.ID(i)) - } - // We sadly do split on pseudo table ID. - for _, id := range keys.PseudoTableIDs { - ids.Add(descpb.ID(id)) - } - ids.ForEach(func(id descpb.ID) { - expKeys = append(expKeys, - testutils.MakeKey( - keys.Meta2Prefix, keys.SystemSQLCodec.TablePrefix(uint32(id)), - ), - ) - }) - for i := bootstrap.TestingUserDescID(0); i <= userTableMax; i++ { - if _, ok := exceptions[int(i)]; !ok { - expKeys = append(expKeys, - testutils.MakeKey(keys.Meta2Prefix, keys.SystemSQLCodec.TablePrefix(i)), - ) - } - } - expKeys = append(expKeys, testutils.MakeKey(keys.Meta2Prefix, roachpb.RKeyMax)) - - testutils.SucceedsSoon(t, func() error { - rows, err := s.DB().Scan(context.Background(), keys.Meta2Prefix, keys.MetaMax, 0) - if err != nil { - return err - } - keys := make([]roachpb.Key, 0, len(expKeys)) - for _, r := range rows { - keys = append(keys, r.Key) - } - if !reflect.DeepEqual(keys, expKeys) { - return errors.Errorf("expected split keys:\n%v\nbut found:\n%v", expKeys, keys) - } - return nil - }) - } - - verifySplitsAtTablePrefixes() - - // Write another, disjoint (+3) descriptor for a user table. - userTableMax += 3 - exceptions = map[int]struct{}{int(userTableMax) - 1: {}, int(userTableMax) - 2: {}} - if err := s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { - return err - } - // This time, only write the last table descriptor. Splits only occur for - // the descriptor we add. We don't care about the value, just the key. - id := descpb.ID(userTableMax) - k := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, id) - desc := tabledesc.NewBuilder(&descpb.TableDescriptor{ID: id}).BuildImmutable() - return txn.Put(ctx, k, desc.DescriptorProto()) - }); err != nil { - t.Fatal(err) - } - - verifySplitsAtTablePrefixes() -} - // runSetupSplitSnapshotRace engineers a situation in which a range has // been split but node 3 hasn't processed it yet. There is a race // depending on whether node 3 learns of the split from its left or @@ -3574,7 +3380,7 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) { } ctx := context.Background() - serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + serv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ DisableSpanConfigs: true, Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ @@ -3585,20 +3391,31 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) { }) s := serv.(*server.TestServer) defer s.Stopper().Stop(ctx) + // Set the closed_timestamp interval to be short to shorten the test duration + // because we need to wait for a checkpoint on the system config. + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) clock.Store(s.Clock()) store, err := s.Stores().GetStore(s.GetFirstStoreID()) require.NoError(t, err) config.TestingSetupZoneConfigHook(s.Stopper()) - // Set global reads. + // Split off the range for the test. descID := bootstrap.TestingUserDescID(0) descKey := keys.SystemSQLCodec.TablePrefix(descID) + splitArgs := adminSplitArgs(descKey) + _, pErr := kv.SendWrapped(ctx, store.TestSender(), splitArgs) + require.Nil(t, pErr) + + // Set global reads. zoneConfig := zonepb.DefaultZoneConfig() zoneConfig.GlobalReads = proto.Bool(true) config.TestingSetZoneConfig(config.SystemTenantObjectID(descID), zoneConfig) - // Trigger gossip callback and wait for propagation - require.NoError(t, store.Gossip().AddInfoProto(gossip.KeySystemConfig, &config.SystemConfigEntries{}, 0)) + // Perform a write to the system config span being watched by + // the SystemConfigProvider. + tdb.Exec(t, "CREATE TABLE foo ()") testutils.SucceedsSoon(t, func() error { repl := store.LookupReplica(roachpb.RKey(descKey)) if repl.ClosedTimestampPolicy() != roachpb.LEAD_FOR_GLOBAL_READS { @@ -3609,12 +3426,12 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) { // Write to the range, which has the effect of bumping the closed timestamp. pArgs := putArgs(descKey, []byte("foo")) - _, pErr := kv.SendWrapped(ctx, store.TestSender(), pArgs) + _, pErr = kv.SendWrapped(ctx, store.TestSender(), pArgs) require.Nil(t, pErr) // Split the range. Should succeed. splitKey := append(descKey, []byte("split")...) - splitArgs := adminSplitArgs(splitKey) + splitArgs = adminSplitArgs(splitKey) _, pErr = kv.SendWrapped(ctx, store.TestSender(), splitArgs) require.Nil(t, pErr) require.Equal(t, int64(1), store.Metrics().CommitWaitsBeforeCommitTrigger.Count()) diff --git a/pkg/kv/kvserver/gossip_test.go b/pkg/kv/kvserver/gossip_test.go index 5833410fb2f4..fa7b8beaa921 100644 --- a/pkg/kv/kvserver/gossip_test.go +++ b/pkg/kv/kvserver/gossip_test.go @@ -18,10 +18,14 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -200,13 +204,36 @@ func TestGossipHandlesReplacedNode(t *testing.T) { // TestGossipAfterAbortOfSystemConfigTransactionAfterFailureDueToIntents tests // that failures to gossip the system config due to intents are rectified when // later intents are aborted. +// +// Note that this tests the gossip functionality only in the mixed version +// state. After the release is finalized, these gossip triggers will no longer +// happen. +// +// TODO(ajwerner): Delete this test in 22.2. func TestGossipAfterAbortOfSystemConfigTransactionAfterFailureDueToIntents(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryMinSupportedVersion, + clusterversion.TestingBinaryMinSupportedVersion, + false, + ) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + }, + Server: &server.TestingKnobs{ + BinaryVersionOverride: clusterversion.TestingBinaryMinSupportedVersion, + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, + }, + }, + }) defer tc.Stopper().Stop(ctx) require.NoError(t, tc.WaitForFullReplication()) @@ -215,13 +242,13 @@ func TestGossipAfterAbortOfSystemConfigTransactionAfterFailureDueToIntents(t *te txA := db.NewTxn(ctx, "a") txB := db.NewTxn(ctx, "b") - require.NoError(t, txA.SetSystemConfigTrigger(true /* forSystemTenant */)) + require.NoError(t, txA.DeprecatedSetSystemConfigTrigger(true /* forSystemTenant */)) db1000 := dbdesc.NewInitial(1000, "1000", security.AdminRoleName()) require.NoError(t, txA.Put(ctx, keys.SystemSQLCodec.DescMetadataKey(1000), db1000.DescriptorProto())) - require.NoError(t, txB.SetSystemConfigTrigger(true /* forSystemTenant */)) + require.NoError(t, txB.DeprecatedSetSystemConfigTrigger(true /* forSystemTenant */)) db2000 := dbdesc.NewInitial(2000, "2000", security.AdminRoleName()) require.NoError(t, txB.Put(ctx, keys.SystemSQLCodec.DescMetadataKey(2000), @@ -237,7 +264,7 @@ func TestGossipAfterAbortOfSystemConfigTransactionAfterFailureDueToIntents(t *te } } } - systemConfChangeCh := tc.Server(0).GossipI().(*gossip.Gossip).RegisterSystemConfigChannel() + systemConfChangeCh := tc.Server(0).GossipI().(*gossip.Gossip).DeprecatedRegisterSystemConfigChannel() clearNotifictions(systemConfChangeCh) require.NoError(t, txB.Commit(ctx)) select { diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 8e535468efb4..a6e92183026d 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -171,7 +171,7 @@ func (s *Store) EnqueueRaftUpdateCheck(rangeID roachpb.RangeID) { } func manualQueue(s *Store, q queueImpl, repl *Replica) error { - cfg := s.Gossip().GetSystemConfig() + cfg := s.cfg.SystemConfigProvider.GetSystemConfig() if cfg == nil { return fmt.Errorf("%s: system config not yet available", s) } diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index c788b093e06d..7c71cde3df22 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -59,13 +59,14 @@ var _ redact.SafeFormatter = CmdIDKey("") // FilterArgs groups the arguments to a ReplicaCommandFilter. type FilterArgs struct { - Ctx context.Context - CmdID CmdIDKey - Index int - Sid roachpb.StoreID - Req roachpb.Request - Hdr roachpb.Header - Err error // only used for TestingPostEvalFilter + Ctx context.Context + CmdID CmdIDKey + Index int + Sid roachpb.StoreID + Req roachpb.Request + Hdr roachpb.Header + Version roachpb.Version + Err error // only used for TestingPostEvalFilter } // ProposalFilterArgs groups the arguments to ReplicaProposalFilter. diff --git a/pkg/kv/kvserver/mvcc_gc_queue_test.go b/pkg/kv/kvserver/mvcc_gc_queue_test.go index cebee5decc89..9ae102908a26 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue_test.go +++ b/pkg/kv/kvserver/mvcc_gc_queue_test.go @@ -582,7 +582,7 @@ func TestMVCCGCQueueProcess(t *testing.T) { } } - cfg := tc.gossip.GetSystemConfig() + cfg := tc.gossip.DeprecatedGetSystemConfig() if cfg == nil { t.Fatal("config not set") } @@ -867,7 +867,7 @@ func TestMVCCGCQueueTransactionTable(t *testing.T) { // Run GC. mgcq := newMVCCGCQueue(tc.store) - cfg := tc.gossip.GetSystemConfig() + cfg := tc.gossip.DeprecatedGetSystemConfig() if cfg == nil { t.Fatal("config not set") } diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index 3d958e91e68d..cfa3c7c409c7 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -693,7 +693,7 @@ func TestAcceptsUnsplitRanges(t *testing.T) { // Check our config. var sysCfg *config.SystemConfig testutils.SucceedsSoon(t, func() error { - sysCfg = s.cfg.Gossip.GetSystemConfig() + sysCfg = s.cfg.Gossip.DeprecatedGetSystemConfig() if sysCfg == nil { return errors.New("system config not yet present") } diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go index 74d99887aaf0..9ce9b7ff58a9 100644 --- a/pkg/kv/kvserver/replica_evaluate.go +++ b/pkg/kv/kvserver/replica_evaluate.go @@ -245,12 +245,13 @@ func evaluateBatch( // If a unittest filter was installed, check for an injected error; otherwise, continue. if filter := rec.EvalKnobs().TestingEvalFilter; filter != nil { filterArgs := kvserverbase.FilterArgs{ - Ctx: ctx, - CmdID: idKey, - Index: index, - Sid: rec.StoreID(), - Req: args, - Hdr: baHeader, + Ctx: ctx, + CmdID: idKey, + Index: index, + Sid: rec.StoreID(), + Req: args, + Version: rec.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).Version, + Hdr: baHeader, } if pErr := filter(filterArgs); pErr != nil { if pErr.GetTxn() == nil { diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go index ab3eba10da39..a1aeb0bbb249 100644 --- a/pkg/kv/kvserver/replica_gossip.go +++ b/pkg/kv/kvserver/replica_gossip.go @@ -13,6 +13,7 @@ package kvserver import ( "context" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" @@ -78,7 +79,14 @@ func (r *Replica) shouldGossip(ctx context.Context) bool { // // TODO(nvanbenschoten,bdarnell): even though this is best effort, we should log // louder when we continually fail to gossip system config. +// +// TODO(ajwerner): Remove this in 22.2. func (r *Replica) MaybeGossipSystemConfigRaftMuLocked(ctx context.Context) error { + if r.ClusterSettings().Version.IsActive( + ctx, clusterversion.DisableSystemConfigGossipTrigger, + ) { + return nil + } r.raftMu.AssertHeld() if r.store.Gossip() == nil { log.VEventf(ctx, 2, "not gossiping system config because gossip isn't initialized") @@ -109,8 +117,9 @@ func (r *Replica) MaybeGossipSystemConfigRaftMuLocked(ctx context.Context) error return errors.Wrap(err, "could not load SystemConfig span") } - if gossipedCfg := r.store.Gossip().GetSystemConfig(); gossipedCfg != nil && gossipedCfg.Equal(loadedCfg) && - r.store.Gossip().InfoOriginatedHere(gossip.KeySystemConfig) { + if gossipedCfg := r.store.Gossip().DeprecatedGetSystemConfig(); gossipedCfg != nil && + gossipedCfg.Equal(loadedCfg) && + r.store.Gossip().InfoOriginatedHere(gossip.KeyDeprecatedSystemConfig) { log.VEventf(ctx, 2, "not gossiping unchanged system config") // Clear the failure bit if all intents have been resolved but there's // nothing new to gossip. @@ -119,7 +128,7 @@ func (r *Replica) MaybeGossipSystemConfigRaftMuLocked(ctx context.Context) error } log.VEventf(ctx, 2, "gossiping system config") - if err := r.store.Gossip().AddInfoProto(gossip.KeySystemConfig, loadedCfg, 0); err != nil { + if err := r.store.Gossip().AddInfoProto(gossip.KeyDeprecatedSystemConfig, loadedCfg, 0); err != nil { return errors.Wrap(err, "failed to gossip system config") } r.markSystemConfigGossipSuccess() diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 1238b7f7db1b..7ccea80073d5 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cli/exit" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" @@ -49,6 +50,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -251,16 +253,18 @@ func (tc *testContext) SendWrapped(args roachpb.Request) (roachpb.Response, *roa } // initConfigs creates default configuration entries. +// +// TODO(ajwerner): Remove this in 22.2. func (tc *testContext) initConfigs(t testing.TB) error { // Put an empty system config into gossip so that gossip callbacks get // run. We're using a fake config, but it's hooked into SystemConfig. - if err := tc.gossip.AddInfoProto(gossip.KeySystemConfig, + if err := tc.gossip.AddInfoProto(gossip.KeyDeprecatedSystemConfig, &config.SystemConfigEntries{}, 0); err != nil { return err } testutils.SucceedsSoon(t, func() error { - if cfg := tc.gossip.GetSystemConfig(); cfg == nil { + if cfg := tc.gossip.DeprecatedGetSystemConfig(); cfg == nil { return errors.Errorf("expected system config to be set") } return nil @@ -1150,6 +1154,8 @@ func TestReplicaLeaseCounters(t *testing.T) { // TestReplicaGossipConfigsOnLease verifies that config info is gossiped // upon acquisition of the range lease. +// +// TODO(ajwerner): Delete this test in 22.2. func TestReplicaGossipConfigsOnLease(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1161,7 +1167,18 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) { tc := testContext{manualClock: hlc.NewManualClock(123)} cfg := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond)) cfg.TestingKnobs.DisableAutomaticLeaseRenewal = true - tc.StartWithStoreConfig(ctx, t, stopper, cfg) + // Use the TestingBinaryMinSupportedVersion for bootstrap because we won't + // gossip the system config once the current version is finalized. + cfg.Settings = cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + false, + ) + require.NoError(t, cfg.Settings.Version.SetActiveVersion(ctx, clusterversion.ClusterVersion{ + Version: clusterversion.TestingBinaryMinSupportedVersion, + })) + tc.StartWithStoreConfigAndVersion(ctx, t, stopper, cfg, + clusterversion.TestingBinaryMinSupportedVersion) secondReplica, err := tc.addBogusReplicaToRangeDesc(ctx) if err != nil { @@ -1178,7 +1195,7 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) { // If this actually failed, we would have gossiped from MVCCPutProto. // Unlikely, but why not check. - if cfg := tc.gossip.GetSystemConfig(); cfg != nil { + if cfg := tc.gossip.DeprecatedGetSystemConfig(); cfg != nil { if nv := len(cfg.Values); nv == 1 && cfg.Values[nv-1].Key.Equal(key) { t.Errorf("unexpected gossip of system config: %s", cfg) } @@ -1202,7 +1219,7 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) { tc.manualClock.Increment(11 + int64(tc.Clock().MaxOffset())) // advance time now = tc.Clock().NowAsClockTimestamp() - ch := tc.gossip.RegisterSystemConfigChannel() + ch := tc.gossip.DeprecatedRegisterSystemConfigChannel() select { case <-ch: default: @@ -1222,7 +1239,7 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) { } testutils.SucceedsSoon(t, func() error { - sysCfg := tc.gossip.GetSystemConfig() + sysCfg := tc.gossip.DeprecatedGetSystemConfig() if sysCfg == nil { return errors.Errorf("no system config yet") } @@ -1544,7 +1561,7 @@ func TestReplicaGossipAllConfigs(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) tc.Start(ctx, t, stopper) - if cfg := tc.gossip.GetSystemConfig(); cfg == nil { + if cfg := tc.gossip.DeprecatedGetSystemConfig(); cfg == nil { t.Fatal("config not set") } } @@ -10176,7 +10193,7 @@ func TestConsistenctQueueErrorFromCheckConsistency(t *testing.T) { for i := 0; i < 2; i++ { // Do this twice because it used to deadlock. See #25456. - sysCfg := tc.store.Gossip().GetSystemConfig() + sysCfg := tc.store.Gossip().DeprecatedGetSystemConfig() processed, err := tc.store.consistencyQueue.process(ctx, tc.repl, sysCfg) if !testutils.IsError(err, "boom") { t.Fatal(err) diff --git a/pkg/kv/kvserver/reports/BUILD.bazel b/pkg/kv/kvserver/reports/BUILD.bazel index 8c9d531af008..d959e20ecfa1 100644 --- a/pkg/kv/kvserver/reports/BUILD.bazel +++ b/pkg/kv/kvserver/reports/BUILD.bazel @@ -53,7 +53,6 @@ go_test( "//pkg/base", "//pkg/config", "//pkg/config/zonepb", - "//pkg/gossip", "//pkg/keys", "//pkg/kv/kvserver", "//pkg/roachpb", @@ -75,6 +74,7 @@ go_test( "//pkg/testutils/keysutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/encoding", "//pkg/util/hlc", diff --git a/pkg/kv/kvserver/reports/reporter_test.go b/pkg/kv/kvserver/reports/reporter_test.go index dfabc0533130..b9b03d538ecb 100644 --- a/pkg/kv/kvserver/reports/reporter_test.go +++ b/pkg/kv/kvserver/reports/reporter_test.go @@ -19,14 +19,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/keysutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -59,16 +60,17 @@ func TestConstraintConformanceReportIntegration(t *testing.T) { defer tc.Stopper().Stop(ctx) db := tc.ServerConn(0) - // Speed up the generation of the - _, err := db.Exec("set cluster setting kv.replication_reports.interval = '1ms'") - require.NoError(t, err) + tdb := sqlutils.MakeSQLRunner(db) + // Speed up the generation of the reports. + tdb.Exec(t, "SET CLUSTER SETTING kv.replication_reports.interval = '1ms'") + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms'") + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10 ms'") // Create a table and a zone config for it. // The zone will be configured with a constraints that can't be satisfied // because there are not enough nodes in the requested region. - _, err = db.Exec("create table t(x int primary key); " + + tdb.Exec(t, "create table t(x int primary key); "+ "alter table t configure zone using constraints='[+region=r1]'") - require.NoError(t, err) // Get the id of the newly created zone. r := db.QueryRow("select zone_id from crdb_internal.zones where table_name = 't'") @@ -91,8 +93,7 @@ func TestConstraintConformanceReportIntegration(t *testing.T) { }) // Now change the constraint asking for t to be placed in r2. This time it can be satisfied. - _, err = db.Exec("alter table t configure zone using constraints='[+region=r2]'") - require.NoError(t, err) + tdb.Exec(t, "alter table t configure zone using constraints='[+region=r2]'") // Wait for the violation to clear. testutils.SucceedsSoon(t, func() error { @@ -164,9 +165,11 @@ func TestCriticalLocalitiesReportIntegration(t *testing.T) { defer tc.Stopper().Stop(ctx) db := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(db) // Speed up the generation of the reports. - _, err := db.Exec("set cluster setting kv.replication_reports.interval = '1ms'") - require.NoError(t, err) + tdb.Exec(t, "SET CLUSTER SETTING kv.replication_reports.interval = '1ms'") + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms'") + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10 ms'") // Since we're using ReplicationManual, all the ranges will start with a // single replica on node 1. So, the node's dc and the node's region are @@ -237,10 +240,9 @@ func TestCriticalLocalitiesReportIntegration(t *testing.T) { // only for creating the zone; we don't actually care about the configuration. // Also do a split by hand. With manual replication, we're not getting the // split for the table automatically. - _, err = db.Exec("create table t(x int primary key); " + - "alter table t configure zone using num_replicas=3; " + + tdb.Exec(t, "create table t(x int primary key); "+ + "alter table t configure zone using num_replicas=3; "+ "alter table t split at values (0);") - require.NoError(t, err) // Get the id of the newly created zone. r := db.QueryRow("select zone_id from crdb_internal.zones where table_name = 't'") var zoneID int @@ -250,18 +252,16 @@ func TestCriticalLocalitiesReportIntegration(t *testing.T) { require.NoError(t, checkCritical(db, zoneID, "region=r1", "region=r1,dc=dc1")) // Upreplicate to 2 dcs. Now they're both critical. - _, err = db.Exec("ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,2], 1)") - require.NoError(t, err) + tdb.Exec(t, "ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,2], 1)") + require.NoError(t, checkCritical(db, zoneID, "region=r1", "region=r1,dc=dc1", "region=r1,dc=dc2")) // Upreplicate to one more dc. Now no dc is critical, only the region. - _, err = db.Exec("ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,2,3], 1)") - require.NoError(t, err) + tdb.Exec(t, "ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,2,3], 1)") require.NoError(t, checkCritical(db, zoneID, "region=r1")) // Move two replicas to the other region. Now that region is critical. - _, err = db.Exec("ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,4,5], 1)") - require.NoError(t, err) + tdb.Exec(t, "ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,4,5], 1)") require.NoError(t, checkCritical(db, zoneID, "region=r2")) } @@ -314,31 +314,30 @@ func TestReplicationStatusReportIntegration(t *testing.T) { defer tc.Stopper().Stop(ctx) db := tc.ServerConn(0) - // Speed up the generation of the - _, err := db.Exec("set cluster setting kv.replication_reports.interval = '1ms'") - require.NoError(t, err) + tdb := sqlutils.MakeSQLRunner(db) + // Speed up the generation of the reports. + tdb.Exec(t, "SET CLUSTER SETTING kv.replication_reports.interval = '1ms'") + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms'") + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10 ms'") // Create a table with a dummy zone config. Configuring the zone is useful // only for creating the zone; we don't actually care about the configuration. // Also do a split by hand. With manual replication, we're not getting the // split for the table automatically. - _, err = db.Exec("create table t(x int primary key); " + - "alter table t configure zone using num_replicas=3; " + + tdb.Exec(t, "create table t(x int primary key); "+ + "alter table t configure zone using num_replicas=3; "+ "alter table t split at values (0);") - require.NoError(t, err) // Get the id of the newly created zone. r := db.QueryRow("select zone_id from crdb_internal.zones where table_name = 't'") var zoneID int require.NoError(t, r.Scan(&zoneID)) // Upreplicate the range. - _, err = db.Exec("ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,2,3], 1)") - require.NoError(t, err) + tdb.Exec(t, "ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,2,3], 1)") require.NoError(t, checkZoneReplication(db, zoneID, 1, 0, 0, 0)) // Over-replicate. - _, err = db.Exec("ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,2,3,4], 1)") - require.NoError(t, err) + tdb.Exec(t, "ALTER TABLE t EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,2,3,4], 1)") require.NoError(t, checkZoneReplication(db, zoneID, 1, 0, 1, 0)) // TODO(andrei): I'd like to downreplicate to one replica and then stop that @@ -424,7 +423,7 @@ func TestRetriableErrorWhenGenerationReport(t *testing.T) { s, _, db := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) - cfg := s.GossipI().(*gossip.Gossip).GetSystemConfig() + cfg := s.ExecutorConfig().(sql.ExecutorConfig).SystemConfig.GetSystemConfig() dummyNodeChecker := func(id roachpb.NodeID) bool { return true } v := makeReplicationStatsVisitor(ctx, cfg, dummyNodeChecker) diff --git a/pkg/kv/kvserver/split_queue_test.go b/pkg/kv/kvserver/split_queue_test.go index 7b2533e238b2..9e75db36e0a6 100644 --- a/pkg/kv/kvserver/split_queue_test.go +++ b/pkg/kv/kvserver/split_queue_test.go @@ -70,7 +70,7 @@ func TestSplitQueueShouldQueue(t *testing.T) { {roachpb.RKey(keys.SystemSQLCodec.TablePrefix(2001)), roachpb.RKeyMax, 32<<20 + 1, 64 << 20, true, 1}, } - cfg := tc.gossip.GetSystemConfig() + cfg := tc.gossip.DeprecatedGetSystemConfig() if cfg == nil { t.Fatal("config not set") } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 63de91819a50..59a328857c8b 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -246,6 +246,12 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig { ScanInterval: 10 * time.Minute, HistogramWindowInterval: metric.TestSampleInterval, ProtectedTimestampCache: protectedts.EmptyCache(clock), + + // Use a constant empty system config, which mirrors the previously + // existing logic to install an empty system config in gossip. + SystemConfigProvider: config.NewConstantSystemConfigProvider( + config.NewSystemConfig(zonepb.DefaultZoneConfigRef()), + ), } // Use shorter Raft tick settings in order to minimize start up and failover @@ -1053,6 +1059,13 @@ type StoreConfig struct { // KVAdmissionController is an optional field used for admission control. KVAdmissionController KVAdmissionController + + // SystemConfigProvider is used to drive replication decision-making in the + // mixed-version state, before the span configuration infrastructure has been + // bootstrapped. + // + // TODO(ajwerner): Remove in 22.2. + SystemConfigProvider config.SystemConfigProvider } // ConsistencyTestingKnobs is a BatchEvalTestingKnobs struct used to control the @@ -1918,24 +1931,25 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { s.cfg.NodeLiveness.RegisterCallback(s.nodeIsLiveCallback) } - // Gossip is only ever nil while bootstrapping a cluster and - // in unittests. - if s.cfg.Gossip != nil { - // Register update channel for any changes to the system config. - // This may trigger splits along structured boundaries, - // and update max range bytes. - gossipUpdateC := s.cfg.Gossip.RegisterSystemConfigChannel() + // SystemConfigProvider can be nil during some tests. + if scp := s.cfg.SystemConfigProvider; scp != nil { + systemCfgUpdateC, _ := scp.RegisterSystemConfigChannel() _ = s.stopper.RunAsyncTask(ctx, "syscfg-listener", func(context.Context) { for { select { - case <-gossipUpdateC: - cfg := s.cfg.Gossip.GetSystemConfig() + case <-systemCfgUpdateC: + cfg := scp.GetSystemConfig() s.systemGossipUpdate(cfg) case <-s.stopper.ShouldQuiesce(): return } } }) + } + + // Gossip is only ever nil while bootstrapping a cluster and + // in unittests. + if s.cfg.Gossip != nil { // Start a single goroutine in charge of periodically gossiping the // sentinel and first range metadata if we have a first range. @@ -1968,9 +1982,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { enabled := spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) if enabled { s.applyAllFromSpanConfigStore(ctx) - } else { - if s.cfg.Gossip != nil && s.cfg.Gossip.GetSystemConfig() != nil { - s.systemGossipUpdate(s.cfg.Gossip.GetSystemConfig()) + } else if scp := s.cfg.SystemConfigProvider; scp != nil { + if sc := scp.GetSystemConfig(); sc != nil { + s.systemGossipUpdate(sc) } } }) @@ -2113,11 +2127,6 @@ func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, erro return nil, errSysCfgUnavailable } - sysCfg := s.cfg.Gossip.GetSystemConfig() - if sysCfg == nil { - return nil, errSysCfgUnavailable - } - // We need a version gate here before switching over to the span configs // infrastructure. In a mixed-version cluster we need to wait for // the host tenant to have fully populated `system.span_configurations` @@ -2145,6 +2154,11 @@ func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, erro !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) || !s.cfg.Settings.Version.IsActive(ctx, clusterversion.EnableSpanConfigStore) || s.TestingKnobs().UseSystemConfigSpanForQueues { + + sysCfg := s.cfg.SystemConfigProvider.GetSystemConfig() + if sysCfg == nil { + return nil, errSysCfgUnavailable + } return sysCfg, nil } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index bc918165c833..d615e274af0f 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -272,7 +272,10 @@ func createTestStoreWithConfig( ) *Store { store := createTestStoreWithoutStart(ctx, t, stopper, opts, cfg) // Put an empty system config into gossip. - if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, + // + // TODO(ajwerner): Remove this in 22.2. It's possible it can be removed + // already. + if err := store.Gossip().AddInfoProto(gossip.KeyDeprecatedSystemConfig, &config.SystemConfigEntries{}, 0); err != nil { t.Fatal(err) } diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 0254aa95b1c3..7b88b0d16359 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -386,16 +386,16 @@ func (txn *Txn) RequiredFrontier() hlc.Timestamp { return txn.mu.sender.RequiredFrontier() } -// SetSystemConfigTrigger sets the system db trigger to true on this transaction. +// DeprecatedSetSystemConfigTrigger sets the system db trigger to true on this transaction. // This will impact the EndTxnRequest. Note that this method takes a boolean // argument indicating whether this transaction is intended for the system // tenant. Only transactions for the system tenant need to set the system config // trigger which is used to gossip updates to the system config to KV servers. // The KV servers need access to an up-to-date system config in order to // determine split points and zone configurations. -func (txn *Txn) SetSystemConfigTrigger(forSystemTenant bool) error { +func (txn *Txn) DeprecatedSetSystemConfigTrigger(forSystemTenant bool) error { if txn.typ != RootTxn { - return errors.AssertionFailedf("SetSystemConfigTrigger() called on leaf txn") + return errors.AssertionFailedf("DeprecatedSetSystemConfigTrigger() called on leaf txn") } if !forSystemTenant { return nil diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index f19c7a92448f..7d681ff914a2 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -538,7 +538,7 @@ func TestUpdateDeadlineMaybe(t *testing.T) { } } -// Test that, if SetSystemConfigTrigger() fails, the systemConfigTrigger has not +// Test that, if DeprecatedSetSystemConfigTrigger() fails, the systemConfigTrigger has not // been set. func TestAnchoringErrorNoTrigger(t *testing.T) { defer leaktest.AfterTest(t)() @@ -555,7 +555,7 @@ func TestAnchoringErrorNoTrigger(t *testing.T) { return nil, nil }), clock, stopper) txn := NewTxn(ctx, db, 0 /* gatewayNodeID */) - require.EqualError(t, txn.SetSystemConfigTrigger(true /* forSystemTenant */), "unimplemented") + require.EqualError(t, txn.DeprecatedSetSystemConfigTrigger(true /* forSystemTenant */), "unimplemented") require.False(t, txn.systemConfigTrigger) } diff --git a/pkg/migration/migrations/descriptor_utils.go b/pkg/migration/migrations/descriptor_utils.go index 8a20aaa0816b..95211cc6c3f7 100644 --- a/pkg/migration/migrations/descriptor_utils.go +++ b/pkg/migration/migrations/descriptor_utils.go @@ -60,9 +60,6 @@ func createSystemTable( b := txn.NewBatch() b.CPut(tKey, desc.GetID(), nil) b.CPut(catalogkeys.MakeDescMetadataKey(codec, desc.GetID()), desc.DescriptorProto(), nil) - if err := txn.SetSystemConfigTrigger(codec.ForSystemTenant()); err != nil { - return err - } return txn.Run(ctx, b) }) } diff --git a/pkg/migration/migrations/descriptor_utils_test.go b/pkg/migration/migrations/descriptor_utils_test.go index 38b24f9e5715..8f616138b878 100644 --- a/pkg/migration/migrations/descriptor_utils_test.go +++ b/pkg/migration/migrations/descriptor_utils_test.go @@ -89,16 +89,18 @@ SELECT * table.GetParentID(), table.GetParentSchemaID(), table.GetName()) } require.Len(t, checkEntries(t), 0) - require.NoError(t, - migrations.CreateSystemTable(ctx, tc.Server(0).DB(), keys.SystemSQLCodec, table)) + require.NoError(t, migrations.CreateSystemTable( + ctx, tc.Server(0).DB(), keys.SystemSQLCodec, table, + )) require.Len(t, checkEntries(t), 1) sqlDB.CheckQueryResults(t, "SELECT create_statement FROM [SHOW CREATE TABLE system.fake_table]", [][]string{{fakeTableSchema}}) // Make sure it's idempotent. - require.NoError(t, - migrations.CreateSystemTable(ctx, tc.Server(0).DB(), keys.SystemSQLCodec, table)) + require.NoError(t, migrations.CreateSystemTable( + ctx, tc.Server(0).DB(), keys.SystemSQLCodec, table, + )) require.Len(t, checkEntries(t), 1) } diff --git a/pkg/server/node.go b/pkg/server/node.go index b4f838cbd676..a7e075f8e117 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1313,34 +1313,7 @@ func (n *Node) GossipSubscription( ctx := n.storeCfg.AmbientCtx.AnnotateCtx(stream.Context()) ctxDone := ctx.Done() - stripContent := func(_ string, content roachpb.Value) (roachpb.Value, error) { - // Don't strip anything. - return content, nil - } - if _, ok := roachpb.TenantFromContext(ctx); ok { - // If this is a tenant connection, strip portions of the gossip infos. - stripContent = func(key string, content roachpb.Value) (roachpb.Value, error) { - switch key { - case gossip.KeySystemConfig: - // Strip the system config down to just those keys in the - // GossipSubscriptionSystemConfigMask, preventing cluster-wide - // or system tenant-specific information to leak. - var ents config.SystemConfigEntries - if err := content.GetProto(&ents); err != nil { - return roachpb.Value{}, errors.Wrap(err, "could not unmarshal system config") - } - - var newContent roachpb.Value - newEnts := kvtenant.GossipSubscriptionSystemConfigMask.Apply(ents) - if err := newContent.SetProto(&newEnts); err != nil { - return roachpb.Value{}, errors.Wrap(err, "could not marshal system config") - } - return newContent, nil - default: - return content, nil - } - } - } + _, isSecondaryTenant := roachpb.TenantFromContext(ctx) // Register a callback for each of the requested patterns. We don't want to // block the gossip callback goroutine on a slow consumer, so we instead @@ -1352,43 +1325,78 @@ func (n *Node) GossipSubscription( entC := make(chan *roachpb.GossipSubscriptionEvent, 256) entCClosed := false var callbackMu syncutil.Mutex - const maxBlockDur = 1 * time.Millisecond - for _, pattern := range args.Patterns { - pattern := pattern - callback := func(key string, content roachpb.Value) { - callbackMu.Lock() - defer callbackMu.Unlock() - if entCClosed { - return - } - content, err := stripContent(key, content) - var event roachpb.GossipSubscriptionEvent - if err != nil { - event.Error = roachpb.NewError(err) - } else { + var systemConfigUpdateCh <-chan struct{} + for i := range args.Patterns { + pattern := args.Patterns[i] // copy for closure + switch pattern { + // Note that we need to support clients subscribing to the system config + // over this RPC even if the system config is no longer stored in gossip + // in the host cluster. To achieve this, we special-case the system config + // key and hook it up to the node's SystemConfigProvider. We need to + // support this because tenant clusters are upgraded *after* the system + // tenant of the host cluster. Tenant sql servers will still be expecting + // this information to drive GC TTLs for their GC jobs. It's worth noting + // that those zone configurations won't really map to reality, but that's + // okay, we just need to tell the pods something. + // + // TODO(ajwerner): Remove support for the system config key in the + // in 22.2, or leave it and make it a no-op. + case gossip.KeyDeprecatedSystemConfig: + var unregister func() + systemConfigUpdateCh, unregister = n.storeCfg.SystemConfigProvider.RegisterSystemConfigChannel() + defer unregister() + default: + callback := func(key string, content roachpb.Value) { + callbackMu.Lock() + defer callbackMu.Unlock() + if entCClosed { + return + } + var event roachpb.GossipSubscriptionEvent event.Key = key event.Content = content event.PatternMatched = pattern - } - select { - case entC <- &event: - default: + const maxBlockDur = 1 * time.Millisecond select { case entC <- &event: - case <-time.After(maxBlockDur): - // entC blocking for too long. The consumer must not be - // keeping up. Terminate the subscription. - close(entC) - entCClosed = true + default: + select { + case entC <- &event: + case <-time.After(maxBlockDur): + // entC blocking for too long. The consumer must not be + // keeping up. Terminate the subscription. + close(entC) + entCClosed = true + } } } + unregister := n.storeCfg.Gossip.RegisterCallback(pattern, callback) + defer unregister() } - unregister := n.storeCfg.Gossip.RegisterCallback(pattern, callback) - defer unregister() } - + handleSystemConfigUpdate := func() error { + cfg := n.storeCfg.SystemConfigProvider.GetSystemConfig() + ents := cfg.SystemConfigEntries + if isSecondaryTenant { + ents = kvtenant.GossipSubscriptionSystemConfigMask.Apply(ents) + } + var event roachpb.GossipSubscriptionEvent + var content roachpb.Value + if err := content.SetProto(&ents); err != nil { + event.Error = roachpb.NewError(errors.Wrap(err, "could not marshal system config")) + } else { + event.Key = gossip.KeyDeprecatedSystemConfig + event.Content = content + event.PatternMatched = gossip.KeyDeprecatedSystemConfig + } + return stream.Send(&event) + } for { select { + case <-systemConfigUpdateCh: + if err := handleSystemConfigUpdate(); err != nil { + return errors.Wrap(err, "handling system config update") + } case e, ok := <-entC: if !ok { // The consumer was not keeping up with gossip updates, so its diff --git a/pkg/server/server.go b/pkg/server/server.go index b3019a71f3d4..e8e641d0ab17 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -496,6 +496,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { sqlMonitorAndMetrics.rootSQLMemoryMonitor, stopper, ) + systemConfigWatcher := systemconfigwatcher.New( + keys.SystemSQLCodec, clock, rangeFeedFactory, &cfg.DefaultZoneConfig, + ) + storeCfg := kvserver.StoreConfig{ DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(), Settings: st, @@ -523,6 +527,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ExternalStorageFromURI: externalStorageFromURI, ProtectedTimestampCache: protectedtsProvider, KVMemoryMonitor: kvMemoryMonitor, + SystemConfigProvider: systemConfigWatcher, } var spanConfig struct { @@ -625,9 +630,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { kvserver.RegisterPerStoreServer(grpcServer.Server, node.perReplicaServer) ctpb.RegisterSideTransportServer(grpcServer.Server, ctReceiver) - systemConfigWatcher := systemconfigwatcher.New( - keys.SystemSQLCodec, clock, rangeFeedFactory, &cfg.DefaultZoneConfig, - ) replicationReporter := reports.NewReporter( db, node.stores, storePool, st, nodeLiveness, internalExecutor, systemConfigWatcher, ) diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index f131d4d0e353..4d66f832f077 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/cli/exit" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -41,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/status/statuspb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc" @@ -393,12 +395,34 @@ func TestAcceptEncoding(t *testing.T) { } } +// TestSystemConfigGossip tests that system config gossip works in the mixed +// version state. After the 22.1 release is finalized, system config gossip +// will no longer occur. +// +// TODO(ajwerner): Delete this test in 22.2. func TestSystemConfigGossip(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryMinSupportedVersion, + clusterversion.TestingBinaryMinSupportedVersion, + false, + ) + serverArgs := base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + }, + Server: &TestingKnobs{ + BinaryVersionOverride: clusterversion.TestingBinaryMinSupportedVersion, + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, + }, + } + s, _, kvDB := serverutils.StartServer(t, serverArgs) defer s.Stopper().Stop(ctx) ts := s.(*TestServer) @@ -410,7 +434,7 @@ func TestSystemConfigGossip(t *testing.T) { } // Register a callback for gossip updates. - resultChan := ts.Gossip().RegisterSystemConfigChannel() + resultChan := ts.Gossip().DeprecatedRegisterSystemConfigChannel() // The span gets gossiped when it first shows up. select { @@ -422,7 +446,7 @@ func TestSystemConfigGossip(t *testing.T) { // Write a system key with the transaction marked as having a Gossip trigger. if err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { + if err := txn.DeprecatedSetSystemConfigTrigger(true /* forSystemTenant */); err != nil { return err } return txn.Put(ctx, key, valAt(2)) @@ -438,7 +462,7 @@ func TestSystemConfigGossip(t *testing.T) { var systemConfig *config.SystemConfig select { case <-resultChan: - systemConfig = ts.gossip.GetSystemConfig() + systemConfig = ts.gossip.DeprecatedGetSystemConfig() case <-time.After(500 * time.Millisecond): return errors.Errorf("did not receive gossip message") diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 456bc5eb72fc..7e3dbf4ce5e1 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -235,6 +235,7 @@ func TestHealthTelemetry(t *testing.T) { func TestStatusGossipJson(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(context.Background()) @@ -251,9 +252,6 @@ func TestStatusGossipJson(t *testing.T) { if _, ok := data.Infos["node:1"]; !ok { t.Errorf("no node 1 info returned: %v", data) } - if _, ok := data.Infos["system-db"]; !ok { - t.Errorf("no system config info returned: %v", data) - } } // TestStatusEngineStatsJson ensures that the output response for the engine diff --git a/pkg/server/systemconfigwatcher/cache.go b/pkg/server/systemconfigwatcher/cache.go index 9b11ac7813b6..678ff9b351c9 100644 --- a/pkg/server/systemconfigwatcher/cache.go +++ b/pkg/server/systemconfigwatcher/cache.go @@ -38,7 +38,7 @@ type Cache struct { cfg *config.SystemConfig timestamp hlc.Timestamp - registered []chan<- struct{} + registered map[chan<- struct{}]struct{} } } @@ -53,6 +53,7 @@ func New( c := Cache{ defaultZoneConfig: defaultZoneConfig, } + c.mu.registered = make(map[chan<- struct{}]struct{}) // TODO(ajwerner): Consider stripping this down to just watching // descriptor and zones. @@ -85,12 +86,26 @@ func (c *Cache) GetSystemConfig() *config.SystemConfig { // RegisterSystemConfigChannel is part of the config.SystemConfigProvider // interface. -func (c *Cache) RegisterSystemConfigChannel() <-chan struct{} { +func (c *Cache) RegisterSystemConfigChannel() (_ <-chan struct{}, unregister func()) { ch := make(chan struct{}, 1) c.mu.Lock() defer c.mu.Unlock() - c.mu.registered = append(c.mu.registered, ch) - return ch + + c.mu.registered[ch] = struct{}{} + return ch, func() { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.mu.registered, ch) + } +} + +// LastUpdated returns the timestamp corresponding to the current state of +// the cache. Any subsequent call to GetSystemConfig will see a state that +// corresponds to a snapshot as least as new as this timestamp. +func (c *Cache) LastUpdated() hlc.Timestamp { + c.mu.RLock() + defer c.mu.RUnlock() + return c.mu.timestamp } type keyValues []roachpb.KeyValue @@ -126,23 +141,24 @@ func (c *Cache) handleUpdate(_ context.Context, update rangefeedcache.Update) { updatedCfg := config.NewSystemConfig(c.defaultZoneConfig) updatedCfg.Values = updatedData - toNotify := c.setUpdatedConfig(updatedCfg, update.Timestamp) - for _, c := range toNotify { - select { - case c <- struct{}{}: - default: - } - } + c.setUpdatedConfig(updatedCfg, update.Timestamp) } -func (c *Cache) setUpdatedConfig( - updated *config.SystemConfig, ts hlc.Timestamp, -) (toNotify []chan<- struct{}) { +func (c *Cache) setUpdatedConfig(updated *config.SystemConfig, ts hlc.Timestamp) { c.mu.Lock() defer c.mu.Unlock() + changed := c.mu.cfg != updated c.mu.cfg = updated c.mu.timestamp = ts - return c.mu.registered + if !changed { + return + } + for ch := range c.mu.registered { + select { + case ch <- struct{}{}: + default: + } + } } func passThroughTranslation( diff --git a/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go b/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go index 0935811c5c5f..93c721664aa3 100644 --- a/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go +++ b/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go @@ -67,7 +67,7 @@ func runTest(t *testing.T, s serverutils.TestTenantInterface, sqlDB *gosql.DB) { execCfg := s.ExecutorConfig().(sql.ExecutorConfig) kvDB := execCfg.DB r := execCfg.SystemConfig - rc := r.RegisterSystemConfigChannel() + rc, _ := r.RegisterSystemConfigChannel() clearChan := func() { select { case <-rc: diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 6ad863ea2733..b33af05a2eaf 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -626,6 +626,11 @@ func (t *TestTenant) SpanConfigSQLWatcher() interface{} { return t.SQLServer.spanconfigSQLWatcher } +// SystemConfigProvider is part TestTenantInterface. +func (t *TestTenant) SystemConfigProvider() config.SystemConfigProvider { + return t.SQLServer.systemConfigWatcher +} + // DrainClients exports the drainClients() method for use by tests. func (t *TestTenant) DrainClients(ctx context.Context) error { return t.drain.drainClients(ctx, nil /* reporter */) @@ -1338,6 +1343,11 @@ func (ts *TestServer) SpanConfigKVSubscriber() interface{} { return ts.node.storeCfg.SpanConfigSubscriber } +// SystemConfigProvider is part of the TestServerInterface. +func (ts *TestServer) SystemConfigProvider() config.SystemConfigProvider { + return ts.node.storeCfg.SystemConfigProvider +} + type testServerFactoryImpl struct{} // TestServerFactory can be passed to serverutils.InitTestServerFactory diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index e04049c19d16..cffcaf87e2d9 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -15,6 +15,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -88,8 +89,13 @@ func (cf *CollectionFactory) Txn( deletedDescs = catalog.DescriptorIDSet{} descsCol = cf.MakeCollection(ctx, nil /* temporarySchemaProvider */) defer descsCol.ReleaseAll(ctx) - if !UnsafeSkipSystemConfigTrigger.Get(&cf.settings.SV) { - if err := txn.SetSystemConfigTrigger(cf.leaseMgr.Codec().ForSystemTenant()); err != nil { + if !UnsafeSkipSystemConfigTrigger.Get(&cf.settings.SV) && + !cf.settings.Version.IsActive( + ctx, clusterversion.DisableSystemConfigGossipTrigger, + ) { + if err := txn.DeprecatedSetSystemConfigTrigger( + cf.leaseMgr.Codec().ForSystemTenant(), + ); err != nil { return err } } diff --git a/pkg/sql/catalog/lease/helpers_test.go b/pkg/sql/catalog/lease/helpers_test.go index 69010c84b0b0..8fefe7bd7f76 100644 --- a/pkg/sql/catalog/lease/helpers_test.go +++ b/pkg/sql/catalog/lease/helpers_test.go @@ -194,11 +194,6 @@ func (m *Manager) PublishMultiple( versions[id] = descsToUpdate[id].GetVersion() } - // This is to write the updated descriptors if we're the system tenant. - if err := txn.SetSystemConfigTrigger(m.storage.codec.ForSystemTenant()); err != nil { - return err - } - // Run the update closure. if err := update(txn, descsToUpdate); err != nil { return err @@ -223,7 +218,7 @@ func (m *Manager) PublishMultiple( // descriptor change occurs first in the transaction. This is // necessary to ensure that the System configuration change is // gossiped. See the documentation for - // transaction.SetSystemConfigTrigger() for more information. + // transaction.DeprecatedSetSystemConfigTrigger() for more information. if err := txn.Run(ctx, b); err != nil { return err } diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index b4368b6b03b2..ad9ff11ce414 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -280,11 +281,13 @@ func startConnExecutor( defer tempEngine.Close() ambientCtx := log.MakeTestingAmbientCtxWithNewTracer() cfg := &ExecutorConfig{ - AmbientCtx: ambientCtx, - Settings: st, - Clock: clock, - DB: db, - SystemConfig: config.EmptySystemConfigProvider{}, + AmbientCtx: ambientCtx, + Settings: st, + Clock: clock, + DB: db, + SystemConfig: config.NewConstantSystemConfigProvider( + config.NewSystemConfig(zonepb.DefaultZoneConfigRef()), + ), SessionRegistry: NewSessionRegistry(), NodeInfo: NodeInfo{ NodeID: nodeID, diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go index b65df8037aa0..04cc2afbf723 100644 --- a/pkg/sql/crdb_internal_test.go +++ b/pkg/sql/crdb_internal_test.go @@ -234,9 +234,6 @@ CREATE TABLE t.test (k INT); // Write the modified descriptor. if err := kvDB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { - if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { - return err - } return txn.Put(ctx, catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.ID), tableDesc.DescriptorProto()) }); err != nil { t.Fatal(err) diff --git a/pkg/sql/drop_test.go b/pkg/sql/drop_test.go index c6bec989cc0e..c1a5e91de1ef 100644 --- a/pkg/sql/drop_test.go +++ b/pkg/sql/drop_test.go @@ -804,9 +804,6 @@ func TestDropTableDeleteData(t *testing.T) { func writeTableDesc(ctx context.Context, db *kv.DB, tableDesc *tabledesc.Mutable) error { return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { - return err - } tableDesc.ModificationTime = txn.CommitTimestamp() return txn.Put(ctx, catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.ID), tableDesc.DescriptorProto()) }) diff --git a/pkg/sql/gcjob/BUILD.bazel b/pkg/sql/gcjob/BUILD.bazel index 6ce1a99ea0bc..9e19d5cb2921 100644 --- a/pkg/sql/gcjob/BUILD.bazel +++ b/pkg/sql/gcjob/BUILD.bazel @@ -14,6 +14,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/gcjob", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/config", "//pkg/config/zonepb", "//pkg/jobs", diff --git a/pkg/sql/gcjob/descriptor_utils.go b/pkg/sql/gcjob/descriptor_utils.go index b5bb960810ad..bc717fa19c0f 100644 --- a/pkg/sql/gcjob/descriptor_utils.go +++ b/pkg/sql/gcjob/descriptor_utils.go @@ -13,6 +13,7 @@ package gcjob import ( "context" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -74,8 +75,11 @@ func deleteDatabaseZoneConfig( return nil } return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if !descs.UnsafeSkipSystemConfigTrigger.Get(&settings.SV) { - if err := txn.SetSystemConfigTrigger(codec.ForSystemTenant()); err != nil { + if !descs.UnsafeSkipSystemConfigTrigger.Get(&settings.SV) && + !settings.Version.IsActive( + ctx, clusterversion.DisableSystemConfigGossipTrigger, + ) { + if err := txn.DeprecatedSetSystemConfigTrigger(codec.ForSystemTenant()); err != nil { return err } } diff --git a/pkg/sql/gcjob/gcjobnotifier/notifier.go b/pkg/sql/gcjob/gcjobnotifier/notifier.go index b62ef235abf4..a4326cb73111 100644 --- a/pkg/sql/gcjob/gcjobnotifier/notifier.go +++ b/pkg/sql/gcjob/gcjobnotifier/notifier.go @@ -131,12 +131,12 @@ func (n *Notifier) Start(ctx context.Context) { func (n *Notifier) run(_ context.Context) { defer n.markStopped() - gossipUpdateCh := n.provider.RegisterSystemConfigChannel() + systemConfigUpdateCh, _ := n.provider.RegisterSystemConfigChannel() for { select { case <-n.stopper.ShouldQuiesce(): return - case <-gossipUpdateCh: + case <-systemConfigUpdateCh: n.maybeNotify() } } diff --git a/pkg/sql/gcjob/gcjobnotifier/notifier_test.go b/pkg/sql/gcjob/gcjobnotifier/notifier_test.go index 165457262de7..d5372dc6fbb0 100644 --- a/pkg/sql/gcjob/gcjobnotifier/notifier_test.go +++ b/pkg/sql/gcjob/gcjobnotifier/notifier_test.go @@ -46,8 +46,8 @@ func (t *testingProvider) setSystemConfig(cfg *config.SystemConfig) { t.cfg = cfg } -func (t *testingProvider) RegisterSystemConfigChannel() <-chan struct{} { - return t.ch +func (t *testingProvider) RegisterSystemConfigChannel() (<-chan struct{}, func()) { + return t.ch, func() {} } var _ config.SystemConfigProvider = (*testingProvider)(nil) diff --git a/pkg/sql/opt/exec/execbuilder/BUILD.bazel b/pkg/sql/opt/exec/execbuilder/BUILD.bazel index 505668ef41aa..bb7c3dcf0233 100644 --- a/pkg/sql/opt/exec/execbuilder/BUILD.bazel +++ b/pkg/sql/opt/exec/execbuilder/BUILD.bazel @@ -14,6 +14,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/execbuilder", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/server/telemetry", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index 175e5b067698..32da9f4fc16b 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -16,6 +16,7 @@ import ( "fmt" "math" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -159,8 +160,11 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { // `BEGIN; INSERT INTO ...; CREATE TABLE IF NOT EXISTS ...; COMMIT;` // where the table already exists. This will generate some false schema // cache refreshes, but that's expected to be quite rare in practice. - if !descs.UnsafeSkipSystemConfigTrigger.Get(&b.evalCtx.Settings.SV) { - if err := b.evalCtx.Txn.SetSystemConfigTrigger(b.evalCtx.Codec.ForSystemTenant()); err != nil { + if !descs.UnsafeSkipSystemConfigTrigger.Get(&b.evalCtx.Settings.SV) && + !b.evalCtx.Settings.Version.IsActive( + b.evalCtx.Ctx(), clusterversion.DisableSystemConfigGossipTrigger, + ) { + if err := b.evalCtx.Txn.DeprecatedSetSystemConfigTrigger(b.evalCtx.Codec.ForSystemTenant()); err != nil { return execPlan{}, errors.WithSecondaryError( unimplemented.NewWithIssuef(26508, "the first schema change statement in a transaction must precede any writes"), diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic index fca67251ff85..c2f73191d4b3 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic @@ -232,6 +232,12 @@ ROLLBACK subtest system_table_lookup +# Read from the table once to remove any intents. This test can be flakey due +# to the test encountering an intent on system.public.eventlog from previous +# transactions. +statement ok +SELECT * FROM system.public.eventlog; + # We use AOST to bypass the table cache. statement ok SET tracing = on,kv; SELECT * FROM system.eventlog AS OF SYSTEM TIME '-1us'; SET tracing = off diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 495940ce3e6a..4ea4b61c90a2 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -13,6 +13,7 @@ package sql import ( "context" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -542,12 +543,15 @@ func (p *planner) maybePlanHook(ctx context.Context, stmt tree.Statement) (planN // is within the SystemConfig range. func (p *planner) maybeSetSystemConfig(id descpb.ID) error { if !descpb.IsSystemConfigID(id) || - descs.UnsafeSkipSystemConfigTrigger.Get(&p.EvalContext().Settings.SV) { + descs.UnsafeSkipSystemConfigTrigger.Get(&p.EvalContext().Settings.SV) || + p.execCfg.Settings.Version.IsActive( + p.EvalContext().Ctx(), clusterversion.DisableSystemConfigGossipTrigger, + ) { return nil } // Mark transaction as operating on the system DB. // Only the system tenant marks the SystemConfigTrigger. - return p.txn.SetSystemConfigTrigger(p.execCfg.Codec.ForSystemTenant()) + return p.txn.DeprecatedSetSystemConfigTrigger(p.execCfg.Codec.ForSystemTenant()) } // planFlags is used throughout the planning code to keep track of various diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index e48085ff6120..1a007e2b657a 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -2644,8 +2645,10 @@ func DeleteTableDescAndZoneConfig( ) error { log.Infof(ctx, "removing table descriptor and zone config for table %d", tableDesc.GetID()) return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if !descs.UnsafeSkipSystemConfigTrigger.Get(&settings.SV) { - if err := txn.SetSystemConfigTrigger(codec.ForSystemTenant()); err != nil { + if !descs.UnsafeSkipSystemConfigTrigger.Get(&settings.SV) && !settings.Version.IsActive( + ctx, clusterversion.DisableSystemConfigGossipTrigger, + ) { + if err := txn.DeprecatedSetSystemConfigTrigger(codec.ForSystemTenant()); err != nil { return err } } diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 7f9590ca66ab..3b6ea4e9d72a 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -3754,10 +3754,9 @@ INSERT INTO t.kv VALUES ('a', 'b'); }, // schema change at the end of a transaction that has written. { - name: `insert-create`, - firstStmt: `INSERT INTO t.kv VALUES ('e', 'f')`, - secondStmt: `CREATE INDEX foo2 ON t.kv (v)`, - expectedErr: `the first schema change statement in a transaction must precede any writes`, + name: `insert-create`, + firstStmt: `INSERT INTO t.kv VALUES ('e', 'f')`, + secondStmt: `CREATE INDEX foo2 ON t.kv (v)`, }, // schema change at the end of a read only transaction. { diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index c2b25b5f62b9..f92bb47aa456 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -176,10 +176,14 @@ func (n *setClusterSettingNode) startExec(params runParams) error { // sql.catalog.unsafe_skip_system_config_trigger.enabled cluster setting. // The usage of gossip to propagate cluster settings in the system tenant // will be fixed in an upcoming PR with #70566. - if err := params.p.EvalContext().Txn.SetSystemConfigTrigger( - params.EvalContext().Codec.ForSystemTenant(), - ); err != nil { - return err + if !params.EvalContext().Settings.Version.IsActive( + params.ctx, clusterversion.DisableSystemConfigGossipTrigger, + ) { + if err := params.p.EvalContext().Txn.DeprecatedSetSystemConfigTrigger( + params.EvalContext().Codec.ForSystemTenant(), + ); err != nil { + return err + } } execCfg := params.extendedEvalCtx.ExecCfg diff --git a/pkg/sql/tests/BUILD.bazel b/pkg/sql/tests/BUILD.bazel index 0d81b0cb2c28..635c76393aaa 100644 --- a/pkg/sql/tests/BUILD.bazel +++ b/pkg/sql/tests/BUILD.bazel @@ -12,6 +12,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/clusterversion", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver", diff --git a/pkg/sql/tests/end_txn_trigger.go b/pkg/sql/tests/end_txn_trigger.go index d545a25938e2..f2bfb66f6859 100644 --- a/pkg/sql/tests/end_txn_trigger.go +++ b/pkg/sql/tests/end_txn_trigger.go @@ -13,6 +13,7 @@ package tests import ( "bytes" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -21,6 +22,9 @@ import ( // CheckEndTxnTrigger verifies that an EndTxnRequest that includes intents for // the SystemDB keys sets the proper trigger. +// +// TODO(ajwerner): Remove this in 22.2. It only applies to the mixed-version +// state. func CheckEndTxnTrigger(args kvserverbase.FilterArgs) *roachpb.Error { req, ok := args.Req.(*roachpb.EndTxnRequest) if !ok { @@ -52,7 +56,10 @@ func CheckEndTxnTrigger(args kvserverbase.FilterArgs) *roachpb.Error { // on the current state. // For more information, see the related comment at the beginning of // planner.makePlan(). - if hasSystemKey && !modifiedSystemConfigSpan { + if hasSystemKey && + !(clusterversion.ClusterVersion{Version: args.Version}). + IsActive(clusterversion.DisableSystemConfigGossipTrigger) && + !modifiedSystemConfigSpan { return roachpb.NewError(errors.Errorf("EndTxn hasSystemKey=%t, but hasSystemConfigTrigger=%t", hasSystemKey, modifiedSystemConfigSpan)) } diff --git a/pkg/sql/zone_config_test.go b/pkg/sql/zone_config_test.go index f97a5eab334d..8129a3a4519e 100644 --- a/pkg/sql/zone_config_test.go +++ b/pkg/sql/zone_config_test.go @@ -58,9 +58,6 @@ func forceNewConfig(t testing.TB, s *server.TestServer) *config.SystemConfig { // This needs to be done in a transaction with the system trigger set. if err := s.DB().Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { - if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { - return err - } return txn.Put(ctx, configDescKey, configDesc) }); err != nil { t.Fatal(err) @@ -72,7 +69,7 @@ func waitForConfigChange(t testing.TB, s *server.TestServer) *config.SystemConfi var foundDesc descpb.Descriptor var cfg *config.SystemConfig testutils.SucceedsSoon(t, func() error { - if cfg = s.Gossip().GetSystemConfig(); cfg != nil { + if cfg = s.SystemConfigProvider().GetSystemConfig(); cfg != nil { if val := cfg.GetValue(configDescKey); val != nil { if err := val.GetProto(&foundDesc); err != nil { t.Fatal(err) @@ -106,6 +103,10 @@ func TestGetZoneConfig(t *testing.T) { srv, sqlDB, _ := serverutils.StartServer(t, params) defer srv.Stopper().Stop(context.Background()) + // Set the closed_timestamp interval to be short to shorten the test duration. + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) s := srv.(*server.TestServer) type testCase struct { @@ -338,6 +339,10 @@ func TestCascadingZoneConfig(t *testing.T) { srv, sqlDB, _ := serverutils.StartServer(t, params) defer srv.Stopper().Stop(context.Background()) + // Set the closed_timestamp interval to be short to shorten the test duration. + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) s := srv.(*server.TestServer) type testCase struct { @@ -643,8 +648,12 @@ func BenchmarkGetZoneConfig(b *testing.B) { defer log.Scope(b).Close(b) params, _ := tests.CreateTestServerParams() - srv, _, _ := serverutils.StartServer(b, params) + srv, sqlDB, _ := serverutils.StartServer(b, params) defer srv.Stopper().Stop(context.Background()) + // Set the closed_timestamp interval to be short to shorten the test duration. + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(b, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`) + tdb.Exec(b, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) s := srv.(*server.TestServer) cfg := forceNewConfig(b, s) diff --git a/pkg/startupmigrations/BUILD.bazel b/pkg/startupmigrations/BUILD.bazel index 8f0304dfad28..3c3c185be170 100644 --- a/pkg/startupmigrations/BUILD.bazel +++ b/pkg/startupmigrations/BUILD.bazel @@ -45,7 +45,6 @@ go_test( deps = [ "//pkg/base", "//pkg/cli/exit", - "//pkg/gossip", "//pkg/keys", "//pkg/kv", "//pkg/roachpb", diff --git a/pkg/startupmigrations/migrations_test.go b/pkg/startupmigrations/migrations_test.go index aeb6a0ce9409..e559227cbf6e 100644 --- a/pkg/startupmigrations/migrations_test.go +++ b/pkg/startupmigrations/migrations_test.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cli/exit" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -624,7 +623,7 @@ func TestExpectedInitialRangeCount(t *testing.T) { return errors.New("last migration has not completed") } - sysCfg := s.GossipI().(*gossip.Gossip).GetSystemConfig() + sysCfg := s.SystemConfigProvider().GetSystemConfig() if sysCfg == nil { return errors.New("gossipped system config not available") } diff --git a/pkg/testutils/localtestcluster/BUILD.bazel b/pkg/testutils/localtestcluster/BUILD.bazel index d725b8657a03..1d9fd8487f76 100644 --- a/pkg/testutils/localtestcluster/BUILD.bazel +++ b/pkg/testutils/localtestcluster/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/kv/kvserver/liveness", "//pkg/roachpb", "//pkg/rpc", + "//pkg/server/systemconfigwatcher", "//pkg/settings/cluster", "//pkg/spanconfig/spanconfigkvsubscriber", "//pkg/sql/catalog/bootstrap", diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index ddf1dec9f341..d03277c3c19e 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" @@ -224,6 +225,12 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto cfg.DefaultSpanConfig, nil, ) + cfg.SystemConfigProvider = systemconfigwatcher.New( + keys.SystemSQLCodec, + cfg.Clock, + rangeFeedFactory, + zonepb.DefaultZoneConfigRef(), + ) ltc.Store = kvserver.NewStore(ctx, cfg, ltc.Eng, nodeDesc) diff --git a/pkg/testutils/serverutils/BUILD.bazel b/pkg/testutils/serverutils/BUILD.bazel index 51d0bb6e8d66..a4c79352e774 100644 --- a/pkg/testutils/serverutils/BUILD.bazel +++ b/pkg/testutils/serverutils/BUILD.bazel @@ -11,6 +11,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/config", "//pkg/kv", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", diff --git a/pkg/testutils/serverutils/test_tenant_shim.go b/pkg/testutils/serverutils/test_tenant_shim.go index 4dca8a4518dd..379987b325f7 100644 --- a/pkg/testutils/serverutils/test_tenant_shim.go +++ b/pkg/testutils/serverutils/test_tenant_shim.go @@ -18,6 +18,7 @@ import ( "net/http" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -126,6 +127,9 @@ type TestTenantInterface interface { // DrainClients shuts down client connections. DrainClients(ctx context.Context) error + // SystemConfigProvider provides access to the system config. + SystemConfigProvider() config.SystemConfigProvider + // TODO(irfansharif): We'd benefit from an API to construct a *gosql.DB, or // better yet, a *sqlutils.SQLRunner. We use it all the time, constructing // it by hand each time. From 0e45b5fd5c59073d9e53a9b6d1dbce980db32da9 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 9 Feb 2022 00:26:58 -0500 Subject: [PATCH 2/2] sql: remove sql.catalog.unsafe_skip_system_config_trigger.enabled This cluster setting is no longer useful. Release note: None --- pkg/sql/catalog/descs/txn.go | 23 +++------------------- pkg/sql/gcjob/descriptor_utils.go | 7 +++---- pkg/sql/opt/exec/execbuilder/BUILD.bazel | 1 - pkg/sql/opt/exec/execbuilder/relational.go | 8 +++----- pkg/sql/plan.go | 9 +++------ pkg/sql/schema_changer.go | 2 +- pkg/sql/set_cluster_setting.go | 15 -------------- 7 files changed, 13 insertions(+), 52 deletions(-) diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index cffcaf87e2d9..776770e4e215 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -28,21 +27,6 @@ import ( var errTwoVersionInvariantViolated = errors.Errorf("two version invariant violated") -// UnsafeSkipSystemConfigTrigger will prevent setting the system config -// trigger for transactions which write to tables in the system config. The -// implication of setting this to true is that various subsystems which -// rely on that trigger, such as zone configs and replication reports, will -// not work. This can be used to accelerate high-frequency schema changes -// like during an ORM test suite. -var UnsafeSkipSystemConfigTrigger = settings.RegisterBoolSetting( - settings.SystemOnly, - "sql.catalog.unsafe_skip_system_config_trigger.enabled", - "avoid setting the system config trigger in transactions which write to "+ - "the system config. This will unlock performance at the cost of breaking "+ - "table splits, zone configuration propagation, and cluster settings", - false, -) - // Txn enables callers to run transactions with a *Collection such that all // retrieved immutable descriptors are properly leased and all mutable // descriptors are handled. The function deals with verifying the two version @@ -89,10 +73,9 @@ func (cf *CollectionFactory) Txn( deletedDescs = catalog.DescriptorIDSet{} descsCol = cf.MakeCollection(ctx, nil /* temporarySchemaProvider */) defer descsCol.ReleaseAll(ctx) - if !UnsafeSkipSystemConfigTrigger.Get(&cf.settings.SV) && - !cf.settings.Version.IsActive( - ctx, clusterversion.DisableSystemConfigGossipTrigger, - ) { + if !cf.settings.Version.IsActive( + ctx, clusterversion.DisableSystemConfigGossipTrigger, + ) { if err := txn.DeprecatedSetSystemConfigTrigger( cf.leaseMgr.Codec().ForSystemTenant(), ); err != nil { diff --git a/pkg/sql/gcjob/descriptor_utils.go b/pkg/sql/gcjob/descriptor_utils.go index bc717fa19c0f..63dc4171f798 100644 --- a/pkg/sql/gcjob/descriptor_utils.go +++ b/pkg/sql/gcjob/descriptor_utils.go @@ -75,10 +75,9 @@ func deleteDatabaseZoneConfig( return nil } return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if !descs.UnsafeSkipSystemConfigTrigger.Get(&settings.SV) && - !settings.Version.IsActive( - ctx, clusterversion.DisableSystemConfigGossipTrigger, - ) { + if !settings.Version.IsActive( + ctx, clusterversion.DisableSystemConfigGossipTrigger, + ) { if err := txn.DeprecatedSetSystemConfigTrigger(codec.ForSystemTenant()); err != nil { return err } diff --git a/pkg/sql/opt/exec/execbuilder/BUILD.bazel b/pkg/sql/opt/exec/execbuilder/BUILD.bazel index bb7c3dcf0233..1598212e5e37 100644 --- a/pkg/sql/opt/exec/execbuilder/BUILD.bazel +++ b/pkg/sql/opt/exec/execbuilder/BUILD.bazel @@ -18,7 +18,6 @@ go_library( "//pkg/server/telemetry", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", - "//pkg/sql/catalog/descs", "//pkg/sql/lexbase", "//pkg/sql/mutations", "//pkg/sql/opt", diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index 32da9f4fc16b..ebc82db8a255 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -160,10 +159,9 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { // `BEGIN; INSERT INTO ...; CREATE TABLE IF NOT EXISTS ...; COMMIT;` // where the table already exists. This will generate some false schema // cache refreshes, but that's expected to be quite rare in practice. - if !descs.UnsafeSkipSystemConfigTrigger.Get(&b.evalCtx.Settings.SV) && - !b.evalCtx.Settings.Version.IsActive( - b.evalCtx.Ctx(), clusterversion.DisableSystemConfigGossipTrigger, - ) { + if !b.evalCtx.Settings.Version.IsActive( + b.evalCtx.Ctx(), clusterversion.DisableSystemConfigGossipTrigger, + ) { if err := b.evalCtx.Txn.DeprecatedSetSystemConfigTrigger(b.evalCtx.Codec.ForSystemTenant()); err != nil { return execPlan{}, errors.WithSecondaryError( unimplemented.NewWithIssuef(26508, diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 4ea4b61c90a2..b3ac29313fc7 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" @@ -542,11 +541,9 @@ func (p *planner) maybePlanHook(ctx context.Context, stmt tree.Statement) (planN // Mark transaction as operating on the system DB if the descriptor id // is within the SystemConfig range. func (p *planner) maybeSetSystemConfig(id descpb.ID) error { - if !descpb.IsSystemConfigID(id) || - descs.UnsafeSkipSystemConfigTrigger.Get(&p.EvalContext().Settings.SV) || - p.execCfg.Settings.Version.IsActive( - p.EvalContext().Ctx(), clusterversion.DisableSystemConfigGossipTrigger, - ) { + if !descpb.IsSystemConfigID(id) || p.execCfg.Settings.Version.IsActive( + p.EvalContext().Ctx(), clusterversion.DisableSystemConfigGossipTrigger, + ) { return nil } // Mark transaction as operating on the system DB. diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 1a007e2b657a..69c9037dff82 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -2645,7 +2645,7 @@ func DeleteTableDescAndZoneConfig( ) error { log.Infof(ctx, "removing table descriptor and zone config for table %d", tableDesc.GetID()) return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if !descs.UnsafeSkipSystemConfigTrigger.Get(&settings.SV) && !settings.Version.IsActive( + if !settings.Version.IsActive( ctx, clusterversion.DisableSystemConfigGossipTrigger, ) { if err := txn.DeprecatedSetSystemConfigTrigger(codec.ForSystemTenant()); err != nil { diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index f92bb47aa456..09459f84ddda 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -171,21 +171,6 @@ func (n *setClusterSettingNode) startExec(params runParams) error { return errors.Errorf("SET CLUSTER SETTING cannot be used inside a transaction") } - // Set the system config trigger explicitly here as it might not happen - // implicitly due to the setting of the - // sql.catalog.unsafe_skip_system_config_trigger.enabled cluster setting. - // The usage of gossip to propagate cluster settings in the system tenant - // will be fixed in an upcoming PR with #70566. - if !params.EvalContext().Settings.Version.IsActive( - params.ctx, clusterversion.DisableSystemConfigGossipTrigger, - ) { - if err := params.p.EvalContext().Txn.DeprecatedSetSystemConfigTrigger( - params.EvalContext().Codec.ForSystemTenant(), - ); err != nil { - return err - } - } - execCfg := params.extendedEvalCtx.ExecCfg var expectedEncodedValue string if err := execCfg.DB.Txn(params.ctx, func(ctx context.Context, txn *kv.Txn) error {