diff --git a/pkg/ccl/multiregionccl/BUILD.bazel b/pkg/ccl/multiregionccl/BUILD.bazel index 2555a583e4c4..76826b5dd955 100644 --- a/pkg/ccl/multiregionccl/BUILD.bazel +++ b/pkg/ccl/multiregionccl/BUILD.bazel @@ -31,6 +31,7 @@ go_test( "multiregion_system_table_test.go", "multiregion_test.go", "region_test.go", + "region_util_test.go", "regional_by_row_system_database_test.go", "regional_by_row_test.go", "roundtrips_test.go", @@ -67,11 +68,13 @@ go_test( "//pkg/sql/catalog/desctestutils", "//pkg/sql/catalog/tabledesc", "//pkg/sql/catalog/typedesc", + "//pkg/sql/enum", "//pkg/sql/execinfra", "//pkg/sql/parser", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/slstorage", "//pkg/sql/sqltestutils", "//pkg/sql/tests", "//pkg/sql/types", diff --git a/pkg/ccl/multiregionccl/multiregion_system_table_test.go b/pkg/ccl/multiregionccl/multiregion_system_table_test.go index 59c2949664da..4a7897589e60 100644 --- a/pkg/ccl/multiregionccl/multiregion_system_table_test.go +++ b/pkg/ccl/multiregionccl/multiregion_system_table_test.go @@ -26,8 +26,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" + "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -176,10 +178,10 @@ func TestMrSystemDatabase(t *testing.T) { t.Run("InUse", func(t *testing.T) { query := ` - SELECT id, addr, session_id, locality, crdb_region - FROM system.sql_instances - WHERE session_id IS NOT NULL - ` + SELECT id, addr, session_id, locality, crdb_region + FROM system.sql_instances + WHERE session_id IS NOT NULL + ` rows := tDB.Query(t, query) require.True(t, rows.Next()) for { @@ -207,10 +209,10 @@ func TestMrSystemDatabase(t *testing.T) { t.Run("Preallocated", func(t *testing.T) { query := ` - SELECT id, addr, session_id, locality, crdb_region - FROM system.sql_instances - WHERE session_id IS NULL - ` + SELECT id, addr, session_id, locality, crdb_region + FROM system.sql_instances + WHERE session_id IS NULL + ` rows := tDB.Query(t, query) require.True(t, rows.Next()) for { @@ -234,3 +236,82 @@ func TestMrSystemDatabase(t *testing.T) { }) }) } + +func TestTenantStartupWithMultiRegionEnum(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() + + // Enable settings required for configuring a tenant's system database as multi-region. + cs := cluster.MakeTestingClusterSettings() + sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(context.Background(), &cs.SV, true) + sql.SecondaryTenantZoneConfigsEnabled.Override(context.Background(), &cs.SV, true) + + tc, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( + t, 3 /*numServers*/, base.TestingKnobs{}, multiregionccltestutils.WithSettings(cs), + ) + defer cleanup() + + tenID := roachpb.MustMakeTenantID(10) + ten, tSQL := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{ + Settings: cs, + TenantID: tenID, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: "us-east1"}, + }, + }, + }) + defer tSQL.Close() + tenSQLDB := sqlutils.MakeSQLRunner(tSQL) + + // Update system database with regions. + tenSQLDB.Exec(t, `ALTER DATABASE system SET PRIMARY REGION "us-east1"`) + tenSQLDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east2"`) + tenSQLDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east3"`) + + ten2, tSQL2 := serverutils.StartTenant(t, tc.Server(2), base.TestTenantArgs{ + Settings: cs, + TenantID: tenID, + Existing: true, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: "us-east3"}, + }, + }, + }) + defer tSQL2.Close() + tenSQLDB2 := sqlutils.MakeSQLRunner(tSQL2) + + // The sqlliveness entry created by the first SQL server has enum.One as the + // region as the system database hasn't been updated when it first started. + var sessionID string + tenSQLDB2.QueryRow(t, `SELECT session_id FROM system.sql_instances WHERE id = $1`, + ten.SQLInstanceID()).Scan(&sessionID) + region, id, err := slstorage.UnsafeDecodeSessionID(sqlliveness.SessionID(sessionID)) + require.NoError(t, err) + require.NotNil(t, id) + require.Equal(t, enum.One, region) + + // Ensure that the sqlliveness entry created by the second SQL server has + // the right region and session UUID. + tenSQLDB2.QueryRow(t, `SELECT session_id FROM system.sql_instances WHERE id = $1`, + ten2.SQLInstanceID()).Scan(&sessionID) + region, id, err = slstorage.UnsafeDecodeSessionID(sqlliveness.SessionID(sessionID)) + require.NoError(t, err) + require.NotNil(t, id) + require.NotEqual(t, enum.One, region) + + rows := tenSQLDB2.Query(t, `SELECT crdb_region, session_uuid FROM system.sqlliveness`) + defer rows.Close() + livenessMap := map[string][]byte{} + for rows.Next() { + var region, sessionUUID string + require.NoError(t, rows.Scan(®ion, &sessionUUID)) + livenessMap[sessionUUID] = []byte(region) + } + require.NoError(t, rows.Err()) + r, ok := livenessMap[string(id)] + require.True(t, ok) + require.Equal(t, r, region) +} diff --git a/pkg/ccl/multiregionccl/region_util_test.go b/pkg/ccl/multiregionccl/region_util_test.go new file mode 100644 index 000000000000..ac1ac9780997 --- /dev/null +++ b/pkg/ccl/multiregionccl/region_util_test.go @@ -0,0 +1,108 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package multiregionccl_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestGetLocalityRegionEnumPhysicalRepresentation is in the ccl package since +// it utilizes adding regions to a database. +func TestGetLocalityRegionEnumPhysicalRepresentation(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( + t, 3 /* numServers */, base.TestingKnobs{}) + defer cleanup() + + tDB := sqlutils.MakeSQLRunner(sqlDB) + tDB.Exec(t, `CREATE DATABASE foo PRIMARY REGION "us-east1" REGIONS "us-east1", "us-east2", "us-east3"`) + + s0 := tc.ServerTyped(0) + ief := s0.InternalExecutorFactory().(descs.TxnManager) + dbID := sqlutils.QueryDatabaseID(t, sqlDB, "foo") + + t.Run("with locality that exists", func(t *testing.T) { + regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation( + ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{ + Tiers: []roachpb.Tier{{Key: "region", Value: "us-east2"}}, + }, + ) + require.NoError(t, err) + + enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID)) + require.NotEmpty(t, enumMembers) + require.Equal(t, enumMembers["us-east2"], regionEnum) + }) + + t.Run("with non-existent locality", func(t *testing.T) { + regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation( + ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{ + Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}}, + }, + ) + require.NoError(t, err) + + // Fallback to primary region if the locality is provided, but non-existent. + enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID)) + require.NotEmpty(t, enumMembers) + require.Equal(t, enumMembers["us-east1"], regionEnum) + }) + + t.Run("without locality", func(t *testing.T) { + regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation( + ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{}) + require.NoError(t, err) + + // Fallback to primary region is locality information is missing. + enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID)) + require.NotEmpty(t, enumMembers) + require.Equal(t, enumMembers["us-east1"], regionEnum) + }) +} + +func getEnumMembers( + t *testing.T, ctx context.Context, ts serverutils.TestServerInterface, dbID descpb.ID, +) map[string][]byte { + t.Helper() + enumMembers := make(map[string][]byte) + err := sql.TestingDescsTxn(ctx, ts, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { + _, dbDesc, err := descsCol.GetImmutableDatabaseByID(ctx, txn, dbID, + tree.DatabaseLookupFlags{Required: true}) + require.NoError(t, err) + regionEnumID, err := dbDesc.MultiRegionEnumID() + require.NoError(t, err) + regionEnumDesc, err := descsCol.GetImmutableTypeByID(ctx, txn, regionEnumID, + tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{Required: true}}) + require.NoError(t, err) + for ord := 0; ord < regionEnumDesc.NumEnumMembers(); ord++ { + enumMembers[regionEnumDesc.GetMemberLogicalRepresentation(ord)] = regionEnumDesc.GetMemberPhysicalRepresentation(ord) + } + return nil + }) + require.NoError(t, err) + return enumMembers +} diff --git a/pkg/ccl/serverccl/server_sql_test.go b/pkg/ccl/serverccl/server_sql_test.go index db515e253657..94899b095442 100644 --- a/pkg/ccl/serverccl/server_sql_test.go +++ b/pkg/ccl/serverccl/server_sql_test.go @@ -198,8 +198,7 @@ func TestNonExistentTenant(t *testing.T) { DisableCreateTenant: true, SkipTenantCheck: true, }) - require.Error(t, err) - require.Equal(t, "system DB uninitialized, check if tenant is non existent", err.Error()) + require.EqualError(t, err, `database "[1]" does not exist`) } // TestTenantRowIDs confirms `unique_rowid()` works as expected in a diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 2d7f9f9e22ef..dadc6cd20b15 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -71,7 +71,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/consistencychecker" "github.com/cockroachdb/cockroach/pkg/sql/contention" @@ -1256,28 +1255,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { }, nil } -// Checks if tenant exists. This function does a very superficial check to see if the system db -// has been bootstrapped for the tenant. This is not a complete check and is only sufficient -// to be used in the dev environment. -func checkTenantExists(ctx context.Context, codec keys.SQLCodec, db *kv.DB) error { - if codec.ForSystemTenant() { - return errors.AssertionFailedf("asked to check for tenant but system codec specified") - } - - key := catalogkeys.MakeDatabaseNameKey(codec, systemschema.SystemDatabaseName) - result, err := db.Get(ctx, key) - if err != nil { - return err - } - if result.Value == nil || result.ValueInt() != keys.SystemDatabaseID { - return errors.New("system DB uninitialized, check if tenant is non existent") - } - // Tenant has been confirmed to be bootstrapped successfully - // as the system database, which is a part of the bootstrap data for - // a tenant keyspace, exists in the namespace table. - return nil -} - func (s *SQLServer) setInstanceID( ctx context.Context, instanceID base.SQLInstanceID, sessionID sqlliveness.SessionID, ) error { @@ -1297,7 +1274,6 @@ func (s *SQLServer) preStart( pgL net.Listener, orphanedLeasesTimeThresholdNanos int64, ) error { - // If necessary, start the tenant proxy first, to ensure all other // components can properly route to KV nodes. The Start method will block // until a connection is established to the cluster and its ID has been @@ -1306,17 +1282,19 @@ func (s *SQLServer) preStart( if err := s.tenantConnect.Start(ctx); err != nil { return err } - // Confirm tenant exists prior to initialization. This is a sanity - // check for the dev environment to ensure that a tenant has been - // successfully created before attempting to initialize a SQL - // server for it. - if err := checkTenantExists(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil { - return err - } + } + + // Load the multi-region enum by reading the system database's descriptor. + // This also serves as a simple check to see if a tenant exist (i.e. by + // checking whether the system db has been bootstrapped). + regionPhysicalRep, err := sql.GetLocalityRegionEnumPhysicalRepresentation( + ctx, s.internalExecutorFactory, s.execCfg.DB, keys.SystemDatabaseID, s.distSQLServer.Locality) + if err != nil && !errors.Is(err, sql.ErrNotMultiRegionDatabase) { + return err } // Start the sql liveness subsystem. We'll need it to get a session. - s.sqlLivenessProvider.Start(ctx) + s.sqlLivenessProvider.Start(ctx, regionPhysicalRep) _, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID() isTenant := !isMixedSQLAndKVNode diff --git a/pkg/sql/region_util.go b/pkg/sql/region_util.go index 7cc3248dd736..02c62c028337 100644 --- a/pkg/sql/region_util.go +++ b/pkg/sql/region_util.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -1410,9 +1411,9 @@ var SynthesizeRegionConfigOptionUseCache SynthesizeRegionConfigOption = func(o * o.useCache = true } -// errNotMultiRegionDatabase is returned from SynthesizeRegionConfig when the +// ErrNotMultiRegionDatabase is returned from SynthesizeRegionConfig when the // requested database is not a multi-region database. -var errNotMultiRegionDatabase = errors.New( +var ErrNotMultiRegionDatabase = errors.New( "database is not a multi-region database", ) @@ -1435,60 +1436,41 @@ func SynthesizeRegionConfig( opt(&o) } - regionConfig := multiregion.RegionConfig{} - _, dbDesc, err := descsCol.GetImmutableDatabaseByID(ctx, txn, dbID, tree.DatabaseLookupFlags{ - AvoidLeased: !o.useCache, - Required: true, - IncludeOffline: o.includeOffline, - }) - if err != nil { - return multiregion.RegionConfig{}, err - } - if !dbDesc.IsMultiRegion() { - return multiregion.RegionConfig{}, errNotMultiRegionDatabase - } - - regionEnumID, err := dbDesc.MultiRegionEnumID() - if err != nil { - return regionConfig, err - } - - regionEnum, err := descsCol.GetImmutableTypeByID( - ctx, - txn, - regionEnumID, - tree.ObjectLookupFlags{ - CommonLookupFlags: tree.CommonLookupFlags{ - AvoidLeased: !o.useCache, - IncludeOffline: o.includeOffline, - }, - }, + dbDesc, regionEnumDesc, err := getDBAndRegionEnumDescs( + ctx, txn, dbID, descsCol, o.useCache, o.includeOffline, ) if err != nil { return multiregion.RegionConfig{}, err } + regionConfig := multiregion.RegionConfig{} + var regionNames catpb.RegionNames if o.forValidation { - regionNames, err = regionEnum.RegionNamesForValidation() + regionNames, err = regionEnumDesc.RegionNamesForValidation() } else { - regionNames, err = regionEnum.RegionNames() + regionNames, err = regionEnumDesc.RegionNames() } if err != nil { return regionConfig, err } - zoneCfgExtensions, err := regionEnum.ZoneConfigExtensions() + zoneCfgExtensions, err := regionEnumDesc.ZoneConfigExtensions() + if err != nil { + return regionConfig, err + } + + transitioningRegionNames, err := regionEnumDesc.TransitioningRegionNames() if err != nil { return regionConfig, err } - transitioningRegionNames, err := regionEnum.TransitioningRegionNames() + superRegions, err := regionEnumDesc.SuperRegions() if err != nil { return regionConfig, err } - superRegions, err := regionEnum.SuperRegions() + regionEnumID, err := dbDesc.MultiRegionEnumID() if err != nil { return regionConfig, err } @@ -1512,6 +1494,115 @@ func SynthesizeRegionConfig( return regionConfig, nil } +// GetLocalityRegionEnumPhysicalRepresentation returns the physical +// representation of the given locality stored in the multi-region enum type +// associated with dbID. If the given locality isn't found, the physical +// representation of the primary region in dbID will be returned instead. +// This returns an ErrNotMultiRegionDatabase error if the database isn't +// multi-region. +func GetLocalityRegionEnumPhysicalRepresentation( + ctx context.Context, + internalExecutorFactory descs.TxnManager, + kvDB *kv.DB, + dbID descpb.ID, + locality roachpb.Locality, +) ([]byte, error) { + var enumReps map[catpb.RegionName][]byte + var primaryRegion catpb.RegionName + if err := internalExecutorFactory.DescsTxn(ctx, kvDB, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + ) error { + enumReps, primaryRegion = nil, "" // reset for retry + var err error + enumReps, primaryRegion, err = getRegionEnumRepresentations(ctx, txn, dbID, descsCol) + return err + }); err != nil { + return nil, err + } + + // The primary region will be used if no region was provided through the + // locality flag. + currentRegion, _ := locality.Find("region") + if enumValue, ok := enumReps[catpb.RegionName(currentRegion)]; ok { + return enumValue, nil + } + if enumValue, ok := enumReps[primaryRegion]; ok { + return enumValue, nil + } + // This shouldn't be the case since if a primary region is defined for the + // database, there should exist a corresponding enum member value. + return nil, errors.AssertionFailedf("primary region not found") +} + +// getRegionEnumRepresentations returns representations stored in the +// multi-region enum type associated with dbID, and the primary region of it. +// An ErrNotMultiRegionDatabase error will be returned if the database isn't +// multi-region. +func getRegionEnumRepresentations( + ctx context.Context, txn *kv.Txn, dbID descpb.ID, descsCol *descs.Collection, +) (enumReps map[catpb.RegionName][]byte, primaryRegion catpb.RegionName, err error) { + dbDesc, regionEnumDesc, err := getDBAndRegionEnumDescs( + ctx, txn, dbID, descsCol, false /* useCache */, false /* includeOffline */) + if err != nil { + return nil, "", err + } + + enumReps = make(map[catpb.RegionName][]byte) + for ord := 0; ord < regionEnumDesc.NumEnumMembers(); ord++ { + if regionEnumDesc.IsMemberReadOnly(ord) { + continue + } + enumReps[catpb.RegionName( + regionEnumDesc.GetMemberLogicalRepresentation(ord), + )] = regionEnumDesc.GetMemberPhysicalRepresentation(ord) + } + return enumReps, dbDesc.GetRegionConfig().PrimaryRegion, nil +} + +// getDBAndRegionEnumDescs returns descriptors for both the database and +// multi-region enum type. If the database isn't multi-region, an +// ErrNotMultiRegionDatabase error will be returned. +func getDBAndRegionEnumDescs( + ctx context.Context, + txn *kv.Txn, + dbID descpb.ID, + descsCol *descs.Collection, + useCache bool, + includeOffline bool, +) (dbDesc catalog.DatabaseDescriptor, regionEnumDesc catalog.TypeDescriptor, _ error) { + _, dbDesc, err := descsCol.GetImmutableDatabaseByID(ctx, txn, dbID, tree.DatabaseLookupFlags{ + AvoidLeased: !useCache, + Required: true, + IncludeOffline: includeOffline, + }) + if err != nil { + return nil, nil, err + } + if !dbDesc.IsMultiRegion() { + return nil, nil, ErrNotMultiRegionDatabase + } + regionEnumID, err := dbDesc.MultiRegionEnumID() + if err != nil { + return nil, nil, err + } + regionEnumDesc, err = descsCol.GetImmutableTypeByID( + ctx, + txn, + regionEnumID, + tree.ObjectLookupFlags{ + CommonLookupFlags: tree.CommonLookupFlags{ + AvoidLeased: !useCache, + Required: true, + IncludeOffline: includeOffline, + }, + }, + ) + if err != nil { + return nil, nil, err + } + return dbDesc, regionEnumDesc, nil +} + // blockDiscardOfZoneConfigForMultiRegionObject determines if discarding the // zone configuration of a multi-region table, index or partition should be // blocked. We only block the discard if the multi-region abstractions have diff --git a/pkg/sql/sqlliveness/slinstance/BUILD.bazel b/pkg/sql/sqlliveness/slinstance/BUILD.bazel index 87598d59c740..f8ac6aa11b97 100644 --- a/pkg/sql/sqlliveness/slinstance/BUILD.bazel +++ b/pkg/sql/sqlliveness/slinstance/BUILD.bazel @@ -34,6 +34,7 @@ go_test( ":slinstance", "//pkg/clusterversion", "//pkg/settings/cluster", + "//pkg/sql/enum", "//pkg/sql/sqlliveness", "//pkg/sql/sqlliveness/slstorage", "//pkg/util/hlc", diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index 9c0502c0ebe3..16be8bbae875 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -121,6 +121,7 @@ type Instance struct { // sessionEvents gets notified of some session state changes. sessionEvents SessionEventListener storage Writer + currentRegion []byte ttl func() time.Duration hb func() time.Duration testKnobs sqlliveness.TestingKnobs @@ -197,7 +198,11 @@ func (l *Instance) clearSessionLocked(ctx context.Context) (createNewSession boo // createSession tries until it can create a new session and returns an error // only if the heart beat loop should exit. func (l *Instance) createSession(ctx context.Context) (*session, error) { - id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + region := enum.One + if l.currentRegion != nil { + region = l.currentRegion + } + id, err := slstorage.MakeSessionID(region, uuid.MakeV4()) if err != nil { return nil, err } @@ -396,9 +401,9 @@ func NewSQLInstance( } // Start runs the hearbeat loop. -func (l *Instance) Start(ctx context.Context) { - l.mu.Lock() - defer l.mu.Unlock() +func (l *Instance) Start(ctx context.Context, regionPhysicalRep []byte) { + l.currentRegion = regionPhysicalRep + log.Infof(ctx, "starting SQL liveness instance") // Detach from ctx's cancelation. taskCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) diff --git a/pkg/sql/sqlliveness/slinstance/slinstance_test.go b/pkg/sql/sqlliveness/slinstance/slinstance_test.go index d7941f7f3d04..3dd1754b9f32 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance_test.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" @@ -45,11 +46,11 @@ func TestSQLInstance(t *testing.T) { fakeStorage := slstorage.NewFakeStorage() sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil, nil) - sqlInstance.Start(ctx) + sqlInstance.Start(ctx, nil) // Add one more instance to introduce concurrent access to storage. dummy := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil, nil) - dummy.Start(ctx) + dummy.Start(ctx, nil) s1, err := sqlInstance.Session(ctx) require.NoError(t, err) @@ -57,6 +58,11 @@ func TestSQLInstance(t *testing.T) { require.NoError(t, err) require.True(t, a) + region, id, err := slstorage.UnsafeDecodeSessionID(s1.ID()) + require.NoError(t, err) + require.Equal(t, enum.One, region) + require.NotNil(t, id) + s2, err := sqlInstance.Session(ctx) require.NoError(t, err) require.Equal(t, s1.ID(), s2.ID()) @@ -91,3 +97,34 @@ func TestSQLInstance(t *testing.T) { _, err = sqlInstance.Session(ctx) require.Error(t, err) } + +func TestSQLInstanceWithRegion(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, stopper := context.Background(), stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */) + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + true /* initializeVersion */) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) + + fakeStorage := slstorage.NewFakeStorage() + sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil, nil) + sqlInstance.Start(ctx, []byte{42}) + + s1, err := sqlInstance.Session(ctx) + require.NoError(t, err) + a, err := fakeStorage.IsAlive(ctx, s1.ID()) + require.NoError(t, err) + require.True(t, a) + + region, id, err := slstorage.UnsafeDecodeSessionID(s1.ID()) + require.NoError(t, err) + require.Equal(t, []byte{42}, region) + require.NotNil(t, id) +} diff --git a/pkg/sql/sqlliveness/slprovider/slprovider.go b/pkg/sql/sqlliveness/slprovider/slprovider.go index fca26ceae100..9e593bb780ec 100644 --- a/pkg/sql/sqlliveness/slprovider/slprovider.go +++ b/pkg/sql/sqlliveness/slprovider/slprovider.go @@ -55,9 +55,9 @@ type provider struct { var _ sqlliveness.Provider = &provider{} -func (p *provider) Start(ctx context.Context) { +func (p *provider) Start(ctx context.Context, regionPhysicalRep []byte) { p.Storage.Start(ctx) - p.Instance.Start(ctx) + p.Instance.Start(ctx, regionPhysicalRep) } func (p *provider) Metrics() metric.Struct { diff --git a/pkg/sql/sqlliveness/sqlliveness.go b/pkg/sql/sqlliveness/sqlliveness.go index 091b69d14710..6510277f5dc1 100644 --- a/pkg/sql/sqlliveness/sqlliveness.go +++ b/pkg/sql/sqlliveness/sqlliveness.go @@ -34,10 +34,16 @@ type SessionID string // Provider is a wrapper around the sqlliveness subsystem for external // consumption. type Provider interface { - Start(ctx context.Context) - Metrics() metric.Struct Liveness + // Start starts the sqlliveness subsystem. regionPhysicalRep should + // represent the physical representation of the current process region + // stored in the multi-region enum type associated with the system database. + Start(ctx context.Context, regionPhysicalRep []byte) + + // Metrics returns a metric.Struct which holds metrics for the provider. + Metrics() metric.Struct + // CachedReader returns a reader which only consults its local cache and // does not perform any RPCs in the IsAlive call. CachedReader() Reader