diff --git a/pkg/ccl/multiregionccl/BUILD.bazel b/pkg/ccl/multiregionccl/BUILD.bazel
index 76826b5dd955..52845d27c5d1 100644
--- a/pkg/ccl/multiregionccl/BUILD.bazel
+++ b/pkg/ccl/multiregionccl/BUILD.bazel
@@ -63,6 +63,7 @@ go_test(
         "//pkg/settings/cluster",
         "//pkg/sql",
         "//pkg/sql/catalog",
+        "//pkg/sql/catalog/catpb",
         "//pkg/sql/catalog/descpb",
         "//pkg/sql/catalog/descs",
         "//pkg/sql/catalog/desctestutils",
@@ -73,6 +74,7 @@ go_test(
         "//pkg/sql/parser",
         "//pkg/sql/rowenc",
         "//pkg/sql/sem/tree",
+        "//pkg/sql/sqlinstance/instancestorage",
         "//pkg/sql/sqlliveness",
         "//pkg/sql/sqlliveness/slstorage",
         "//pkg/sql/sqltestutils",
@@ -91,6 +93,7 @@ go_test(
         "//pkg/util/syncutil",
         "//pkg/util/tracing",
         "//pkg/util/tracing/tracingpb",
+        "//pkg/util/uuid",
         "@com_github_cockroachdb_apd_v3//:apd",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_cockroachdb_errors//:errors",
diff --git a/pkg/ccl/multiregionccl/multiregion_system_table_test.go b/pkg/ccl/multiregionccl/multiregion_system_table_test.go
index 4a7897589e60..6b9cbb16cb81 100644
--- a/pkg/ccl/multiregionccl/multiregion_system_table_test.go
+++ b/pkg/ccl/multiregionccl/multiregion_system_table_test.go
@@ -12,6 +12,7 @@ import (
 	"context"
 	gosql "database/sql"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/apd/v3"
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -28,14 +29,17 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/enum"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -123,10 +127,13 @@ func TestMrSystemDatabase(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")()
 
+	ctx := context.Background()
+
 	// Enable settings required for configuring a tenant's system database as multi-region.
 	cs := cluster.MakeTestingClusterSettings()
-	sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(context.Background(), &cs.SV, true)
-	sql.SecondaryTenantZoneConfigsEnabled.Override(context.Background(), &cs.SV, true)
+	sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(ctx, &cs.SV, true)
+	sql.SecondaryTenantZoneConfigsEnabled.Override(ctx, &cs.SV, true)
+	instancestorage.ReclaimLoopInterval.Override(ctx, &cs.SV, 150*time.Millisecond)
 
 	cluster, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(t, 3, base.TestingKnobs{}, multiregionccltestutils.WithSettings(cs))
 	defer cleanup()
@@ -147,7 +154,6 @@ func TestMrSystemDatabase(t *testing.T) {
 	tDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east2"`)
 	tDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east3"`)
 
-	ctx := context.Background()
 	executor := tenantServer.ExecutorConfig().(sql.ExecutorConfig)
 
 	// Changing the type of the crdb_region field is required to modify the
@@ -233,6 +239,61 @@ func TestMrSystemDatabase(t *testing.T) {
 				}
 			}
 			require.NoError(t, rows.Close())
+
+			query = `
+				SELECT count(id), crdb_region
+				FROM system.sql_instances
+				WHERE session_id IS NULL GROUP BY crdb_region
+			`
+			preallocatedCount := instancestorage.PreallocatedCount.Get(&cs.SV)
+			testutils.SucceedsSoon(t, func() error {
+				rows := tDB.Query(t, query)
+				countMap := map[string]int{}
+				// An empty result leaves countMap empty, so the length check below retries.
+				for rows.Next() {
+					var count int
+					var region string
+					require.NoError(t, rows.Scan(&count, &region))
+					countMap[region] = count
+				}
+				require.NoError(t, rows.Close())
+				if len(countMap) != 3 {
+					return errors.New("some regions have not been preallocated")
+				}
+				for _, r := range []string{"us-east1", "us-east2", "us-east3"} {
+					c, ok := countMap[r]
+					require.True(t, ok)
+					if c != int(preallocatedCount) {
+						return errors.Newf("require %d, but got %d", preallocatedCount, c)
+					}
+				}
+				return nil
+			})
+		})
+
+		t.Run("Reclaim", func(t *testing.T) {
+			id := uuid.MakeV4()
+			s1, err := slstorage.MakeSessionID(make([]byte, 100), id)
+			require.NoError(t, err)
+			s2, err := slstorage.MakeSessionID(make([]byte, 200), id)
+			require.NoError(t, err)
+
+			// Insert expired entries into sql_instances.
+			tDB.Exec(t, `INSERT INTO system.sql_instances (id, addr, session_id, locality, crdb_region) VALUES
+				(100, NULL, $1, NULL, 'us-east2'),
+				(200, NULL, $2, NULL, 'us-east3')`, s1.UnsafeBytes(), s2.UnsafeBytes())
+
+			query := `SELECT count(*) FROM system.sql_instances WHERE id IN (100, 200) AND session_id IS NOT NULL`
+
+			// Wait until the expired entries are deleted or have their session cleared.
+			testutils.SucceedsSoon(t, func() error {
+				var rowCount int
+				tDB.QueryRow(t, query).Scan(&rowCount)
+				if rowCount != 0 {
+					return errors.New("some regions have not been reclaimed")
+				}
+				return nil
+			})
 		})
 	})
 }
diff --git a/pkg/ccl/multiregionccl/region_util_test.go b/pkg/ccl/multiregionccl/region_util_test.go
index ac1ac9780997..b3ec84c6e291 100644
--- a/pkg/ccl/multiregionccl/region_util_test.go
+++ b/pkg/ccl/multiregionccl/region_util_test.go
@@ -17,6 +17,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -43,47 +44,80 @@ func TestGetLocalityRegionEnumPhysicalRepresentation(t *testing.T) {
 
 	s0 := tc.ServerTyped(0)
 	ief := s0.InternalExecutorFactory().(descs.TxnManager)
-	dbID := sqlutils.QueryDatabaseID(t, sqlDB, "foo")
+	dbID := descpb.ID(sqlutils.QueryDatabaseID(t, sqlDB, "foo"))
 
 	t.Run("with locality that exists", func(t *testing.T) {
 		regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
-			ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{
+			ctx, ief, s0.DB(), dbID, roachpb.Locality{
 				Tiers: []roachpb.Tier{{Key: "region", Value: "us-east2"}},
 			},
 		)
 		require.NoError(t, err)
 
-		enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
+		enumMembers := getEnumMembers(t, ctx, tc.Server(0), dbID)
 		require.NotEmpty(t, enumMembers)
 		require.Equal(t, enumMembers["us-east2"], regionEnum)
 	})
 
 	t.Run("with non-existent locality", func(t *testing.T) {
 		regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
-			ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{
+			ctx, ief, s0.DB(), dbID, roachpb.Locality{
 				Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}},
 			},
 		)
 		require.NoError(t, err)
 
 		// Fallback to primary region if the locality is provided, but non-existent.
-		enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
+		enumMembers := getEnumMembers(t, ctx, tc.Server(0), dbID)
 		require.NotEmpty(t, enumMembers)
 		require.Equal(t, enumMembers["us-east1"], regionEnum)
 	})
 
 	t.Run("without locality", func(t *testing.T) {
 		regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
-			ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{})
+			ctx, ief, s0.DB(), dbID, roachpb.Locality{})
 		require.NoError(t, err)
 
 		// Fallback to primary region is locality information is missing.
-		enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
+		enumMembers := getEnumMembers(t, ctx, tc.Server(0), dbID)
 		require.NotEmpty(t, enumMembers)
 		require.Equal(t, enumMembers["us-east1"], regionEnum)
 	})
 }
 
+func TestGetRegionEnumRepresentations(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
+		t, 3 /* numServers */, base.TestingKnobs{})
+	defer cleanup()
+
+	tDB := sqlutils.MakeSQLRunner(sqlDB)
+	tDB.Exec(t, `CREATE DATABASE foo PRIMARY REGION "us-east1" REGIONS "us-east1", "us-east2", "us-east3"`)
+
+	dbID := descpb.ID(sqlutils.QueryDatabaseID(t, sqlDB, "foo"))
+	err := sql.TestingDescsTxn(ctx, tc.Server(0), func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
+		enumReps, primaryRegion, err := sql.GetRegionEnumRepresentations(ctx, txn, dbID, col)
+		require.NoError(t, err)
+
+		require.Equal(t, catpb.RegionName("us-east1"), primaryRegion)
+		require.Len(t, enumReps, 3)
+
+		expEnumReps := getEnumMembers(t, ctx, tc.Server(0), dbID)
+		require.Equal(t, len(expEnumReps), len(enumReps))
+
+		for r, rep := range expEnumReps {
+			res, ok := enumReps[catpb.RegionName(r)]
+			require.True(t, ok)
+			require.Equal(t, rep, res)
+		}
+		return nil
+	})
+	require.NoError(t, err)
+}
+
 func getEnumMembers(
 	t *testing.T, ctx context.Context, ts serverutils.TestServerInterface, dbID descpb.ID,
 ) map[string][]byte {
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index dadc6cd20b15..3c49f8697a2e 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -1306,7 +1306,7 @@ func (s *SQLServer) preStart(
 	}
 	// Start instance ID reclaim loop.
 	if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
-		ctx, stopper, timeutil.DefaultTimeSource{}, session.Expiration,
+		ctx, stopper, timeutil.DefaultTimeSource{}, s.internalExecutorFactory, session.Expiration,
 	); err != nil {
 		return err
 	}
diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go
index a1662d55a9ba..cecc2313a430 100644
--- a/pkg/sql/catalog/systemschema/system.go
+++ b/pkg/sql/catalog/systemschema/system.go
@@ -664,7 +664,7 @@ CREATE TABLE system.sql_instances (
     addr         STRING,
     session_id   BYTES,
     locality     JSONB,
-    crdb_region         BYTES NOT NULL,
+    crdb_region  BYTES NOT NULL,
     CONSTRAINT "primary" PRIMARY KEY (crdb_region, id),
     FAMILY "primary" (crdb_region, id, addr, session_id, locality)
 )`
diff --git a/pkg/sql/region_util.go b/pkg/sql/region_util.go
index 02c62c028337..35949edf2d00 100644
--- a/pkg/sql/region_util.go
+++ b/pkg/sql/region_util.go
@@ -1514,7 +1514,7 @@ func GetLocalityRegionEnumPhysicalRepresentation(
 	) error {
 		enumReps, primaryRegion = nil, "" // reset for retry
 		var err error
-		enumReps, primaryRegion, err = getRegionEnumRepresentations(ctx, txn, dbID, descsCol)
+		enumReps, primaryRegion, err = GetRegionEnumRepresentations(ctx, txn, dbID, descsCol)
 		return err
 	}); err != nil {
 		return nil, err
@@ -1534,11 +1534,11 @@ func GetLocalityRegionEnumPhysicalRepresentation(
 	return nil, errors.AssertionFailedf("primary region not found")
 }
 
-// getRegionEnumRepresentations returns representations stored in the
+// GetRegionEnumRepresentations returns representations stored in the
 // multi-region enum type associated with dbID, and the primary region of it.
 // An ErrNotMultiRegionDatabase error will be returned if the database isn't
 // multi-region.
-func getRegionEnumRepresentations(
+func GetRegionEnumRepresentations(
 	ctx context.Context, txn *kv.Txn, dbID descpb.ID, descsCol *descs.Collection,
 ) (enumReps map[catpb.RegionName][]byte, primaryRegion catpb.RegionName, err error) {
 	dbDesc, regionEnumDesc, err := getDBAndRegionEnumDescs(
diff --git a/pkg/sql/sqlinstance/instancestorage/BUILD.bazel b/pkg/sql/sqlinstance/instancestorage/BUILD.bazel
index 9e4d2dd05aec..7d4b827648a4 100644
--- a/pkg/sql/sqlinstance/instancestorage/BUILD.bazel
+++ b/pkg/sql/sqlinstance/instancestorage/BUILD.bazel
@@ -21,8 +21,10 @@ go_library(
         "//pkg/roachpb",
         "//pkg/settings",
         "//pkg/settings/cluster",
+        "//pkg/sql",
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/descpb",
+        "//pkg/sql/catalog/descs",
         "//pkg/sql/catalog/systemschema",
         "//pkg/sql/enum",
         "//pkg/sql/rowenc/valueside",
@@ -65,6 +67,7 @@ go_test(
         "//pkg/server",
         "//pkg/settings/cluster",
         "//pkg/sql/catalog/descpb",
+        "//pkg/sql/catalog/descs",
         "//pkg/sql/catalog/systemschema",
         "//pkg/sql/enum",
         "//pkg/sql/sqlinstance",
diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage.go b/pkg/sql/sqlinstance/instancestorage/instancestorage.go
index b42674675c2c..977458a0d0c6 100644
--- a/pkg/sql/sqlinstance/instancestorage/instancestorage.go
+++ b/pkg/sql/sqlinstance/instancestorage/instancestorage.go
@@ -26,7 +26,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/enum"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
@@ -235,9 +237,6 @@ func (s *Storage) CreateInstance(
 // getAvailableInstanceIDForRegion retrieves an available instance ID for the
 // current region associated with Storage s, and returns errNoPreallocatedRows
 // if there are no available rows.
-//
-// TODO(jaylim-crl): Store current region enum in s once we implement regional
-// by row for the sql_instances table.
 func (s *Storage) getAvailableInstanceIDForRegion(
 	ctx context.Context, region []byte, txn *kv.Txn,
 ) (base.SQLInstanceID, error) {
@@ -387,10 +386,36 @@ func (s *Storage) RunInstanceIDReclaimLoop(
 	ctx context.Context,
 	stopper *stop.Stopper,
 	ts timeutil.TimeSource,
+	internalExecutorFactory descs.TxnManager,
 	sessionExpirationFn func() hlc.Timestamp,
 ) error {
-	// TODO(jeffswenson): load regions from the system database enum.
-	regions := [][]byte{enum.One}
+	loadRegions := func() ([][]byte, error) {
+		// Load regions from the system DB.
+		var regions [][]byte
+		if err := internalExecutorFactory.DescsTxn(ctx, s.db, func(
+			ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
+		) error {
+			regions = nil // reset for retry
+			enumReps, _, err := sql.GetRegionEnumRepresentations(ctx, txn, keys.SystemDatabaseID, descsCol)
+			if err != nil {
+				if errors.Is(err, sql.ErrNotMultiRegionDatabase) {
+					return nil
+				}
+				return err
+			}
+			for _, r := range enumReps {
+				regions = append(regions, r)
+			}
+			return nil
+		}); err != nil {
+			return nil, err
+		}
+		// The system database isn't multi-region.
+		if len(regions) == 0 {
+			regions = [][]byte{enum.One}
+		}
+		return regions, nil
+	}
 
 	return stopper.RunAsyncTask(ctx, "instance-id-reclaim-loop", func(ctx context.Context) {
 		ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
@@ -411,6 +436,14 @@ func (s *Storage) RunInstanceIDReclaimLoop(
 			case <-timer.Ch():
 				timer.MarkRead()
 
+				// Load the regions each time we attempt to generate rows since
+				// regions can be added/removed to/from the system DB.
+				regions, err := loadRegions()
+				if err != nil {
+					log.Warningf(ctx, "failed to load regions from the system DB: %v", err)
+					continue
+				}
+
 				// Mark instances that belong to expired sessions as available
 				// and delete surplus IDs. Cleaning up surplus IDs is necessary
 				// to avoid ID exhaustion.
@@ -432,13 +465,6 @@ func (s *Storage) RunInstanceIDReclaimLoop(
 // generateAvailableInstanceRows allocates available instance IDs, and store
 // them in the sql_instances table. When instance IDs are pre-allocated, all
 // other fields in that row will be NULL.
-//
-// TODO(jaylim-crl): Handle multiple regions in this logic. encodeRow has to be
-// updated with crdb_region. When we handle multiple regions, we have to figure
-// out where to get the list of regions, and ensure that we don't do a global
-// read for each region assuming that the number of pre-allocated entries is
-// insufficient. One global KV read and write would be sufficient for **all**
-// regions.
 func (s *Storage) generateAvailableInstanceRows(
 	ctx context.Context, regions [][]byte, sessionExpiration hlc.Timestamp,
 ) error {
diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage_internal_test.go b/pkg/sql/sqlinstance/instancestorage/instancestorage_internal_test.go
index 94a488a45ec9..9131ee350f89 100644
--- a/pkg/sql/sqlinstance/instancestorage/instancestorage_internal_test.go
+++ b/pkg/sql/sqlinstance/instancestorage/instancestorage_internal_test.go
@@ -283,7 +283,7 @@ func TestIdsToReclaim(t *testing.T) {
 	}
 }
 
-func TestGenerateAvailableInstanceRows(t *testing.T) {
+func TestReclaimAndGenerateInstanceRows(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go b/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
index c1e0ed917ab3..e9ac24f123a8 100644
--- a/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
+++ b/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
 	"github.com/cockroachdb/cockroach/pkg/sql/enum"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
@@ -505,7 +506,8 @@ func TestReclaimLoop(t *testing.T) {
 	const expiration = 5 * time.Hour
 	sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0)
 
-	err := storage.RunInstanceIDReclaimLoop(ctx, s.Stopper(), ts, func() hlc.Timestamp {
+	ief := s.InternalExecutorFactory().(descs.TxnManager)
+	err := storage.RunInstanceIDReclaimLoop(ctx, s.Stopper(), ts, ief, func() hlc.Timestamp {
 		return sessionExpiry
 	})
 	require.NoError(t, err)