From 2f6b53ababf190b6c81345ff61c8924ab07a7a3b Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 14 Feb 2022 08:25:42 -0500 Subject: [PATCH] *: fix system config for tenant Follow-on from #76279. Fixes #75864. This commit adds a mechanism to combine the system config data of the tenant with the data provided over the GossipSubscription from the system tenant. It then plumbs a version into the zone config methods. In the mixed version state, the tenant uses the existing override from the system tenant. After the span config infrastructure has been activated, the tenant uses the overrides they've set. This affects, realistically, just the GC job, and, to a lesser extent, the optimizer. Release note: None --- pkg/ccl/backupccl/backup_test.go | 2 +- pkg/ccl/kvccl/kvtenantccl/BUILD.bazel | 2 + .../kvccl/kvtenantccl/tenant_upgrade_test.go | 115 ++++++++++++- .../spanconfiglimiterccl/BUILD.bazel | 1 - .../spanconfiglimiterccl/drop_table_test.go | 6 +- pkg/config/BUILD.bazel | 1 + pkg/config/keys.go | 8 +- pkg/config/keys_test.go | 4 +- pkg/config/system.go | 157 +++++++++-------- pkg/config/system_test.go | 22 +-- pkg/config/testutil.go | 9 +- pkg/kv/kvclient/kvtenant/connector.go | 6 + pkg/kv/kvserver/client_split_test.go | 2 +- pkg/kv/kvserver/merge_queue_test.go | 4 +- pkg/kv/kvserver/queue_test.go | 2 +- .../reports/constraint_stats_report.go | 4 +- .../reports/constraint_stats_report_test.go | 4 +- .../reports/critical_localities_report.go | 2 +- .../reports/replication_stats_report.go | 4 +- pkg/kv/kvserver/reports/reporter.go | 22 +-- pkg/kv/kvserver/reports/reporter_test.go | 9 +- pkg/kv/kvserver/reports/zone_key.go | 4 +- .../public_schema_migration_external_test.go | 2 + pkg/server/systemconfigwatcher/BUILD.bazel | 12 +- pkg/server/systemconfigwatcher/cache.go | 161 +++++++++++++++--- pkg/server/systemconfigwatcher/cache_test.go | 114 ++++++++++++- .../systemconfigwatchertest/BUILD.bazel | 3 + .../test_system_config_watcher.go | 56 ++++-- pkg/server/tenant.go | 3 +- pkg/sql/gcjob/refresh_statuses.go | 8 +- pkg/sql/importer/exportcsv_test.go | 2 +- pkg/sql/opt_catalog.go | 11 +- pkg/sql/zone_config.go | 6 +- pkg/sql/zone_config_test.go | 6 +- 34 files changed, 594 insertions(+), 180 deletions(-) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 63ad0d9badb2..a915121dae31 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -2018,7 +2018,7 @@ func TestBackupRestoreControlJob(t *testing.T) { if err != nil { t.Fatal(err) } - last := config.SystemTenantObjectID(v.ValueInt()) + last := config.ObjectID(v.ValueInt()) zoneConfig := zonepb.DefaultZoneConfig() zoneConfig.RangeMaxBytes = proto.Int64(5000) config.TestingSetZoneConfig(last+1, zoneConfig) diff --git a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel index a9c216498870..5519c494bc17 100644 --- a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel @@ -56,6 +56,7 @@ go_test( "//pkg/config", "//pkg/gossip", "//pkg/jobs", + "//pkg/keys", "//pkg/kv/kvclient/kvtenant", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", @@ -85,6 +86,7 @@ go_test( "//pkg/util/stop", "//pkg/util/tracing", "//pkg/util/uuid", + "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go b/pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go index 4022f6ae39e4..e7a0b447f9f7 100644 --- a/pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go @@ -16,23 +16,28 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/migration" "github.com/cockroachdb/cockroach/pkg/migration/migrations" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) // TestTenantUpgrade exercises the case where a system tenant is in a // non-finalized version state and creates a tenant. The test ensures -// that that newly created tenant begins in that same version. +// that the newly created tenant begins in that same version. // // The first subtest creates the tenant in the mixed version state, // then upgrades the system tenant, then upgrades the secondary tenant, @@ -358,3 +363,111 @@ func TestTenantUpgradeFailure(t *testing.T) { tenantInfo.v2onMigrationStopper.Stop(ctx) }) } + +// TestTenantSystemConfigUpgrade ensures that the tenant GC job uses the +// appropriate view of the GC TTL. +func TestTenantSystemConfigUpgrade(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + false, // initializeVersion + ) + // Initialize the version to the BinaryMinSupportedVersion. + require.NoError(t, clusterversion.Initialize(ctx, + clusterversion.TestingBinaryMinSupportedVersion, &settings.SV)) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.TestingBinaryMinSupportedVersion, + }, + }, + }, + }) + hostDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + hostDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`) + hostDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) + defer tc.Stopper().Stop(ctx) + connectToTenant := func(t *testing.T, addr string) (_ *gosql.DB, cleanup func()) { + pgURL, cleanupPGUrl := sqlutils.PGUrl(t, addr, "Tenant", url.User(security.RootUser)) + tenantDB, err := gosql.Open("postgres", pgURL.String()) + require.NoError(t, err) + return tenantDB, func() { + tenantDB.Close() + cleanupPGUrl() + } + } + mkTenant := func(t *testing.T, id uint64) ( + tenant serverutils.TestTenantInterface, + ) { + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + false, // initializeVersion + ) + // Initialize the version to the minimum it could be. + require.NoError(t, clusterversion.Initialize(ctx, + clusterversion.TestingBinaryMinSupportedVersion, &settings.SV)) + tenantArgs := base.TestTenantArgs{ + TenantID: roachpb.MakeTenantID(id), + TestingKnobs: base.TestingKnobs{}, + Settings: settings, + } + tenant, err := tc.Server(0).StartTenant(ctx, tenantArgs) + require.NoError(t, err) + return tenant + } + const tenantID = 10 + codec := keys.MakeSQLCodec(roachpb.MakeTenantID(tenantID)) + tenant := mkTenant(t, tenantID) + tenantSQL, cleanup := connectToTenant(t, tenant.SQLAddr()) + defer cleanup() + tenantDB := sqlutils.MakeSQLRunner(tenantSQL) + tenantDB.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{"21.2"}}) + tenantDB.Exec(t, "CREATE TABLE foo ()") + fooID := sqlutils.QueryTableID(t, tenantSQL, "defaultdb", "public", "foo") + tenantP := tenant.SystemConfigProvider() + ch, _ := tenantP.RegisterSystemConfigChannel() + + hostDB.Exec(t, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()") + hostDB.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 111") + hostDB.Exec(t, + "ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true;", + tenantID) + tenantDB.CheckQueryResultsRetry( + t, "SHOW CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled", + [][]string{{"true"}}, + ) + tenantVersion := func() clusterversion.ClusterVersion { + return tenant.ClusterSettings().Version.ActiveVersionOrEmpty(ctx) + } + checkConfigEqual := func(t *testing.T, exp int32) { + testutils.SucceedsSoon(t, func() error { + cfg := tenantP.GetSystemConfig() + if cfg == nil { + return errors.New("no config") + } + conf, err := tenantP.GetSystemConfig().GetZoneConfigForObject(codec, tenantVersion(), config.ObjectID(fooID)) + if err != nil { + return err + } + if conf.GC.TTLSeconds != exp { + return errors.Errorf("got %d, expected %d", conf.GC.TTLSeconds, exp) + } + return nil + }) + } + checkConfigEqual(t, 111) + <-ch + hostDB.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 112") + <-ch + checkConfigEqual(t, 112) + tenantDB.Exec(t, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()") + tenantDB.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 113") + <-ch + checkConfigEqual(t, 113) +} diff --git a/pkg/ccl/spanconfigccl/spanconfiglimiterccl/BUILD.bazel b/pkg/ccl/spanconfigccl/spanconfiglimiterccl/BUILD.bazel index 128318644767..00a311339b93 100644 --- a/pkg/ccl/spanconfigccl/spanconfiglimiterccl/BUILD.bazel +++ b/pkg/ccl/spanconfigccl/spanconfiglimiterccl/BUILD.bazel @@ -15,7 +15,6 @@ go_test( "//pkg/ccl/utilccl", "//pkg/config", "//pkg/config/zonepb", - "//pkg/keys", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", diff --git a/pkg/ccl/spanconfigccl/spanconfiglimiterccl/drop_table_test.go b/pkg/ccl/spanconfigccl/spanconfiglimiterccl/drop_table_test.go index 172c73213ad0..dc9abf87f64e 100644 --- a/pkg/ccl/spanconfigccl/spanconfiglimiterccl/drop_table_test.go +++ b/pkg/ccl/spanconfigccl/spanconfiglimiterccl/drop_table_test.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/gcjob" @@ -57,14 +56,15 @@ func TestDropTableLowersSpanCount(t *testing.T) { zoneConfig := zonepb.DefaultZoneConfig() zoneConfig.GC.TTLSeconds = 1 config.TestingSetupZoneConfigHook(tc.Stopper()) - // TODO(irfansharif): Work around for #75864. - config.TestingSetZoneConfig(keys.TenantsRangesID, zoneConfig) require.NoError(t, err) defer func() { require.NoError(t, tenantSQLDB.Close()) }() tenantDB := sqlutils.MakeSQLRunner(tenantSQLDB) + tenantDB.Exec(t, `CREATE TABLE t(k INT PRIMARY KEY)`) + id := sqlutils.QueryTableID(t, tenantSQLDB, "defaultdb", "public", "t") + config.TestingSetZoneConfig(config.ObjectID(id), zoneConfig) var spanCount int tenantDB.QueryRow(t, `SELECT span_count FROM system.span_count LIMIT 1`).Scan(&spanCount) diff --git a/pkg/config/BUILD.bazel b/pkg/config/BUILD.bazel index dc81b6404a1d..4c52665d963d 100644 --- a/pkg/config/BUILD.bazel +++ b/pkg/config/BUILD.bazel @@ -15,6 +15,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/config", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/config/zonepb", "//pkg/keys", "//pkg/roachpb", diff --git a/pkg/config/keys.go b/pkg/config/keys.go index 4c3a78eb21c4..f70d56bdc395 100644 --- a/pkg/config/keys.go +++ b/pkg/config/keys.go @@ -27,11 +27,11 @@ func MakeZoneKey(codec keys.SQLCodec, id descpb.ID) roachpb.Key { return codec.ZoneKey(uint32(id)) } -// DecodeSystemTenantObjectID decodes the object ID for the system-tenant from +// DecodeObjectID decodes the object ID for the system-tenant from // the front of key. It returns the decoded object ID, the remainder of the key, // and whether the result is valid (i.e., whether the key was within the system // tenant's structured key space). -func DecodeSystemTenantObjectID(key roachpb.RKey) (SystemTenantObjectID, []byte, bool) { - rem, id, err := keys.SystemSQLCodec.DecodeTablePrefix(key.AsRawKey()) - return SystemTenantObjectID(id), rem, err == nil +func DecodeObjectID(codec keys.SQLCodec, key roachpb.RKey) (ObjectID, []byte, bool) { + rem, id, err := codec.DecodeTablePrefix(key.AsRawKey()) + return ObjectID(id), rem, err == nil } diff --git a/pkg/config/keys_test.go b/pkg/config/keys_test.go index eac7175f28f9..b24945142970 100644 --- a/pkg/config/keys_test.go +++ b/pkg/config/keys_test.go @@ -28,7 +28,7 @@ func TestDecodeSystemTenantObjectID(t *testing.T) { key roachpb.RKey keySuffix []byte success bool - id config.SystemTenantObjectID + id config.ObjectID }{ // Before the structured span. {roachpb.RKeyMin, nil, false, 0}, @@ -43,7 +43,7 @@ func TestDecodeSystemTenantObjectID(t *testing.T) { } for tcNum, tc := range testCases { - id, keySuffix, success := config.DecodeSystemTenantObjectID(tc.key) + id, keySuffix, success := config.DecodeObjectID(keys.SystemSQLCodec, tc.key) if success != tc.success { t.Errorf("#%d: expected success=%t", tcNum, tc.success) continue diff --git a/pkg/config/system.go b/pkg/config/system.go index 007fd8365890..2a38770400f7 100644 --- a/pkg/config/system.go +++ b/pkg/config/system.go @@ -16,6 +16,7 @@ import ( "fmt" "sort" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -26,7 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) -// SystemTenantObjectID is an identifier for an object (e.g. database or table) +// ObjectID is an identifier for an object (e.g. database or table) // in the system tenant. Each object in the system tenant is capable of being // associated with a zone configuration, which describes how and where the // object's data is stored in KV. Some objects in the system tenant also serve @@ -39,10 +40,10 @@ import ( // the system tenant. Additionally, individual objects in secondary tenants do // not serve as Range split boundaries. However, each tenant is guaranteed to be // split off into its own range. -type SystemTenantObjectID uint32 +type ObjectID uint32 type zoneConfigHook func( - sysCfg *SystemConfig, objectID SystemTenantObjectID, + sysCfg *SystemConfig, codec keys.SQLCodec, objectID ObjectID, ) (zone *zonepb.ZoneConfig, placeholder *zonepb.ZoneConfig, cache bool, err error) var ( @@ -53,7 +54,7 @@ var ( // testingLargestIDHook is a function used to bypass GetLargestObjectID // in tests. - testingLargestIDHook func(maxID SystemTenantObjectID) SystemTenantObjectID + testingLargestIDHook func(maxID ObjectID) ObjectID ) type zoneEntry struct { @@ -85,8 +86,8 @@ type SystemConfig struct { DefaultZoneConfig *zonepb.ZoneConfig mu struct { syncutil.RWMutex - zoneCache map[SystemTenantObjectID]zoneEntry - shouldSplitCache map[SystemTenantObjectID]bool + zoneCache map[ObjectID]zoneEntry + shouldSplitCache map[ObjectID]bool } } @@ -94,8 +95,8 @@ type SystemConfig struct { func NewSystemConfig(defaultZoneConfig *zonepb.ZoneConfig) *SystemConfig { sc := &SystemConfig{} sc.DefaultZoneConfig = defaultZoneConfig - sc.mu.zoneCache = map[SystemTenantObjectID]zoneEntry{} - sc.mu.shouldSplitCache = map[SystemTenantObjectID]bool{} + sc.mu.zoneCache = map[ObjectID]zoneEntry{} + sc.mu.shouldSplitCache = map[ObjectID]bool{} return sc } @@ -138,7 +139,7 @@ func (s *SystemConfig) getSystemTenantDesc(key roachpb.Key) *roachpb.Value { } testingLock.Lock() - _, ok := testingZoneConfig[SystemTenantObjectID(id)] + _, ok := testingZoneConfig[ObjectID(id)] testingLock.Unlock() if ok { @@ -199,8 +200,8 @@ func (s *SystemConfig) getIndexBound(key roachpb.Key) int { // IDs. If idChecker is nil, returns the largest ID in the config // (again, augmented by the pseudo IDs). func (s *SystemConfig) GetLargestObjectID( - maxReservedDescID SystemTenantObjectID, pseudoIDs []uint32, -) (SystemTenantObjectID, error) { + maxReservedDescID ObjectID, pseudoIDs []uint32, +) (ObjectID, error) { testingLock.Lock() hook := testingLargestIDHook testingLock.Unlock() @@ -220,9 +221,9 @@ func (s *SystemConfig) GetLargestObjectID( } // Determine the largest pseudo table ID equal to or below maxID. - maxPseudoID := SystemTenantObjectID(0) + maxPseudoID := ObjectID(0) for _, id := range pseudoIDs { - objID := SystemTenantObjectID(id) + objID := ObjectID(id) if objID > maxPseudoID && (maxReservedDescID == 0 || objID <= maxReservedDescID) { maxPseudoID = objID } @@ -235,7 +236,7 @@ func (s *SystemConfig) GetLargestObjectID( if err != nil { return 0, err } - objID := SystemTenantObjectID(id) + objID := ObjectID(id) if objID < maxPseudoID { objID = maxPseudoID } @@ -267,7 +268,7 @@ func (s *SystemConfig) GetLargestObjectID( return 0, err } if id <= uint32(maxReservedDescID) { - return SystemTenantObjectID(id), nil + return ObjectID(id), nil } } @@ -280,30 +281,61 @@ func (s *SystemConfig) GetLargestObjectID( if err != nil { return 0, err } - objID := SystemTenantObjectID(id) + objID := ObjectID(id) if objID < maxPseudoID { objID = maxPseudoID } return objID, nil } -// GetZoneConfigForKey looks up the zone config for the object (table +// TestingGetSystemTenantZoneConfigForKey looks up the zone config the +// provided key. This is exposed to facilitate testing the underlying +// logic. +func TestingGetSystemTenantZoneConfigForKey( + s *SystemConfig, key roachpb.RKey, +) (ObjectID, *zonepb.ZoneConfig, error) { + return s.getZoneConfigForKey(keys.SystemSQLCodec, key) +} + +// getZoneConfigForKey looks up the zone config for the object (table // or database, specified by key.id). It is the caller's // responsibility to ensure that the range does not need to be split. -func (s *SystemConfig) GetZoneConfigForKey( - key roachpb.RKey, -) (SystemTenantObjectID, *zonepb.ZoneConfig, error) { - id, suffix := DecodeKeyIntoZoneIDAndSuffix(key) - zoneCfg, err := s.getZoneConfigForKey(id, suffix) - return id, zoneCfg, err +func (s *SystemConfig) getZoneConfigForKey( + codec keys.SQLCodec, key roachpb.RKey, +) (ObjectID, *zonepb.ZoneConfig, error) { + id, suffix := DecodeKeyIntoZoneIDAndSuffix(codec, key) + entry, err := s.getZoneEntry(codec, id) + if err != nil { + return 0, nil, err + } + if entry.zone != nil { + if entry.placeholder != nil { + if subzone, _ := entry.placeholder.GetSubzoneForKeySuffix(suffix); subzone != nil { + if indexSubzone := entry.placeholder.GetSubzone(subzone.IndexID, ""); indexSubzone != nil { + subzone.Config.InheritFromParent(&indexSubzone.Config) + } + subzone.Config.InheritFromParent(entry.zone) + return id, &subzone.Config, nil + } + } else if subzone, _ := entry.zone.GetSubzoneForKeySuffix(suffix); subzone != nil { + if indexSubzone := entry.zone.GetSubzone(subzone.IndexID, ""); indexSubzone != nil { + subzone.Config.InheritFromParent(&indexSubzone.Config) + } + subzone.Config.InheritFromParent(entry.zone) + return id, &subzone.Config, nil + } + return id, entry.zone, nil + } + return id, s.DefaultZoneConfig, nil } // GetSpanConfigForKey looks of the span config for the given key. It's part of -// spanconfig.StoreReader interface. +// spanconfig.StoreReader interface. Note that it is only usable for the system +// tenant config. func (s *SystemConfig) GetSpanConfigForKey( ctx context.Context, key roachpb.RKey, ) (roachpb.SpanConfig, error) { - id, zone, err := s.GetZoneConfigForKey(key) + id, zone, err := s.getZoneConfigForKey(keys.SystemSQLCodec, key) if err != nil { return roachpb.SpanConfig{}, err } @@ -322,8 +354,10 @@ func (s *SystemConfig) GetSpanConfigForKey( } // DecodeKeyIntoZoneIDAndSuffix figures out the zone that the key belongs to. -func DecodeKeyIntoZoneIDAndSuffix(key roachpb.RKey) (id SystemTenantObjectID, keySuffix []byte) { - objectID, keySuffix, ok := DecodeSystemTenantObjectID(key) +func DecodeKeyIntoZoneIDAndSuffix( + codec keys.SQLCodec, key roachpb.RKey, +) (id ObjectID, keySuffix []byte) { + objectID, keySuffix, ok := DecodeObjectID(codec, key) if !ok { // Not in the structured data namespace. objectID = keys.RootNamespaceID @@ -367,18 +401,31 @@ func isPseudoTableID(id uint32) bool { // GetZoneConfigForObject returns the combined zone config for the given object // identifier and SQL codec. +// // NOTE: any subzones from the zone placeholder will be automatically merged // into the cached zone so the caller doesn't need special-case handling code. func (s *SystemConfig) GetZoneConfigForObject( - codec keys.SQLCodec, id uint32, + codec keys.SQLCodec, version clusterversion.ClusterVersion, id ObjectID, ) (*zonepb.ZoneConfig, error) { - var sysID SystemTenantObjectID - if codec.ForSystemTenant() { - sysID = SystemTenantObjectID(id) - } else { - sysID = keys.TenantsRangesID - } - entry, err := s.getZoneEntry(sysID) + var entry zoneEntry + var err error + // In the case that we've not yet ensured reconciliation of the span + // configurations, use the host-provided view of the RANGE tenants + // configuration. + // + // TODO(ajwerner,arulajmani): If the reconciliation protocol is not active, + // and this is a secondary tenant object we're trying to look up, we're in a + // bit of a pickle. This assumes that if we're in the appropriate version, + // then so too is the system tenant and things are reconciled. Is it possible + // that neither of these object IDs represent reality? It seems like after + // the host cluster has been upgraded but the tenants have not, that we're + // in a weird intermediate state whereby the system tenant's config is no + // longer respected, but neither is the secondary tenant's. + if !codec.ForSystemTenant() && + (id == 0 || !version.IsActive(clusterversion.EnableSpanConfigStore)) { + codec, id = keys.SystemSQLCodec, keys.TenantsRangesID + } + entry, err = s.getZoneEntry(codec, id) if err != nil { return nil, err } @@ -390,7 +437,7 @@ func (s *SystemConfig) GetZoneConfigForObject( // directly returned. Otherwise, getZoneEntry will hydrate new // zonepb.ZoneConfig(s) from the SystemConfig and install them as an // entry in the cache. -func (s *SystemConfig) getZoneEntry(id SystemTenantObjectID) (zoneEntry, error) { +func (s *SystemConfig) getZoneEntry(codec keys.SQLCodec, id ObjectID) (zoneEntry, error) { s.mu.RLock() entry, ok := s.mu.zoneCache[id] s.mu.RUnlock() @@ -400,7 +447,7 @@ func (s *SystemConfig) getZoneEntry(id SystemTenantObjectID) (zoneEntry, error) testingLock.Lock() hook := ZoneConfigHook testingLock.Unlock() - zone, placeholder, cache, err := hook(s, id) + zone, placeholder, cache, err := hook(s, codec, id) if err != nil { return zoneEntry{}, err } @@ -425,34 +472,6 @@ func (s *SystemConfig) getZoneEntry(id SystemTenantObjectID) (zoneEntry, error) return zoneEntry{}, nil } -func (s *SystemConfig) getZoneConfigForKey( - id SystemTenantObjectID, keySuffix []byte, -) (*zonepb.ZoneConfig, error) { - entry, err := s.getZoneEntry(id) - if err != nil { - return nil, err - } - if entry.zone != nil { - if entry.placeholder != nil { - if subzone, _ := entry.placeholder.GetSubzoneForKeySuffix(keySuffix); subzone != nil { - if indexSubzone := entry.placeholder.GetSubzone(subzone.IndexID, ""); indexSubzone != nil { - subzone.Config.InheritFromParent(&indexSubzone.Config) - } - subzone.Config.InheritFromParent(entry.zone) - return &subzone.Config, nil - } - } else if subzone, _ := entry.zone.GetSubzoneForKeySuffix(keySuffix); subzone != nil { - if indexSubzone := entry.zone.GetSubzone(subzone.IndexID, ""); indexSubzone != nil { - subzone.Config.InheritFromParent(&indexSubzone.Config) - } - subzone.Config.InheritFromParent(entry.zone) - return &subzone.Config, nil - } - return entry.zone, nil - } - return s.DefaultZoneConfig, nil -} - var staticSplits = []roachpb.RKey{ roachpb.RKey(keys.NodeLivenessPrefix), // end of meta records / start of node liveness span roachpb.RKey(keys.NodeLivenessKeyMax), // end of node liveness span @@ -535,7 +554,7 @@ func (s *SystemConfig) systemTenantTableBoundarySplitKey( return nil } - startID, _, ok := DecodeSystemTenantObjectID(startKey) + startID, _, ok := DecodeObjectID(keys.SystemSQLCodec, startKey) if !ok || startID <= keys.MaxSystemConfigDescID { // The start key is either: // - not part of the structured data span @@ -552,7 +571,7 @@ func (s *SystemConfig) systemTenantTableBoundarySplitKey( // findSplitKey returns the first possible split key between the given // range of IDs. - findSplitKey := func(startID, endID SystemTenantObjectID) roachpb.RKey { + findSplitKey := func(startID, endID ObjectID) roachpb.RKey { // endID could be smaller than startID if we don't have user tables. for id := startID; id <= endID; id++ { tableKey := roachpb.RKey(keys.SystemSQLCodec.TablePrefix(uint32(id))) @@ -600,7 +619,7 @@ func (s *SystemConfig) systemTenantTableBoundarySplitKey( if splitKey := findSplitKey(startID, endID); splitKey != nil { return splitKey } - startID = SystemTenantObjectID(keys.MaxReservedDescID + 1) + startID = ObjectID(keys.MaxReservedDescID + 1) } // Find the split key in the system tenant's user space. @@ -706,7 +725,7 @@ func (s *SystemConfig) NeedsSplit(ctx context.Context, startKey, endKey roachpb. // shouldSplitOnSystemTenantObject checks if the ID is eligible for a split at // all. It uses the internal cache to find a value, and tries to find it using // the hook if ID isn't found in the cache. -func (s *SystemConfig) shouldSplitOnSystemTenantObject(id SystemTenantObjectID) bool { +func (s *SystemConfig) shouldSplitOnSystemTenantObject(id ObjectID) bool { // Check the cache. { s.mu.RLock() diff --git a/pkg/config/system_test.go b/pkg/config/system_test.go index c7340dc592d1..f5a06a3eb513 100644 --- a/pkg/config/system_test.go +++ b/pkg/config/system_test.go @@ -152,8 +152,8 @@ func TestGetLargestID(t *testing.T) { type testCase struct { values []roachpb.KeyValue - largest config.SystemTenantObjectID - maxID config.SystemTenantObjectID + largest config.ObjectID + maxID config.ObjectID pseudoIDs []uint32 errStr string } @@ -208,12 +208,12 @@ func TestGetLargestID(t *testing.T) { func() testCase { ms := bootstrap.MakeMetadataSchema(keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef()) descIDs := ms.DescriptorIDs() - maxDescID := config.SystemTenantObjectID(descIDs[len(descIDs)-1]) + maxDescID := config.ObjectID(descIDs[len(descIDs)-1]) kvs, _ /* splits */ := ms.GetInitialValues() pseudoIDs := keys.PseudoTableIDs const pseudoIDIsMax = false // NOTE: change to false if adding new system not pseudo objects. if pseudoIDIsMax { - maxDescID = config.SystemTenantObjectID(keys.MaxPseudoTableID) + maxDescID = config.ObjectID(keys.MaxPseudoTableID) } return testCase{kvs, maxDescID, 0, pseudoIDs, ""} }(), @@ -547,7 +547,7 @@ func TestGetZoneConfigForKey(t *testing.T) { ctx := context.Background() testCases := []struct { key roachpb.RKey - expectedID config.SystemTenantObjectID + expectedID config.ObjectID }{ {roachpb.RKeyMin, keys.MetaRangesID}, {roachpb.RKey(keys.Meta1Prefix), keys.MetaRangesID}, @@ -575,7 +575,7 @@ func TestGetZoneConfigForKey(t *testing.T) { // Non-gossiped system tables should refer to themselves. {tkey(keys.LeaseTableID), keys.LeaseTableID}, - {tkey(uint32(systemschema.JobsTable.GetID())), config.SystemTenantObjectID(systemschema.JobsTable.GetID())}, + {tkey(uint32(systemschema.JobsTable.GetID())), config.ObjectID(systemschema.JobsTable.GetID())}, {tkey(keys.LocationsTableID), keys.LocationsTableID}, {tkey(keys.NamespaceTableID), keys.NamespaceTableID}, @@ -586,8 +586,8 @@ func TestGetZoneConfigForKey(t *testing.T) { {tkey(keys.LivenessRangesID), keys.SystemDatabaseID}, // User tables should refer to themselves. - {tkey(bootstrap.TestingUserDescID(0)), config.SystemTenantObjectID(bootstrap.TestingUserDescID(0))}, - {tkey(bootstrap.TestingUserDescID(22)), config.SystemTenantObjectID(bootstrap.TestingUserDescID(22))}, + {tkey(bootstrap.TestingUserDescID(0)), config.ObjectID(bootstrap.TestingUserDescID(0))}, + {tkey(bootstrap.TestingUserDescID(22)), config.ObjectID(bootstrap.TestingUserDescID(22))}, {roachpb.RKeyMax, keys.RootNamespaceID}, // Secondary tenant tables should refer to the TenantsRangesID. @@ -610,9 +610,9 @@ func TestGetZoneConfigForKey(t *testing.T) { Values: kvs, } for tcNum, tc := range testCases { - var objectID config.SystemTenantObjectID + var objectID config.ObjectID config.ZoneConfigHook = func( - _ *config.SystemConfig, id config.SystemTenantObjectID, + _ *config.SystemConfig, codec keys.SQLCodec, id config.ObjectID, ) (*zonepb.ZoneConfig, *zonepb.ZoneConfig, bool, error) { objectID = id return cfg.DefaultZoneConfig, nil, false, nil @@ -622,7 +622,7 @@ func TestGetZoneConfigForKey(t *testing.T) { t.Errorf("#%d: GetSpanConfigForKey(%v) got error: %v", tcNum, tc.key, err) } if objectID != tc.expectedID { - t.Errorf("#%d: GetZoneConfigForKey(%v) got %d; want %d", tcNum, tc.key, objectID, tc.expectedID) + t.Errorf("#%d: GetSpanConfigForKey(%v) got %d; want %d", tcNum, tc.key, objectID, tc.expectedID) } } } diff --git a/pkg/config/testutil.go b/pkg/config/testutil.go index 640e35d21f3e..8c17fb4f32c3 100644 --- a/pkg/config/testutil.go +++ b/pkg/config/testutil.go @@ -12,11 +12,12 @@ package config import ( "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) -type zoneConfigMap map[SystemTenantObjectID]zonepb.ZoneConfig +type zoneConfigMap map[ObjectID]zonepb.ZoneConfig var ( testingZoneConfig zoneConfigMap @@ -40,7 +41,7 @@ func TestingSetupZoneConfigHook(stopper *stop.Stopper) { testingZoneConfig = make(zoneConfigMap) testingPreviousHook = ZoneConfigHook ZoneConfigHook = testingZoneConfigHook - testingLargestIDHook = func(maxID SystemTenantObjectID) (max SystemTenantObjectID) { + testingLargestIDHook = func(maxID ObjectID) (max ObjectID) { testingLock.Lock() defer testingLock.Unlock() for id := range testingZoneConfig { @@ -70,14 +71,14 @@ func testingResetZoneConfigHook() { // TestingSetZoneConfig sets the zone config entry for object 'id' // in the testing map. -func TestingSetZoneConfig(id SystemTenantObjectID, zone zonepb.ZoneConfig) { +func TestingSetZoneConfig(id ObjectID, zone zonepb.ZoneConfig) { testingLock.Lock() defer testingLock.Unlock() testingZoneConfig[id] = zone } func testingZoneConfigHook( - _ *SystemConfig, id SystemTenantObjectID, + _ *SystemConfig, codec keys.SQLCodec, id ObjectID, ) (*zonepb.ZoneConfig, *zonepb.ZoneConfig, bool, error) { testingLock.Lock() defer testingLock.Unlock() diff --git a/pkg/kv/kvclient/kvtenant/connector.go b/pkg/kv/kvclient/kvtenant/connector.go index 3db120cc5140..9b9ff8570df0 100644 --- a/pkg/kv/kvclient/kvtenant/connector.go +++ b/pkg/kv/kvclient/kvtenant/connector.go @@ -76,6 +76,12 @@ type Connector interface { // OverridesMonitor provides access to tenant cluster setting overrides. settingswatcher.OverridesMonitor + + // SystemConfigProvider provides access to basic host-tenant controlled + // information regarding tenant zone configs. This is critical for the + // mixed version 21.2->22.1 state where the tenant has not yet configured + // its own zones. + config.SystemConfigProvider } // TokenBucketProvider supplies an endpoint (to tenants) for the TokenBucket API diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 8f6a1c331a52..bce80d549db1 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -3542,7 +3542,7 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) { // Set global reads. zoneConfig := zonepb.DefaultZoneConfig() zoneConfig.GlobalReads = proto.Bool(true) - config.TestingSetZoneConfig(config.SystemTenantObjectID(descID), zoneConfig) + config.TestingSetZoneConfig(config.ObjectID(descID), zoneConfig) // Perform a write to the system config span being watched by // the SystemConfigProvider. diff --git a/pkg/kv/kvserver/merge_queue_test.go b/pkg/kv/kvserver/merge_queue_test.go index 3bd836d77805..0c787317f9bf 100644 --- a/pkg/kv/kvserver/merge_queue_test.go +++ b/pkg/kv/kvserver/merge_queue_test.go @@ -47,8 +47,8 @@ func TestMergeQueueShouldQueue(t *testing.T) { return keys.SystemSQLCodec.TablePrefix(bootstrap.TestingUserDescID(offset)) } - config.TestingSetZoneConfig(config.SystemTenantObjectID(bootstrap.TestingUserDescID(0)), *zonepb.NewZoneConfig()) - config.TestingSetZoneConfig(config.SystemTenantObjectID(bootstrap.TestingUserDescID(1)), *zonepb.NewZoneConfig()) + config.TestingSetZoneConfig(config.ObjectID(bootstrap.TestingUserDescID(0)), *zonepb.NewZoneConfig()) + config.TestingSetZoneConfig(config.ObjectID(bootstrap.TestingUserDescID(1)), *zonepb.NewZoneConfig()) type testCase struct { startKey, endKey []byte diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index cfa3c7c409c7..c44bfa15bd4f 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -732,7 +732,7 @@ func TestAcceptsUnsplitRanges(t *testing.T) { // which means keys.MaxReservedDescID+1. zoneConfig := zonepb.DefaultZoneConfig() zoneConfig.RangeMaxBytes = proto.Int64(1 << 20) - config.TestingSetZoneConfig(config.SystemTenantObjectID(bootstrap.TestingUserDescID(1)), zoneConfig) + config.TestingSetZoneConfig(config.ObjectID(bootstrap.TestingUserDescID(1)), zoneConfig) // Check our config. neverSplitsDesc = neverSplits.Desc() diff --git a/pkg/kv/kvserver/reports/constraint_stats_report.go b/pkg/kv/kvserver/reports/constraint_stats_report.go index 3db4134fed20..a0a5318cb7f0 100644 --- a/pkg/kv/kvserver/reports/constraint_stats_report.go +++ b/pkg/kv/kvserver/reports/constraint_stats_report.go @@ -185,7 +185,7 @@ func (r *replicationConstraintStatsReportSaver) loadPreviousVersion( for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { row := it.Cur() key := ConstraintStatusKey{} - key.ZoneID = (config.SystemTenantObjectID)(*row[0].(*tree.DInt)) + key.ZoneID = (config.ObjectID)(*row[0].(*tree.DInt)) key.SubzoneID = base.SubzoneID((*row[1].(*tree.DInt))) key.ViolationType = (ConstraintType)(*row[2].(*tree.DString)) key.Constraint = (ConstraintRepr)(*row[3].(*tree.DString)) @@ -396,7 +396,7 @@ func (v *constraintConformanceVisitor) reset(ctx context.Context) { if err != nil { log.Fatalf(ctx, "unexpected failure to compute max object id: %s", err) } - for i := config.SystemTenantObjectID(1); i <= maxObjectID; i++ { + for i := config.ObjectID(1); i <= maxObjectID; i++ { zone, err := getZoneByID(i, v.cfg) if err != nil { log.Fatalf(ctx, "unexpected failure to compute max object id: %s", err) diff --git a/pkg/kv/kvserver/reports/constraint_stats_report_test.go b/pkg/kv/kvserver/reports/constraint_stats_report_test.go index 7cab42845c0c..550c1b06fb3b 100644 --- a/pkg/kv/kvserver/reports/constraint_stats_report_test.go +++ b/pkg/kv/kvserver/reports/constraint_stats_report_test.go @@ -1117,7 +1117,7 @@ func (b *systemConfigBuilder) addZoneInner(objectName string, id int, cfg zonepb panic(err) } b.kv = append(b.kv, roachpb.KeyValue{Key: k, Value: v}) - return b.addZoneToObjectMapping(MakeZoneKey(config.SystemTenantObjectID(id), NoSubzone), objectName) + return b.addZoneToObjectMapping(MakeZoneKey(config.ObjectID(id), NoSubzone), objectName) } func (b *systemConfigBuilder) addDatabaseZone(name string, id int, cfg zonepb.ZoneConfig) error { @@ -1144,7 +1144,7 @@ func (b *systemConfigBuilder) addTableZone(t descpb.TableDescriptor, cfg zonepb. object = fmt.Sprintf("%s.%s", idx, subzone.PartitionName) } if err := b.addZoneToObjectMapping( - MakeZoneKey(config.SystemTenantObjectID(t.ID), base.SubzoneIDFromIndex(i)), object, + MakeZoneKey(config.ObjectID(t.ID), base.SubzoneIDFromIndex(i)), object, ); err != nil { return err } diff --git a/pkg/kv/kvserver/reports/critical_localities_report.go b/pkg/kv/kvserver/reports/critical_localities_report.go index 6ca36535ee09..58bf6bab0795 100644 --- a/pkg/kv/kvserver/reports/critical_localities_report.go +++ b/pkg/kv/kvserver/reports/critical_localities_report.go @@ -115,7 +115,7 @@ func (r *replicationCriticalLocalitiesReportSaver) loadPreviousVersion( for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { row := it.Cur() key := localityKey{} - key.ZoneID = (config.SystemTenantObjectID)(*row[0].(*tree.DInt)) + key.ZoneID = (config.ObjectID)(*row[0].(*tree.DInt)) key.SubzoneID = base.SubzoneID(*row[1].(*tree.DInt)) key.locality = (LocalityRepr)(*row[2].(*tree.DString)) r.previousVersion[key] = localityStatus{(int32)(*row[3].(*tree.DInt))} diff --git a/pkg/kv/kvserver/reports/replication_stats_report.go b/pkg/kv/kvserver/reports/replication_stats_report.go index 71ebe6088f18..6644448f1065 100644 --- a/pkg/kv/kvserver/reports/replication_stats_report.go +++ b/pkg/kv/kvserver/reports/replication_stats_report.go @@ -121,7 +121,7 @@ func (r *replicationStatsReportSaver) loadPreviousVersion( for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { row := it.Cur() key := ZoneKey{} - key.ZoneID = (config.SystemTenantObjectID)(*row[0].(*tree.DInt)) + key.ZoneID = (config.ObjectID)(*row[0].(*tree.DInt)) key.SubzoneID = base.SubzoneID(*row[1].(*tree.DInt)) r.previousVersion[key] = zoneRangeStatus{ (int32)(*row[2].(*tree.DInt)), @@ -304,7 +304,7 @@ func (v *replicationStatsVisitor) reset(ctx context.Context) { if err != nil { log.Fatalf(ctx, "unexpected failure to compute max object id: %s", err) } - for i := config.SystemTenantObjectID(1); i <= maxObjectID; i++ { + for i := config.ObjectID(1); i <= maxObjectID; i++ { zone, err := getZoneByID(i, v.cfg) if err != nil { log.Fatalf(ctx, "unexpected failure to compute max object id: %s", err) diff --git a/pkg/kv/kvserver/reports/reporter.go b/pkg/kv/kvserver/reports/reporter.go index 5c944a175248..eeafc4887b1a 100644 --- a/pkg/kv/kvserver/reports/reporter.go +++ b/pkg/kv/kvserver/reports/reporter.go @@ -294,7 +294,7 @@ type nodeChecker func(nodeID roachpb.NodeID) bool type zoneResolver struct { init bool // curObjectID is the object (i.e. usually table) of the configured range. - curObjectID config.SystemTenantObjectID + curObjectID config.ObjectID // curRootZone is the lowest zone convering the previously resolved range // that's not a subzone. // This is used to compute the subzone for a range. @@ -316,9 +316,7 @@ func (c *zoneResolver) resolveRange( // setZone remembers the passed-in info as the reference for further // checkSameZone() calls. // Clients should generally use the higher-level updateZone(). -func (c *zoneResolver) setZone( - objectID config.SystemTenantObjectID, key ZoneKey, rootZone *zonepb.ZoneConfig, -) { +func (c *zoneResolver) setZone(objectID config.ObjectID, key ZoneKey, rootZone *zonepb.ZoneConfig) { c.init = true c.curObjectID = objectID c.curRootZone = rootZone @@ -330,7 +328,7 @@ func (c *zoneResolver) setZone( func (c *zoneResolver) updateZone( ctx context.Context, rd *roachpb.RangeDescriptor, cfg *config.SystemConfig, ) (ZoneKey, error) { - objectID, _ := config.DecodeKeyIntoZoneIDAndSuffix(rd.StartKey) + objectID, _ := config.DecodeKeyIntoZoneIDAndSuffix(keys.SystemSQLCodec, rd.StartKey) first := true var zoneKey ZoneKey var rootZone *zonepb.ZoneConfig @@ -374,7 +372,7 @@ func (c *zoneResolver) checkSameZone(ctx context.Context, rng *roachpb.RangeDesc return false } - objectID, keySuffix := config.DecodeKeyIntoZoneIDAndSuffix(rng.StartKey) + objectID, keySuffix := config.DecodeKeyIntoZoneIDAndSuffix(keys.SystemSQLCodec, rng.StartKey) if objectID != c.curObjectID { return false } @@ -405,7 +403,7 @@ func visitZones( opt visitOpt, visitor func(context.Context, *zonepb.ZoneConfig, ZoneKey) bool, ) (bool, error) { - id, keySuffix := config.DecodeKeyIntoZoneIDAndSuffix(rng.StartKey) + id, keySuffix := config.DecodeKeyIntoZoneIDAndSuffix(keys.SystemSQLCodec, rng.StartKey) zone, err := getZoneByID(id, cfg) if err != nil { return false, err @@ -441,7 +439,7 @@ func visitZones( // corresponding to id. The zone corresponding to id itself is not visited. func visitAncestors( ctx context.Context, - id config.SystemTenantObjectID, + id config.ObjectID, cfg *config.SystemConfig, visitor func(context.Context, *zonepb.ZoneConfig, ZoneKey) bool, ) (bool, error) { @@ -467,12 +465,12 @@ func visitAncestors( } // If it's a table, the parent is a database. - zone, err := getZoneByID(config.SystemTenantObjectID(tableDesc.ParentID), cfg) + zone, err := getZoneByID(config.ObjectID(tableDesc.ParentID), cfg) if err != nil { return false, err } if zone != nil { - if visitor(ctx, zone, MakeZoneKey(config.SystemTenantObjectID(tableDesc.ParentID), NoSubzone)) { + if visitor(ctx, zone, MakeZoneKey(config.ObjectID(tableDesc.ParentID), NoSubzone)) { return true, nil } } @@ -496,9 +494,7 @@ func visitDefaultZone( } // getZoneByID returns a zone given its id. Inheritance does not apply. -func getZoneByID( - id config.SystemTenantObjectID, cfg *config.SystemConfig, -) (*zonepb.ZoneConfig, error) { +func getZoneByID(id config.ObjectID, cfg *config.SystemConfig) (*zonepb.ZoneConfig, error) { zoneVal := cfg.GetValue(config.MakeZoneKey(keys.SystemSQLCodec, descpb.ID(id))) if zoneVal == nil { return nil, nil diff --git a/pkg/kv/kvserver/reports/reporter_test.go b/pkg/kv/kvserver/reports/reporter_test.go index bbf3d0bf50da..506d9a44c077 100644 --- a/pkg/kv/kvserver/reports/reporter_test.go +++ b/pkg/kv/kvserver/reports/reporter_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" @@ -516,9 +517,9 @@ func TestZoneChecker(t *testing.T) { p2SubzoneIndex := 1 require.Equal(t, "p1", t1Zone.Subzones[p1SubzoneIndex].PartitionName) require.Equal(t, "p2", t1Zone.Subzones[p2SubzoneIndex].PartitionName) - t1ZoneKey := MakeZoneKey(config.SystemTenantObjectID(t1ID), NoSubzone) - p1ZoneKey := MakeZoneKey(config.SystemTenantObjectID(t1ID), base.SubzoneIDFromIndex(p1SubzoneIndex)) - p2ZoneKey := MakeZoneKey(config.SystemTenantObjectID(t1ID), base.SubzoneIDFromIndex(p2SubzoneIndex)) + t1ZoneKey := MakeZoneKey(config.ObjectID(t1ID), NoSubzone) + p1ZoneKey := MakeZoneKey(config.ObjectID(t1ID), base.SubzoneIDFromIndex(p1SubzoneIndex)) + p2ZoneKey := MakeZoneKey(config.ObjectID(t1ID), base.SubzoneIDFromIndex(p2SubzoneIndex)) ranges := []tc{ { @@ -573,7 +574,7 @@ func TestZoneChecker(t *testing.T) { newZone := !sameZone require.Equal(t, tc.newZone, newZone, "failed at: %d (%s)", i, tc.split) if newZone { - objectID, _ := config.DecodeKeyIntoZoneIDAndSuffix(rngs[i].StartKey) + objectID, _ := config.DecodeKeyIntoZoneIDAndSuffix(keys.SystemSQLCodec, rngs[i].StartKey) zc.setZone(objectID, tc.newZoneKey, tc.newRootZoneCfg) } } diff --git a/pkg/kv/kvserver/reports/zone_key.go b/pkg/kv/kvserver/reports/zone_key.go index ebf5e5865e8b..0fed8e117946 100644 --- a/pkg/kv/kvserver/reports/zone_key.go +++ b/pkg/kv/kvserver/reports/zone_key.go @@ -20,7 +20,7 @@ import ( // ZoneKey is the index of the first level in the constraint conformance report. type ZoneKey struct { // ZoneID is the id of the zone this key is referencing. - ZoneID config.SystemTenantObjectID + ZoneID config.ObjectID // SubzoneID identifies what subzone, if any, this key is referencing. The // zero value (also named NoSubzone) indicates that the key is referring to a // zone, not a subzone. @@ -35,7 +35,7 @@ const NoSubzone base.SubzoneID = 0 // // Use NoSubzone for subzoneID to indicate that the key references a zone, not a // subzone. -func MakeZoneKey(zoneID config.SystemTenantObjectID, subzoneID base.SubzoneID) ZoneKey { +func MakeZoneKey(zoneID config.ObjectID, subzoneID base.SubzoneID) ZoneKey { return ZoneKey{ ZoneID: zoneID, SubzoneID: subzoneID, diff --git a/pkg/migration/migrations/public_schema_migration_external_test.go b/pkg/migration/migrations/public_schema_migration_external_test.go index 8757d90bbc59..0c3fd8ea6621 100644 --- a/pkg/migration/migrations/public_schema_migration_external_test.go +++ b/pkg/migration/migrations/public_schema_migration_external_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/server" @@ -50,6 +51,7 @@ func publicSchemaMigrationTest(t *testing.T, ctx context.Context, numTables int) DisableAutomaticVersionUpgrade: make(chan struct{}), BinaryVersionOverride: clusterversion.ByKey(clusterversion.PublicSchemasWithDescriptors - 1), }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), }, }, }) diff --git a/pkg/server/systemconfigwatcher/BUILD.bazel b/pkg/server/systemconfigwatcher/BUILD.bazel index ce07402f9f16..7e427c7bf4ab 100644 --- a/pkg/server/systemconfigwatcher/BUILD.bazel +++ b/pkg/server/systemconfigwatcher/BUILD.bazel @@ -25,14 +25,24 @@ go_test( "cache_test.go", "main_test.go", ], - embed = [":systemconfigwatcher"], deps = [ + ":systemconfigwatcher", + "//pkg/base", + "//pkg/config", + "//pkg/config/zonepb", + "//pkg/keys", + "//pkg/kv/kvclient/rangefeed", + "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", "//pkg/server/systemconfigwatcher/systemconfigwatchertest", "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/syncutil", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/server/systemconfigwatcher/cache.go b/pkg/server/systemconfigwatcher/cache.go index 678ff9b351c9..5990970c2fac 100644 --- a/pkg/server/systemconfigwatcher/cache.go +++ b/pkg/server/systemconfigwatcher/cache.go @@ -30,21 +30,53 @@ import ( // cache provides a consistent snapshot when available, but the snapshot // may be stale. type Cache struct { - w *rangefeedcache.Watcher - defaultZoneConfig *zonepb.ZoneConfig - mu struct { + w *rangefeedcache.Watcher + defaultZoneConfig *zonepb.ZoneConfig + additionalKVsSource config.SystemConfigProvider + mu struct { syncutil.RWMutex cfg *config.SystemConfig timestamp hlc.Timestamp - registered map[chan<- struct{}]struct{} + registry notificationRegistry + + // additionalKVs provides a mechanism for the creator of the + // cache to provide additional values. + // + // This is used to support injecting some key-value pairs from the + // system tenant into the system config. + additionalKVs []roachpb.KeyValue } } // New constructs a new Cache. func New( codec keys.SQLCodec, clock *hlc.Clock, f *rangefeed.Factory, defaultZoneConfig *zonepb.ZoneConfig, +) *Cache { + return NewWithAdditionalProvider( + codec, clock, f, defaultZoneConfig, nil, /* additionalProvider */ + ) +} + +// NewWithAdditionalProvider constructs a new Cache with the addition of +// another provider of a SystemConfig. This additional provider is used only +// for the KVs in its system config. The key-value pairs it provides should +// not overlap with those of this provider, if they do, the latest values +// will be preferred. +// +// This functionality exists to provide access to the system tenant's view +// of its zone configuration for RANGE DEFAULT and RANGE TENANTS. This is +// needed primarily in the mixed-version state before the tenant is in control +// of its own zone configurations. +// +// TODO(ajwerner): Remove this functionality once it's no longer needed in 22.2. +func NewWithAdditionalProvider( + codec keys.SQLCodec, + clock *hlc.Clock, + f *rangefeed.Factory, + defaultZoneConfig *zonepb.ZoneConfig, + additional config.SystemConfigProvider, ) *Cache { // TODO(ajwerner): Deal with what happens if the system config has more than this // many rows. @@ -53,7 +85,8 @@ func New( c := Cache{ defaultZoneConfig: defaultZoneConfig, } - c.mu.registered = make(map[chan<- struct{}]struct{}) + c.mu.registry = notificationRegistry{} + c.additionalKVsSource = additional // TODO(ajwerner): Consider stripping this down to just watching // descriptor and zones. @@ -74,7 +107,40 @@ func New( // Start starts the cache. func (c *Cache) Start(ctx context.Context, stopper *stop.Stopper) error { - return rangefeedcache.Start(ctx, stopper, c.w, nil /* onError */) + if err := rangefeedcache.Start(ctx, stopper, c.w, nil /* onError */); err != nil { + return err + } + if c.additionalKVsSource != nil { + setAdditionalKeys := func() { + if cfg := c.additionalKVsSource.GetSystemConfig(); cfg != nil { + c.setAdditionalKeys(cfg.Values) + } + } + ch, unregister := c.additionalKVsSource.RegisterSystemConfigChannel() + // Check if there are any additional keys to set before returning from + // start. This is mostly to make tests deterministic. + select { + case <-ch: + setAdditionalKeys() + default: + } + if err := stopper.RunAsyncTask(ctx, "systemconfigwatcher-additional", func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-stopper.ShouldQuiesce(): + return + case <-ch: + setAdditionalKeys() + } + } + }); err != nil { + unregister() + return err + } + } + return nil } // GetSystemConfig is part of the config.SystemConfigProvider interface. @@ -90,12 +156,15 @@ func (c *Cache) RegisterSystemConfigChannel() (_ <-chan struct{}, unregister fun ch := make(chan struct{}, 1) c.mu.Lock() defer c.mu.Unlock() + if c.mu.cfg != nil { + ch <- struct{}{} + } - c.mu.registered[ch] = struct{}{} + c.mu.registry[ch] = struct{}{} return ch, func() { c.mu.Lock() defer c.mu.Unlock() - delete(c.mu.registered, ch) + delete(c.mu.registry, ch) } } @@ -108,6 +177,46 @@ func (c *Cache) LastUpdated() hlc.Timestamp { return c.mu.timestamp } +func (c *Cache) setAdditionalKeys(kvs []roachpb.KeyValue) { + c.mu.Lock() + defer c.mu.Unlock() + + sort.Sort(keyValues(kvs)) + if c.mu.cfg == nil { + c.mu.additionalKVs = kvs + return + } + + cloned := append([]roachpb.KeyValue(nil), c.mu.cfg.Values...) + trimmed := append(trimOldKVs(cloned, c.mu.additionalKVs), kvs...) + sort.Sort(keyValues(trimmed)) + c.mu.cfg = config.NewSystemConfig(c.defaultZoneConfig) + c.mu.cfg.Values = trimmed + c.mu.additionalKVs = kvs + c.mu.registry.notify() +} + +// trimOldKVs removes KVs from cloned where for all keys in prev. +// This function assumes that both cloned and prev are sorted. +func trimOldKVs(cloned, prev []roachpb.KeyValue) []roachpb.KeyValue { + trimmed := cloned[:0] + shouldSkip := func(clonedOrd int) (shouldSkip bool) { + for len(prev) > 0 { + if cmp := prev[0].Key.Compare(cloned[clonedOrd].Key); cmp >= 0 { + return cmp == 0 + } + prev = prev[1:] + } + return false + } + for i := range cloned { + if !shouldSkip(i) { + trimmed = append(trimmed, cloned[i]) + } + } + return trimmed +} + type keyValues []roachpb.KeyValue func (k keyValues) Len() int { return len(k) } @@ -119,21 +228,22 @@ var _ sort.Interface = (keyValues)(nil) func (c *Cache) handleUpdate(_ context.Context, update rangefeedcache.Update) { updateKVs := rangefeedbuffer.EventsToKVs(update.Events, rangefeedbuffer.RangeFeedValueEventToKV) + c.mu.Lock() + defer c.mu.Unlock() var updatedData []roachpb.KeyValue switch update.Type { case rangefeedcache.CompleteUpdate: - sort.Sort(keyValues(updateKVs)) - updatedData = updateKVs + updatedData = rangefeedbuffer.MergeKVs(c.mu.additionalKVs, updateKVs) case rangefeedcache.IncrementalUpdate: // Note that handleUpdate is called synchronously, so we can use the // old snapshot as the basis for the new snapshot without any risk of // missing anything. - prev := c.GetSystemConfig() + prev := c.mu.cfg // If there is nothing interesting, just update the timestamp and // return without notifying anybody. if len(updateKVs) == 0 { - c.setUpdatedConfig(prev, update.Timestamp) + c.setUpdatedConfigLocked(prev, update.Timestamp) return } updatedData = rangefeedbuffer.MergeKVs(prev.Values, updateKVs) @@ -141,23 +251,15 @@ func (c *Cache) handleUpdate(_ context.Context, update rangefeedcache.Update) { updatedCfg := config.NewSystemConfig(c.defaultZoneConfig) updatedCfg.Values = updatedData - c.setUpdatedConfig(updatedCfg, update.Timestamp) + c.setUpdatedConfigLocked(updatedCfg, update.Timestamp) } -func (c *Cache) setUpdatedConfig(updated *config.SystemConfig, ts hlc.Timestamp) { - c.mu.Lock() - defer c.mu.Unlock() +func (c *Cache) setUpdatedConfigLocked(updated *config.SystemConfig, ts hlc.Timestamp) { changed := c.mu.cfg != updated c.mu.cfg = updated c.mu.timestamp = ts - if !changed { - return - } - for ch := range c.mu.registered { - select { - case ch <- struct{}{}: - default: - } + if changed { + c.mu.registry.notify() } } @@ -168,3 +270,14 @@ func passThroughTranslation( } var _ config.SystemConfigProvider = (*Cache)(nil) + +type notificationRegistry map[chan<- struct{}]struct{} + +func (nr notificationRegistry) notify() { + for ch := range nr { + select { + case ch <- struct{}{}: + default: + } + } +} diff --git a/pkg/server/systemconfigwatcher/cache_test.go b/pkg/server/systemconfigwatcher/cache_test.go index 3b985aab2be6..d7c5181dccc0 100644 --- a/pkg/server/systemconfigwatcher/cache_test.go +++ b/pkg/server/systemconfigwatcher/cache_test.go @@ -8,16 +8,128 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package systemconfigwatcher +package systemconfigwatcher_test import ( + "context" "testing" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher/systemconfigwatchertest" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/stretchr/testify/require" ) func TestCache(t *testing.T) { defer leaktest.AfterTest(t)() systemconfigwatchertest.TestSystemConfigWatcher(t, true /* skipSecondary */) } + +func TestNewWithAdditionalProvider(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`) + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) + fakeTenant := roachpb.MakeTenantID(10) + codec := keys.MakeSQLCodec(fakeTenant) + fp := &fakeProvider{ + ch: make(chan struct{}, 1), + } + fp.setSystemConfig(config.NewSystemConfig(zonepb.DefaultZoneConfigRef())) + cache := systemconfigwatcher.NewWithAdditionalProvider( + codec, s.Clock(), s.RangeFeedFactory().(*rangefeed.Factory), + zonepb.DefaultZoneConfigRef(), fp, + ) + mkKV := func(key, value string) roachpb.KeyValue { + return roachpb.KeyValue{ + Key: []byte(key), + Value: func() (v roachpb.Value) { v.SetString(value); return v }(), + } + } + setAdditional := func(kvs ...roachpb.KeyValue) { + additional := config.SystemConfig{} + additional.Values = kvs + fp.setSystemConfig(&additional) + } + kvA := mkKV("a", "value") + setAdditional(kvA) + fp.ch <- struct{}{} + + ch, _ := cache.RegisterSystemConfigChannel() + require.NoError(t, cache.Start(ctx, s.Stopper())) + getValues := func() []roachpb.KeyValue { + return cache.GetSystemConfig().SystemConfigEntries.Values + } + + <-ch // we'll get notified upon initial scan, which should be empty + require.Equal(t, []roachpb.KeyValue{kvA}, getValues()) + + // Update the kv-pair and make sure it propagates + kvB := mkKV("b", "value") + setAdditional(kvB) + fp.ch <- struct{}{} + <-ch + require.Equal(t, []roachpb.KeyValue{kvB}, getValues()) + + mkTenantKey := func(key string) roachpb.Key { + return append(codec.TablePrefix(keys.ZonesTableID), key...) + } + // Write a value and make sure that it shows up. + tenantA := mkTenantKey("a") + require.NoError(t, kvDB.Put(ctx, tenantA, "value")) + <-ch + require.Len(t, getValues(), 2) + require.Equal(t, kvB, getValues()[0]) + require.Equal(t, tenantA, getValues()[1].Key) + + // Update the additional value. + kvC := mkKV("c", "value") + setAdditional(kvA, kvC) + fp.ch <- struct{}{} + <-ch + require.Len(t, getValues(), 3) + require.Equal(t, kvA, getValues()[0]) + require.Equal(t, kvC, getValues()[1]) + require.Equal(t, tenantA, getValues()[2].Key) +} + +type fakeProvider struct { + ch chan struct{} + mu struct { + syncutil.Mutex + cfg *config.SystemConfig + } +} + +func (f *fakeProvider) GetSystemConfig() *config.SystemConfig { + f.mu.Lock() + defer f.mu.Unlock() + return f.mu.cfg +} + +func (f *fakeProvider) setSystemConfig(cfg *config.SystemConfig) { + f.mu.Lock() + defer f.mu.Unlock() + f.mu.cfg = cfg +} + +func (f *fakeProvider) RegisterSystemConfigChannel() (_ <-chan struct{}, unregister func()) { + return f.ch, func() {} +} + +var _ config.SystemConfigProvider = (*fakeProvider)(nil) diff --git a/pkg/server/systemconfigwatcher/systemconfigwatchertest/BUILD.bazel b/pkg/server/systemconfigwatcher/systemconfigwatchertest/BUILD.bazel index d0f3dd4e3c68..14314352411a 100644 --- a/pkg/server/systemconfigwatcher/systemconfigwatchertest/BUILD.bazel +++ b/pkg/server/systemconfigwatcher/systemconfigwatchertest/BUILD.bazel @@ -7,7 +7,10 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/config", "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvclient/kvtenant", "//pkg/roachpb", "//pkg/sql", "//pkg/testutils", diff --git a/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go b/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go index 93c721664aa3..2d90d7eb988e 100644 --- a/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go +++ b/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go @@ -15,10 +15,14 @@ package systemconfigwatchertest import ( "context" gosql "database/sql" + "sort" "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -40,7 +44,7 @@ func TestSystemConfigWatcher(t *testing.T, skipSecondary bool) { defer log.Scope(t).Close(t) ctx := context.Background() - s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) tdb := sqlutils.MakeSQLRunner(sqlDB) // Shorten the closed timestamp duration as a cheeky way to check the @@ -49,19 +53,32 @@ func TestSystemConfigWatcher(t *testing.T, skipSecondary bool) { tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10 ms'") t.Run("system", func(t *testing.T) { - runTest(t, s, sqlDB) + runTest(t, s, sqlDB, nil) }) if !skipSecondary { t.Run("secondary", func(t *testing.T) { tenant, tenantDB := serverutils.StartTenant(t, s, base.TestTenantArgs{ TenantID: serverutils.TestTenantID(), }) - runTest(t, tenant, tenantDB) + // We expect the secondary tenant to see the host tenant's view of a few + // keys. We need to plumb that expectation into the test. + runTest(t, tenant, tenantDB, func(t *testing.T) []roachpb.KeyValue { + return kvtenant.GossipSubscriptionSystemConfigMask.Apply( + config.SystemConfigEntries{ + Values: getSystemConfig(ctx, t, keys.SystemSQLCodec, kvDB), + }, + ).Values + }) }) } } -func runTest(t *testing.T, s serverutils.TestTenantInterface, sqlDB *gosql.DB) { +func runTest( + t *testing.T, + s serverutils.TestTenantInterface, + sqlDB *gosql.DB, + extraRows func(t *testing.T) []roachpb.KeyValue, +) { ctx := context.Background() tdb := sqlutils.MakeSQLRunner(sqlDB) execCfg := s.ExecutorConfig().(sql.ExecutorConfig) @@ -74,23 +91,16 @@ func runTest(t *testing.T, s serverutils.TestTenantInterface, sqlDB *gosql.DB) { default: } } - getSystemConfig := func(t *testing.T) []roachpb.KeyValue { - var ba roachpb.BatchRequest - ba.Add(roachpb.NewScan( - append(execCfg.Codec.TenantPrefix(), keys.SystemConfigSpan.Key...), - append(execCfg.Codec.TenantPrefix(), keys.SystemConfigSpan.EndKey...), - false, // forUpdate - )) - br, pErr := kvDB.NonTransactionalSender().Send(ctx, ba) - require.NoError(t, pErr.GoError()) - return br.Responses[0].GetScan().Rows - } checkEqual := func(t *testing.T) error { rs := r.GetSystemConfig() if rs == nil { return errors.New("nil config") } - sc := getSystemConfig(t) + sc := getSystemConfig(ctx, t, execCfg.Codec, kvDB) + if extraRows != nil { + sc = append(sc, extraRows(t)...) + sort.Sort(roachpb.KeyValueByKey(sc)) + } if !assert.Equal(noopT{}, sc, rs.Values) { return errors.Errorf("mismatch: %v", pretty.Diff(sc, rs.Values)) } @@ -108,6 +118,20 @@ func runTest(t *testing.T, s serverutils.TestTenantInterface, sqlDB *gosql.DB) { waitForEqual(t) } +func getSystemConfig( + ctx context.Context, t *testing.T, codec keys.SQLCodec, kvDB *kv.DB, +) []roachpb.KeyValue { + var ba roachpb.BatchRequest + ba.Add(roachpb.NewScan( + append(codec.TenantPrefix(), keys.SystemConfigSpan.Key...), + append(codec.TenantPrefix(), keys.SystemConfigSpan.EndKey...), + false, // forUpdate + )) + br, pErr := kvDB.NonTransactionalSender().Send(ctx, ba) + require.NoError(t, pErr.GoError()) + return br.Responses[0].GetScan().Rows +} + type noopT struct{} func (noopT) Errorf(string, ...interface{}) {} diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 582fcb136aef..00109bd589be 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -472,8 +472,9 @@ func makeTenantSQLServerArgs( return sqlServerArgs{}, err } - systemConfigWatcher := systemconfigwatcher.New( + systemConfigWatcher := systemconfigwatcher.NewWithAdditionalProvider( keys.MakeSQLCodec(sqlCfg.TenantID), clock, rangeFeedFactory, &baseCfg.DefaultZoneConfig, + tenantConnect, ) circularInternalExecutor := &sql.InternalExecutor{} diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index eabdeb4d3de5..c42630571c57 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -15,6 +15,7 @@ import ( "math" "time" + "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -100,8 +101,8 @@ func updateStatusForGCElements( if err != nil { return err } - - zoneCfg, err := cfg.GetZoneConfigForObject(execCfg.Codec, uint32(tableID)) + v := execCfg.Settings.Version.ActiveVersionOrEmpty(ctx) + zoneCfg, err := cfg.GetZoneConfigForObject(execCfg.Codec, v, config.ObjectID(tableID)) if err != nil { log.Errorf(ctx, "zone config for desc: %d, err = %+v", tableID, err) return nil @@ -454,7 +455,8 @@ func refreshTenant( tenID := details.Tenant.ID cfg := execCfg.SystemConfig.GetSystemConfig() tenantTTLSeconds := execCfg.DefaultZoneConfig.GC.TTLSeconds - zoneCfg, err := cfg.GetZoneConfigForObject(keys.MakeSQLCodec(roachpb.MakeTenantID(tenID)), 0) + v := execCfg.Settings.Version.ActiveVersionOrEmpty(ctx) + zoneCfg, err := cfg.GetZoneConfigForObject(keys.SystemSQLCodec, v, keys.TenantsRangesID) if err == nil { tenantTTLSeconds = zoneCfg.GC.TTLSeconds } else { diff --git a/pkg/sql/importer/exportcsv_test.go b/pkg/sql/importer/exportcsv_test.go index 568e60cd8fda..b433bd5c51bf 100644 --- a/pkg/sql/importer/exportcsv_test.go +++ b/pkg/sql/importer/exportcsv_test.go @@ -70,7 +70,7 @@ func setupExportableBank(t *testing.T, nodes, rows int) (*sqlutils.SQLRunner, st if err != nil { t.Fatal(err) } - last := config.SystemTenantObjectID(v.ValueInt()) + last := config.ObjectID(v.ValueInt()) zoneConfig := zonepb.DefaultZoneConfig() zoneConfig.RangeMaxBytes = proto.Int64(5000) config.TestingSetZoneConfig(last+1, zoneConfig) diff --git a/pkg/sql/opt_catalog.go b/pkg/sql/opt_catalog.go index 2443c3dac340..dd4d28832719 100644 --- a/pkg/sql/opt_catalog.go +++ b/pkg/sql/opt_catalog.go @@ -15,6 +15,7 @@ import ( "math" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/keys" @@ -467,7 +468,9 @@ func (oc *optCatalog) getZoneConfig(desc catalog.TableDescriptor) (cat.Zone, err if oc.cfg == nil || desc.IsVirtualTable() { return emptyZoneConfig, nil } - zone, err := oc.cfg.GetZoneConfigForObject(oc.codec(), uint32(desc.GetID())) + zone, err := oc.cfg.GetZoneConfigForObject( + oc.codec(), oc.version(), config.ObjectID(desc.GetID()), + ) if err != nil { return nil, err } @@ -482,6 +485,12 @@ func (oc *optCatalog) codec() keys.SQLCodec { return oc.planner.ExecCfg().Codec } +func (oc *optCatalog) version() clusterversion.ClusterVersion { + return oc.planner.ExecCfg().Settings.Version.ActiveVersionOrEmpty( + oc.planner.EvalContext().Context, + ) +} + // optView is a wrapper around catalog.TableDescriptor that implements // the cat.Object, cat.DataSource, and cat.View interfaces. type optView struct { diff --git a/pkg/sql/zone_config.go b/pkg/sql/zone_config.go index 16d75373443d..ede3fe31c893 100644 --- a/pkg/sql/zone_config.go +++ b/pkg/sql/zone_config.go @@ -185,20 +185,20 @@ func completeZoneConfig( // an object ID. It does not make any external KV calls to look up additional // state. func zoneConfigHook( - cfg *config.SystemConfig, id config.SystemTenantObjectID, + cfg *config.SystemConfig, codec keys.SQLCodec, id config.ObjectID, ) (*zonepb.ZoneConfig, *zonepb.ZoneConfig, bool, error) { getKey := func(key roachpb.Key) (*roachpb.Value, error) { return cfg.GetValue(key), nil } const mayBeTable = true zoneID, zone, _, placeholder, err := getZoneConfig( - keys.SystemSQLCodec, descpb.ID(id), getKey, false /* getInheritedDefault */, mayBeTable) + codec, descpb.ID(id), getKey, false /* getInheritedDefault */, mayBeTable) if errors.Is(err, errNoZoneConfigApplies) { return nil, nil, true, nil } else if err != nil { return nil, nil, false, err } - if err = completeZoneConfig(zone, keys.SystemSQLCodec, zoneID, getKey); err != nil { + if err = completeZoneConfig(zone, codec, zoneID, getKey); err != nil { return nil, nil, false, err } return zone, placeholder, true, nil diff --git a/pkg/sql/zone_config_test.go b/pkg/sql/zone_config_test.go index 8129a3a4519e..183e800911bb 100644 --- a/pkg/sql/zone_config_test.go +++ b/pkg/sql/zone_config_test.go @@ -125,7 +125,7 @@ func TestGetZoneConfig(t *testing.T) { // Verify SystemConfig.GetZoneConfigForKey. { key := append(roachpb.RKey(keys.SystemSQLCodec.TablePrefix(tc.objectID)), tc.keySuffix...) - _, zoneCfg, err := cfg.GetZoneConfigForKey(key) // Complete ZoneConfig + _, zoneCfg, err := config.TestingGetSystemTenantZoneConfigForKey(cfg, key) // Complete ZoneConfig if err != nil { t.Fatalf("#%d: err=%s", tcNum, err) } @@ -361,7 +361,7 @@ func TestCascadingZoneConfig(t *testing.T) { // Verify SystemConfig.GetZoneConfigForKey. { key := append(roachpb.RKey(keys.SystemSQLCodec.TablePrefix(tc.objectID)), tc.keySuffix...) - _, zoneCfg, err := cfg.GetZoneConfigForKey(key) // Complete ZoneConfig + _, zoneCfg, err := config.TestingGetSystemTenantZoneConfigForKey(cfg, key) // Complete ZoneConfig if err != nil { t.Fatalf("#%d: err=%s", tcNum, err) } @@ -660,7 +660,7 @@ func BenchmarkGetZoneConfig(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { key := roachpb.RKey(keys.SystemSQLCodec.TablePrefix(bootstrap.TestingUserDescID(0))) - _, _, err := cfg.GetZoneConfigForKey(key) + _, _, err := config.TestingGetSystemTenantZoneConfigForKey(cfg, key) if err != nil { b.Fatal(err) }