From f7fb96eea70a90accfd40d5c2847aea7adef202d Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 14 Nov 2022 17:35:30 -0500 Subject: [PATCH] sqlliveness: embed current region into session ID Follow up on #90408 and #91019. Previously, the sqlliveness subsystem was using enum.One when constructing the session ID since it doesn't have the regional data yet. This commit implements that missing part of it by ensuring that the regional representation is plumbed all the way to the sqlliveness subsystem whenever the SQL pod is started, and use it to construct the session ID. This will enable our REGIONAL BY ROW work to put the data in the right region. The multi-region enum for the system database will be read during startup for that to work. Since doing this already requires loading the system DB's descriptor (which indirectly tests whether the system DB has been bootstrapped) for the tenant, we can remove the call to checkTenantExists. Epic: None Release note: None --- pkg/ccl/multiregionccl/BUILD.bazel | 3 + .../multiregion_system_table_test.go | 108 +++++++++++- pkg/ccl/multiregionccl/region_util_test.go | 108 ++++++++++++ pkg/ccl/serverccl/server_sql_test.go | 3 +- pkg/server/server_sql.go | 42 ++--- pkg/sql/region_util.go | 161 ++++++++++++++---- pkg/sql/sqlliveness/slinstance/BUILD.bazel | 1 + pkg/sql/sqlliveness/slinstance/slinstance.go | 13 +- .../sqlliveness/slinstance/slinstance_test.go | 41 ++++- pkg/sql/sqlliveness/slprovider/slprovider.go | 4 +- pkg/sql/sqlliveness/sqlliveness.go | 10 +- 11 files changed, 407 insertions(+), 87 deletions(-) create mode 100644 pkg/ccl/multiregionccl/region_util_test.go diff --git a/pkg/ccl/multiregionccl/BUILD.bazel b/pkg/ccl/multiregionccl/BUILD.bazel index a93ef2546d9a..f96f5a5b5ffe 100644 --- a/pkg/ccl/multiregionccl/BUILD.bazel +++ b/pkg/ccl/multiregionccl/BUILD.bazel @@ -31,6 +31,7 @@ go_test( "multiregion_system_table_test.go", "multiregion_test.go", "region_test.go", + "region_util_test.go", "regional_by_row_system_database_test.go", "regional_by_row_test.go", "roundtrips_test.go", @@ -68,11 +69,13 @@ go_test( "//pkg/sql/catalog/desctestutils", "//pkg/sql/catalog/tabledesc", "//pkg/sql/catalog/typedesc", + "//pkg/sql/enum", "//pkg/sql/execinfra", "//pkg/sql/parser", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/slstorage", "//pkg/sql/sqltestutils", "//pkg/sql/tests", "//pkg/sql/types", diff --git a/pkg/ccl/multiregionccl/multiregion_system_table_test.go b/pkg/ccl/multiregionccl/multiregion_system_table_test.go index 59c2949664da..037161d70872 100644 --- a/pkg/ccl/multiregionccl/multiregion_system_table_test.go +++ b/pkg/ccl/multiregionccl/multiregion_system_table_test.go @@ -11,6 +11,7 @@ package multiregionccl import ( "context" gosql "database/sql" + "fmt" "testing" "github.com/cockroachdb/apd/v3" @@ -26,9 +27,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" + "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -176,10 +180,10 @@ func TestMrSystemDatabase(t *testing.T) { t.Run("InUse", func(t *testing.T) { query := ` - SELECT id, addr, session_id, locality, crdb_region - FROM system.sql_instances - WHERE session_id IS NOT NULL - ` + SELECT id, addr, session_id, locality, crdb_region + FROM system.sql_instances + WHERE session_id IS NOT NULL + ` rows := tDB.Query(t, query) require.True(t, rows.Next()) for { @@ -207,10 +211,10 @@ func TestMrSystemDatabase(t *testing.T) { t.Run("Preallocated", func(t *testing.T) { query := ` - SELECT id, addr, session_id, locality, crdb_region - FROM system.sql_instances - WHERE session_id IS NULL - ` + SELECT id, addr, session_id, locality, crdb_region + FROM system.sql_instances + WHERE session_id IS NULL + ` rows := tDB.Query(t, query) require.True(t, rows.Next()) for { @@ -234,3 +238,91 @@ func TestMrSystemDatabase(t *testing.T) { }) }) } + +func TestTenantStartupWithMultiRegionEnum(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() + + tc, db, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( + t, 3 /*numServers*/, base.TestingKnobs{}, + ) + defer cleanup() + sqlDB := sqlutils.MakeSQLRunner(db) + + tenID := roachpb.MustMakeTenantID(10) + ten, tSQL := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{ + TenantID: tenID, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: "us-east1"}, + }, + }, + }) + defer tSQL.Close() + tenSQLDB := sqlutils.MakeSQLRunner(tSQL) + + // Set cluster setting override on the host, and wait for it to propagate. + sqlDB.Exec(t, fmt.Sprintf("ALTER TENANT $1 SET CLUSTER SETTING %s = 'true'", + sql.SecondaryTenantsMultiRegionAbstractionsEnabledSettingName), tenID.ToUint64()) + testutils.SucceedsSoon(t, func() error { + var currentVal string + tenSQLDB.QueryRow(t, fmt.Sprintf("SHOW CLUSTER SETTING %s", + sql.SecondaryTenantsMultiRegionAbstractionsEnabledSettingName)).Scan(¤tVal) + if currentVal != "true" { + return errors.New("waiting for cluster setting to be set to true") + } + return nil + }) + + // Update system database with regions. + tenSQLDB.Exec(t, `SET descriptor_validation = read_only`) + tenSQLDB.Exec(t, `ALTER DATABASE system SET PRIMARY REGION "us-east1"`) + tenSQLDB.Exec(t, `SET descriptor_validation = on`) + tenSQLDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east2"`) + tenSQLDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east3"`) + + ten2, tSQL2 := serverutils.StartTenant(t, tc.Server(2), base.TestTenantArgs{ + TenantID: tenID, + Existing: true, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: "us-east3"}, + }, + }, + }) + defer tSQL2.Close() + tenSQLDB2 := sqlutils.MakeSQLRunner(tSQL2) + + // The sqlliveness entry created by the first SQL server has enum.One as the + // region as the system database hasn't been updated when it first started. + var sessionID string + tenSQLDB2.QueryRow(t, `SELECT session_id FROM system.sql_instances WHERE id = $1`, + ten.SQLInstanceID()).Scan(&sessionID) + region, id, err := slstorage.UnsafeDecodeSessionID(sqlliveness.SessionID(sessionID)) + require.NoError(t, err) + require.NotNil(t, id) + require.Equal(t, enum.One, region) + + // Ensure that the sqlliveness entry created by the second SQL server has + // the right region and session UUID. + tenSQLDB2.QueryRow(t, `SELECT session_id FROM system.sql_instances WHERE id = $1`, + ten2.SQLInstanceID()).Scan(&sessionID) + region, id, err = slstorage.UnsafeDecodeSessionID(sqlliveness.SessionID(sessionID)) + require.NoError(t, err) + require.NotNil(t, id) + require.NotEqual(t, enum.One, region) + + rows := tenSQLDB2.Query(t, `SELECT crdb_region, session_uuid FROM system.sqlliveness`) + defer rows.Close() + livenessMap := map[string][]byte{} + for rows.Next() { + var region, sessionUUID string + require.NoError(t, rows.Scan(®ion, &sessionUUID)) + livenessMap[sessionUUID] = []byte(region) + } + require.NoError(t, rows.Err()) + r, ok := livenessMap[string(id)] + require.True(t, ok) + require.Equal(t, r, region) +} diff --git a/pkg/ccl/multiregionccl/region_util_test.go b/pkg/ccl/multiregionccl/region_util_test.go new file mode 100644 index 000000000000..ac1ac9780997 --- /dev/null +++ b/pkg/ccl/multiregionccl/region_util_test.go @@ -0,0 +1,108 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package multiregionccl_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestGetLocalityRegionEnumPhysicalRepresentation is in the ccl package since +// it utilizes adding regions to a database. +func TestGetLocalityRegionEnumPhysicalRepresentation(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( + t, 3 /* numServers */, base.TestingKnobs{}) + defer cleanup() + + tDB := sqlutils.MakeSQLRunner(sqlDB) + tDB.Exec(t, `CREATE DATABASE foo PRIMARY REGION "us-east1" REGIONS "us-east1", "us-east2", "us-east3"`) + + s0 := tc.ServerTyped(0) + ief := s0.InternalExecutorFactory().(descs.TxnManager) + dbID := sqlutils.QueryDatabaseID(t, sqlDB, "foo") + + t.Run("with locality that exists", func(t *testing.T) { + regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation( + ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{ + Tiers: []roachpb.Tier{{Key: "region", Value: "us-east2"}}, + }, + ) + require.NoError(t, err) + + enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID)) + require.NotEmpty(t, enumMembers) + require.Equal(t, enumMembers["us-east2"], regionEnum) + }) + + t.Run("with non-existent locality", func(t *testing.T) { + regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation( + ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{ + Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}}, + }, + ) + require.NoError(t, err) + + // Fallback to primary region if the locality is provided, but non-existent. + enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID)) + require.NotEmpty(t, enumMembers) + require.Equal(t, enumMembers["us-east1"], regionEnum) + }) + + t.Run("without locality", func(t *testing.T) { + regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation( + ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{}) + require.NoError(t, err) + + // Fallback to primary region is locality information is missing. + enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID)) + require.NotEmpty(t, enumMembers) + require.Equal(t, enumMembers["us-east1"], regionEnum) + }) +} + +func getEnumMembers( + t *testing.T, ctx context.Context, ts serverutils.TestServerInterface, dbID descpb.ID, +) map[string][]byte { + t.Helper() + enumMembers := make(map[string][]byte) + err := sql.TestingDescsTxn(ctx, ts, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { + _, dbDesc, err := descsCol.GetImmutableDatabaseByID(ctx, txn, dbID, + tree.DatabaseLookupFlags{Required: true}) + require.NoError(t, err) + regionEnumID, err := dbDesc.MultiRegionEnumID() + require.NoError(t, err) + regionEnumDesc, err := descsCol.GetImmutableTypeByID(ctx, txn, regionEnumID, + tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{Required: true}}) + require.NoError(t, err) + for ord := 0; ord < regionEnumDesc.NumEnumMembers(); ord++ { + enumMembers[regionEnumDesc.GetMemberLogicalRepresentation(ord)] = regionEnumDesc.GetMemberPhysicalRepresentation(ord) + } + return nil + }) + require.NoError(t, err) + return enumMembers +} diff --git a/pkg/ccl/serverccl/server_sql_test.go b/pkg/ccl/serverccl/server_sql_test.go index e6ef77f40383..5219f2ce1acd 100644 --- a/pkg/ccl/serverccl/server_sql_test.go +++ b/pkg/ccl/serverccl/server_sql_test.go @@ -198,8 +198,7 @@ func TestNonExistentTenant(t *testing.T) { DisableCreateTenant: true, SkipTenantCheck: true, }) - require.Error(t, err) - require.Equal(t, "system DB uninitialized, check if tenant is non existent", err.Error()) + require.EqualError(t, err, `database "[1]" does not exist`) } // TestTenantRowIDs confirms `unique_rowid()` works as expected in a diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 2d7f9f9e22ef..dadc6cd20b15 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -71,7 +71,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/consistencychecker" "github.com/cockroachdb/cockroach/pkg/sql/contention" @@ -1256,28 +1255,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { }, nil } -// Checks if tenant exists. This function does a very superficial check to see if the system db -// has been bootstrapped for the tenant. This is not a complete check and is only sufficient -// to be used in the dev environment. -func checkTenantExists(ctx context.Context, codec keys.SQLCodec, db *kv.DB) error { - if codec.ForSystemTenant() { - return errors.AssertionFailedf("asked to check for tenant but system codec specified") - } - - key := catalogkeys.MakeDatabaseNameKey(codec, systemschema.SystemDatabaseName) - result, err := db.Get(ctx, key) - if err != nil { - return err - } - if result.Value == nil || result.ValueInt() != keys.SystemDatabaseID { - return errors.New("system DB uninitialized, check if tenant is non existent") - } - // Tenant has been confirmed to be bootstrapped successfully - // as the system database, which is a part of the bootstrap data for - // a tenant keyspace, exists in the namespace table. - return nil -} - func (s *SQLServer) setInstanceID( ctx context.Context, instanceID base.SQLInstanceID, sessionID sqlliveness.SessionID, ) error { @@ -1297,7 +1274,6 @@ func (s *SQLServer) preStart( pgL net.Listener, orphanedLeasesTimeThresholdNanos int64, ) error { - // If necessary, start the tenant proxy first, to ensure all other // components can properly route to KV nodes. The Start method will block // until a connection is established to the cluster and its ID has been @@ -1306,17 +1282,19 @@ func (s *SQLServer) preStart( if err := s.tenantConnect.Start(ctx); err != nil { return err } - // Confirm tenant exists prior to initialization. This is a sanity - // check for the dev environment to ensure that a tenant has been - // successfully created before attempting to initialize a SQL - // server for it. - if err := checkTenantExists(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil { - return err - } + } + + // Load the multi-region enum by reading the system database's descriptor. + // This also serves as a simple check to see if a tenant exist (i.e. by + // checking whether the system db has been bootstrapped). + regionPhysicalRep, err := sql.GetLocalityRegionEnumPhysicalRepresentation( + ctx, s.internalExecutorFactory, s.execCfg.DB, keys.SystemDatabaseID, s.distSQLServer.Locality) + if err != nil && !errors.Is(err, sql.ErrNotMultiRegionDatabase) { + return err } // Start the sql liveness subsystem. We'll need it to get a session. - s.sqlLivenessProvider.Start(ctx) + s.sqlLivenessProvider.Start(ctx, regionPhysicalRep) _, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID() isTenant := !isMixedSQLAndKVNode diff --git a/pkg/sql/region_util.go b/pkg/sql/region_util.go index 7cc3248dd736..02c62c028337 100644 --- a/pkg/sql/region_util.go +++ b/pkg/sql/region_util.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -1410,9 +1411,9 @@ var SynthesizeRegionConfigOptionUseCache SynthesizeRegionConfigOption = func(o * o.useCache = true } -// errNotMultiRegionDatabase is returned from SynthesizeRegionConfig when the +// ErrNotMultiRegionDatabase is returned from SynthesizeRegionConfig when the // requested database is not a multi-region database. -var errNotMultiRegionDatabase = errors.New( +var ErrNotMultiRegionDatabase = errors.New( "database is not a multi-region database", ) @@ -1435,60 +1436,41 @@ func SynthesizeRegionConfig( opt(&o) } - regionConfig := multiregion.RegionConfig{} - _, dbDesc, err := descsCol.GetImmutableDatabaseByID(ctx, txn, dbID, tree.DatabaseLookupFlags{ - AvoidLeased: !o.useCache, - Required: true, - IncludeOffline: o.includeOffline, - }) - if err != nil { - return multiregion.RegionConfig{}, err - } - if !dbDesc.IsMultiRegion() { - return multiregion.RegionConfig{}, errNotMultiRegionDatabase - } - - regionEnumID, err := dbDesc.MultiRegionEnumID() - if err != nil { - return regionConfig, err - } - - regionEnum, err := descsCol.GetImmutableTypeByID( - ctx, - txn, - regionEnumID, - tree.ObjectLookupFlags{ - CommonLookupFlags: tree.CommonLookupFlags{ - AvoidLeased: !o.useCache, - IncludeOffline: o.includeOffline, - }, - }, + dbDesc, regionEnumDesc, err := getDBAndRegionEnumDescs( + ctx, txn, dbID, descsCol, o.useCache, o.includeOffline, ) if err != nil { return multiregion.RegionConfig{}, err } + regionConfig := multiregion.RegionConfig{} + var regionNames catpb.RegionNames if o.forValidation { - regionNames, err = regionEnum.RegionNamesForValidation() + regionNames, err = regionEnumDesc.RegionNamesForValidation() } else { - regionNames, err = regionEnum.RegionNames() + regionNames, err = regionEnumDesc.RegionNames() } if err != nil { return regionConfig, err } - zoneCfgExtensions, err := regionEnum.ZoneConfigExtensions() + zoneCfgExtensions, err := regionEnumDesc.ZoneConfigExtensions() + if err != nil { + return regionConfig, err + } + + transitioningRegionNames, err := regionEnumDesc.TransitioningRegionNames() if err != nil { return regionConfig, err } - transitioningRegionNames, err := regionEnum.TransitioningRegionNames() + superRegions, err := regionEnumDesc.SuperRegions() if err != nil { return regionConfig, err } - superRegions, err := regionEnum.SuperRegions() + regionEnumID, err := dbDesc.MultiRegionEnumID() if err != nil { return regionConfig, err } @@ -1512,6 +1494,115 @@ func SynthesizeRegionConfig( return regionConfig, nil } +// GetLocalityRegionEnumPhysicalRepresentation returns the physical +// representation of the given locality stored in the multi-region enum type +// associated with dbID. If the given locality isn't found, the physical +// representation of the primary region in dbID will be returned instead. +// This returns an ErrNotMultiRegionDatabase error if the database isn't +// multi-region. +func GetLocalityRegionEnumPhysicalRepresentation( + ctx context.Context, + internalExecutorFactory descs.TxnManager, + kvDB *kv.DB, + dbID descpb.ID, + locality roachpb.Locality, +) ([]byte, error) { + var enumReps map[catpb.RegionName][]byte + var primaryRegion catpb.RegionName + if err := internalExecutorFactory.DescsTxn(ctx, kvDB, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + ) error { + enumReps, primaryRegion = nil, "" // reset for retry + var err error + enumReps, primaryRegion, err = getRegionEnumRepresentations(ctx, txn, dbID, descsCol) + return err + }); err != nil { + return nil, err + } + + // The primary region will be used if no region was provided through the + // locality flag. + currentRegion, _ := locality.Find("region") + if enumValue, ok := enumReps[catpb.RegionName(currentRegion)]; ok { + return enumValue, nil + } + if enumValue, ok := enumReps[primaryRegion]; ok { + return enumValue, nil + } + // This shouldn't be the case since if a primary region is defined for the + // database, there should exist a corresponding enum member value. + return nil, errors.AssertionFailedf("primary region not found") +} + +// getRegionEnumRepresentations returns representations stored in the +// multi-region enum type associated with dbID, and the primary region of it. +// An ErrNotMultiRegionDatabase error will be returned if the database isn't +// multi-region. +func getRegionEnumRepresentations( + ctx context.Context, txn *kv.Txn, dbID descpb.ID, descsCol *descs.Collection, +) (enumReps map[catpb.RegionName][]byte, primaryRegion catpb.RegionName, err error) { + dbDesc, regionEnumDesc, err := getDBAndRegionEnumDescs( + ctx, txn, dbID, descsCol, false /* useCache */, false /* includeOffline */) + if err != nil { + return nil, "", err + } + + enumReps = make(map[catpb.RegionName][]byte) + for ord := 0; ord < regionEnumDesc.NumEnumMembers(); ord++ { + if regionEnumDesc.IsMemberReadOnly(ord) { + continue + } + enumReps[catpb.RegionName( + regionEnumDesc.GetMemberLogicalRepresentation(ord), + )] = regionEnumDesc.GetMemberPhysicalRepresentation(ord) + } + return enumReps, dbDesc.GetRegionConfig().PrimaryRegion, nil +} + +// getDBAndRegionEnumDescs returns descriptors for both the database and +// multi-region enum type. If the database isn't multi-region, an +// ErrNotMultiRegionDatabase error will be returned. +func getDBAndRegionEnumDescs( + ctx context.Context, + txn *kv.Txn, + dbID descpb.ID, + descsCol *descs.Collection, + useCache bool, + includeOffline bool, +) (dbDesc catalog.DatabaseDescriptor, regionEnumDesc catalog.TypeDescriptor, _ error) { + _, dbDesc, err := descsCol.GetImmutableDatabaseByID(ctx, txn, dbID, tree.DatabaseLookupFlags{ + AvoidLeased: !useCache, + Required: true, + IncludeOffline: includeOffline, + }) + if err != nil { + return nil, nil, err + } + if !dbDesc.IsMultiRegion() { + return nil, nil, ErrNotMultiRegionDatabase + } + regionEnumID, err := dbDesc.MultiRegionEnumID() + if err != nil { + return nil, nil, err + } + regionEnumDesc, err = descsCol.GetImmutableTypeByID( + ctx, + txn, + regionEnumID, + tree.ObjectLookupFlags{ + CommonLookupFlags: tree.CommonLookupFlags{ + AvoidLeased: !useCache, + Required: true, + IncludeOffline: includeOffline, + }, + }, + ) + if err != nil { + return nil, nil, err + } + return dbDesc, regionEnumDesc, nil +} + // blockDiscardOfZoneConfigForMultiRegionObject determines if discarding the // zone configuration of a multi-region table, index or partition should be // blocked. We only block the discard if the multi-region abstractions have diff --git a/pkg/sql/sqlliveness/slinstance/BUILD.bazel b/pkg/sql/sqlliveness/slinstance/BUILD.bazel index 87598d59c740..f8ac6aa11b97 100644 --- a/pkg/sql/sqlliveness/slinstance/BUILD.bazel +++ b/pkg/sql/sqlliveness/slinstance/BUILD.bazel @@ -34,6 +34,7 @@ go_test( ":slinstance", "//pkg/clusterversion", "//pkg/settings/cluster", + "//pkg/sql/enum", "//pkg/sql/sqlliveness", "//pkg/sql/sqlliveness/slstorage", "//pkg/util/hlc", diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index 9c0502c0ebe3..16be8bbae875 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -121,6 +121,7 @@ type Instance struct { // sessionEvents gets notified of some session state changes. sessionEvents SessionEventListener storage Writer + currentRegion []byte ttl func() time.Duration hb func() time.Duration testKnobs sqlliveness.TestingKnobs @@ -197,7 +198,11 @@ func (l *Instance) clearSessionLocked(ctx context.Context) (createNewSession boo // createSession tries until it can create a new session and returns an error // only if the heart beat loop should exit. func (l *Instance) createSession(ctx context.Context) (*session, error) { - id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + region := enum.One + if l.currentRegion != nil { + region = l.currentRegion + } + id, err := slstorage.MakeSessionID(region, uuid.MakeV4()) if err != nil { return nil, err } @@ -396,9 +401,9 @@ func NewSQLInstance( } // Start runs the hearbeat loop. -func (l *Instance) Start(ctx context.Context) { - l.mu.Lock() - defer l.mu.Unlock() +func (l *Instance) Start(ctx context.Context, regionPhysicalRep []byte) { + l.currentRegion = regionPhysicalRep + log.Infof(ctx, "starting SQL liveness instance") // Detach from ctx's cancelation. taskCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) diff --git a/pkg/sql/sqlliveness/slinstance/slinstance_test.go b/pkg/sql/sqlliveness/slinstance/slinstance_test.go index d7941f7f3d04..3dd1754b9f32 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance_test.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" @@ -45,11 +46,11 @@ func TestSQLInstance(t *testing.T) { fakeStorage := slstorage.NewFakeStorage() sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil, nil) - sqlInstance.Start(ctx) + sqlInstance.Start(ctx, nil) // Add one more instance to introduce concurrent access to storage. dummy := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil, nil) - dummy.Start(ctx) + dummy.Start(ctx, nil) s1, err := sqlInstance.Session(ctx) require.NoError(t, err) @@ -57,6 +58,11 @@ func TestSQLInstance(t *testing.T) { require.NoError(t, err) require.True(t, a) + region, id, err := slstorage.UnsafeDecodeSessionID(s1.ID()) + require.NoError(t, err) + require.Equal(t, enum.One, region) + require.NotNil(t, id) + s2, err := sqlInstance.Session(ctx) require.NoError(t, err) require.Equal(t, s1.ID(), s2.ID()) @@ -91,3 +97,34 @@ func TestSQLInstance(t *testing.T) { _, err = sqlInstance.Session(ctx) require.Error(t, err) } + +func TestSQLInstanceWithRegion(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, stopper := context.Background(), stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */) + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + true /* initializeVersion */) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) + + fakeStorage := slstorage.NewFakeStorage() + sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil, nil) + sqlInstance.Start(ctx, []byte{42}) + + s1, err := sqlInstance.Session(ctx) + require.NoError(t, err) + a, err := fakeStorage.IsAlive(ctx, s1.ID()) + require.NoError(t, err) + require.True(t, a) + + region, id, err := slstorage.UnsafeDecodeSessionID(s1.ID()) + require.NoError(t, err) + require.Equal(t, []byte{42}, region) + require.NotNil(t, id) +} diff --git a/pkg/sql/sqlliveness/slprovider/slprovider.go b/pkg/sql/sqlliveness/slprovider/slprovider.go index fca26ceae100..9e593bb780ec 100644 --- a/pkg/sql/sqlliveness/slprovider/slprovider.go +++ b/pkg/sql/sqlliveness/slprovider/slprovider.go @@ -55,9 +55,9 @@ type provider struct { var _ sqlliveness.Provider = &provider{} -func (p *provider) Start(ctx context.Context) { +func (p *provider) Start(ctx context.Context, regionPhysicalRep []byte) { p.Storage.Start(ctx) - p.Instance.Start(ctx) + p.Instance.Start(ctx, regionPhysicalRep) } func (p *provider) Metrics() metric.Struct { diff --git a/pkg/sql/sqlliveness/sqlliveness.go b/pkg/sql/sqlliveness/sqlliveness.go index 091b69d14710..6510277f5dc1 100644 --- a/pkg/sql/sqlliveness/sqlliveness.go +++ b/pkg/sql/sqlliveness/sqlliveness.go @@ -34,10 +34,16 @@ type SessionID string // Provider is a wrapper around the sqlliveness subsystem for external // consumption. type Provider interface { - Start(ctx context.Context) - Metrics() metric.Struct Liveness + // Start starts the sqlliveness subsystem. regionPhysicalRep should + // represent the physical representation of the current process region + // stored in the multi-region enum type associated with the system database. + Start(ctx context.Context, regionPhysicalRep []byte) + + // Metrics returns a metric.Struct which holds metrics for the provider. + Metrics() metric.Struct + // CachedReader returns a reader which only consults its local cache and // does not perform any RPCs in the IsAlive call. CachedReader() Reader