Skip to content

Commit

Permalink
instancestorage: load all regions from the system DB when reclaiming IDs
Browse files Browse the repository at this point in the history
Follow up to cockroachdb#92010 and cockroachdb#91694.

This commit addresses a TODO comment by loading all region enum members from
the system database when reclaiming IDs. The list of regions is read each time
the loop runs since there's a possibiilty where regions are added or removed
from the system DB.

Epic: None

Release note: None
  • Loading branch information
jaylim-crl committed Nov 23, 2022
1 parent 3685865 commit 1d09763
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 29 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/multiregionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
Expand All @@ -73,6 +74,7 @@ go_test(
"//pkg/sql/parser",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlinstance/instancestorage",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slstorage",
"//pkg/sql/sqltestutils",
Expand All @@ -91,6 +93,7 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
Expand Down
73 changes: 70 additions & 3 deletions pkg/ccl/multiregionccl/multiregion_system_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
gosql "database/sql"
"testing"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -28,14 +29,17 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -123,10 +127,13 @@ func TestMrSystemDatabase(t *testing.T) {
defer log.Scope(t).Close(t)
defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")()

ctx := context.Background()

// Enable settings required for configuring a tenant's system database as multi-region.
cs := cluster.MakeTestingClusterSettings()
sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(context.Background(), &cs.SV, true)
sql.SecondaryTenantZoneConfigsEnabled.Override(context.Background(), &cs.SV, true)
sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(ctx, &cs.SV, true)
sql.SecondaryTenantZoneConfigsEnabled.Override(ctx, &cs.SV, true)
instancestorage.ReclaimLoopInterval.Override(ctx, &cs.SV, 150*time.Millisecond)

cluster, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(t, 3, base.TestingKnobs{}, multiregionccltestutils.WithSettings(cs))
defer cleanup()
Expand All @@ -147,7 +154,6 @@ func TestMrSystemDatabase(t *testing.T) {
tDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east2"`)
tDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east3"`)

ctx := context.Background()
executor := tenantServer.ExecutorConfig().(sql.ExecutorConfig)

// Changing the type of the crdb_region field is required to modify the
Expand Down Expand Up @@ -233,6 +239,67 @@ func TestMrSystemDatabase(t *testing.T) {
}
}
require.NoError(t, rows.Close())

query = `
SELECT COUNT(id), crdb_region
FROM system.sql_instances
WHERE session_id IS NULL GROUP BY crdb_region
`
preallocatedCount := instancestorage.PreallocatedCount.Get(&cs.SV)
testutils.SucceedsSoon(t, func() error {
rows := tDB.Query(t, query)
require.True(t, rows.Next())

countMap := map[string]int{}
for {
var count int
var crdb_region string

require.NoError(t, rows.Scan(&count, &crdb_region))
countMap[crdb_region] = count

if !rows.Next() {
break
}
}
require.NoError(t, rows.Close())
if len(countMap) != 3 {
return errors.New("some regions have not been preallocated")
}
for _, r := range []string{"us-east1", "us-east2", "us-east3"} {
c, ok := countMap[r]
require.True(t, ok)
if c != int(preallocatedCount) {
return errors.Newf("require %d, but got %d", preallocatedCount, c)
}
}
return nil
})
})

t.Run("Reclaim", func(t *testing.T) {
id := uuid.MakeV4()
s1, err := slstorage.MakeSessionID(make([]byte, 100), id)
require.NoError(t, err)
s2, err := slstorage.MakeSessionID(make([]byte, 200), id)
require.NoError(t, err)

// Insert expired entries into sql_instances.
tDB.Exec(t, `INSERT INTO system.sql_instances (id, addr, session_id, locality, crdb_region) VALUES
(100, NULL, $1, NULL, 'us-east2'),
(200, NULL, $2, NULL, 'us-east3')`, s1.UnsafeBytes(), s2.UnsafeBytes())

query := `SELECT count(*) FROM system.sql_instances WHERE id = 42`

// Wait until expired entries get removed.
testutils.SucceedsSoon(t, func() error {
var rowCount int
tDB.QueryRow(t, query).Scan(&rowCount)
if rowCount != 0 {
return errors.New("some regions have not been reclaimed")
}
return nil
})
})
})
}
Expand Down
48 changes: 41 additions & 7 deletions pkg/ccl/multiregionccl/region_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -43,47 +44,80 @@ func TestGetLocalityRegionEnumPhysicalRepresentation(t *testing.T) {

s0 := tc.ServerTyped(0)
ief := s0.InternalExecutorFactory().(descs.TxnManager)
dbID := sqlutils.QueryDatabaseID(t, sqlDB, "foo")
dbID := descpb.ID(sqlutils.QueryDatabaseID(t, sqlDB, "foo"))

t.Run("with locality that exists", func(t *testing.T) {
regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{
ctx, ief, s0.DB(), dbID, roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: "us-east2"}},
},
)
require.NoError(t, err)

enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
enumMembers := getEnumMembers(t, ctx, tc.Server(0), dbID)
require.NotEmpty(t, enumMembers)
require.Equal(t, enumMembers["us-east2"], regionEnum)
})

t.Run("with non-existent locality", func(t *testing.T) {
regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{
ctx, ief, s0.DB(), dbID, roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}},
},
)
require.NoError(t, err)

// Fallback to primary region if the locality is provided, but non-existent.
enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
enumMembers := getEnumMembers(t, ctx, tc.Server(0), dbID)
require.NotEmpty(t, enumMembers)
require.Equal(t, enumMembers["us-east1"], regionEnum)
})

t.Run("without locality", func(t *testing.T) {
regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{})
ctx, ief, s0.DB(), dbID, roachpb.Locality{})
require.NoError(t, err)

// Fallback to primary region is locality information is missing.
enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
enumMembers := getEnumMembers(t, ctx, tc.Server(0), dbID)
require.NotEmpty(t, enumMembers)
require.Equal(t, enumMembers["us-east1"], regionEnum)
})
}

func TestGetRegionEnumRepresentations(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, base.TestingKnobs{})
defer cleanup()

tDB := sqlutils.MakeSQLRunner(sqlDB)
tDB.Exec(t, `CREATE DATABASE foo PRIMARY REGION "us-east1" REGIONS "us-east1", "us-east2", "us-east3"`)

dbID := descpb.ID(sqlutils.QueryDatabaseID(t, sqlDB, "foo"))
err := sql.TestingDescsTxn(ctx, tc.Server(0), func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
enumReps, primaryRegion, err := sql.GetRegionEnumRepresentations(ctx, txn, dbID, col)
require.NoError(t, err)

require.Equal(t, catpb.RegionName("us-east1"), primaryRegion)
require.Len(t, enumReps, 3)

expEnumReps := getEnumMembers(t, ctx, tc.Server(0), dbID)
require.Equal(t, len(expEnumReps), len(enumReps))

for r, rep := range expEnumReps {
res, ok := enumReps[catpb.RegionName(r)]
require.True(t, ok)
require.Equal(t, rep, res)
}
return nil
})
require.NoError(t, err)
}

func getEnumMembers(
t *testing.T, ctx context.Context, ts serverutils.TestServerInterface, dbID descpb.ID,
) map[string][]byte {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,7 @@ func (s *SQLServer) preStart(
}
// Start instance ID reclaim loop.
if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
ctx, stopper, timeutil.DefaultTimeSource{}, session.Expiration,
ctx, stopper, timeutil.DefaultTimeSource{}, s.internalExecutorFactory, session.Expiration,
); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/systemschema/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ CREATE TABLE system.sql_instances (
addr STRING,
session_id BYTES,
locality JSONB,
crdb_region BYTES NOT NULL,
crdb_region BYTES NOT NULL,
CONSTRAINT "primary" PRIMARY KEY (crdb_region, id),
FAMILY "primary" (crdb_region, id, addr, session_id, locality)
)`
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/region_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,7 @@ func GetLocalityRegionEnumPhysicalRepresentation(
) error {
enumReps, primaryRegion = nil, "" // reset for retry
var err error
enumReps, primaryRegion, err = getRegionEnumRepresentations(ctx, txn, dbID, descsCol)
enumReps, primaryRegion, err = GetRegionEnumRepresentations(ctx, txn, dbID, descsCol)
return err
}); err != nil {
return nil, err
Expand All @@ -1534,11 +1534,11 @@ func GetLocalityRegionEnumPhysicalRepresentation(
return nil, errors.AssertionFailedf("primary region not found")
}

// getRegionEnumRepresentations returns representations stored in the
// GetRegionEnumRepresentations returns representations stored in the
// multi-region enum type associated with dbID, and the primary region of it.
// An ErrNotMultiRegionDatabase error will be returned if the database isn't
// multi-region.
func getRegionEnumRepresentations(
func GetRegionEnumRepresentations(
ctx context.Context, txn *kv.Txn, dbID descpb.ID, descsCol *descs.Collection,
) (enumReps map[catpb.RegionName][]byte, primaryRegion catpb.RegionName, err error) {
dbDesc, regionEnumDesc, err := getDBAndRegionEnumDescs(
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sqlinstance/instancestorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/enum",
"//pkg/sql/rowenc/valueside",
Expand Down Expand Up @@ -65,6 +67,7 @@ go_test(
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/enum",
"//pkg/sql/sqlinstance",
Expand Down
49 changes: 37 additions & 12 deletions pkg/sql/sqlinstance/instancestorage/instancestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
Expand Down Expand Up @@ -235,9 +237,6 @@ func (s *Storage) CreateInstance(
// getAvailableInstanceIDForRegion retrieves an available instance ID for the
// current region associated with Storage s, and returns errNoPreallocatedRows
// if there are no available rows.
//
// TODO(jaylim-crl): Store current region enum in s once we implement regional
// by row for the sql_instances table.
func (s *Storage) getAvailableInstanceIDForRegion(
ctx context.Context, region []byte, txn *kv.Txn,
) (base.SQLInstanceID, error) {
Expand Down Expand Up @@ -387,10 +386,35 @@ func (s *Storage) RunInstanceIDReclaimLoop(
ctx context.Context,
stopper *stop.Stopper,
ts timeutil.TimeSource,
internalExecutorFactory descs.TxnManager,
sessionExpirationFn func() hlc.Timestamp,
) error {
// TODO(jeffswenson): load regions from the system database enum.
regions := [][]byte{enum.One}
loadRegions := func() ([][]byte, error) {
// Load regions from the system DB.
var regions [][]byte
if err := internalExecutorFactory.DescsTxn(ctx, s.db, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
enumReps, _, err := sql.GetRegionEnumRepresentations(ctx, txn, keys.SystemDatabaseID, descsCol)
if err != nil {
if errors.Is(err, sql.ErrNotMultiRegionDatabase) {
return nil
}
return err
}
for _, r := range enumReps {
regions = append(regions, r)
}
return nil
}); err != nil {
return nil, err
}
// The system database isn't multi-region.
if len(regions) == 0 {
regions = [][]byte{enum.One}
}
return regions, nil
}

return stopper.RunAsyncTask(ctx, "instance-id-reclaim-loop", func(ctx context.Context) {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
Expand All @@ -411,6 +435,14 @@ func (s *Storage) RunInstanceIDReclaimLoop(
case <-timer.Ch():
timer.MarkRead()

// Load the regions each time we attempt to generate rows since
// regions can be added/removed to/from the system DB.
regions, err := loadRegions()
if err != nil {
log.Warningf(ctx, "failed to load regions from the system DB: %v", err)
continue
}

// Mark instances that belong to expired sessions as available
// and delete surplus IDs. Cleaning up surplus IDs is necessary
// to avoid ID exhaustion.
Expand All @@ -432,13 +464,6 @@ func (s *Storage) RunInstanceIDReclaimLoop(
// generateAvailableInstanceRows allocates available instance IDs, and store
// them in the sql_instances table. When instance IDs are pre-allocated, all
// other fields in that row will be NULL.
//
// TODO(jaylim-crl): Handle multiple regions in this logic. encodeRow has to be
// updated with crdb_region. When we handle multiple regions, we have to figure
// out where to get the list of regions, and ensure that we don't do a global
// read for each region assuming that the number of pre-allocated entries is
// insufficient. One global KV read and write would be sufficient for **all**
// regions.
func (s *Storage) generateAvailableInstanceRows(
ctx context.Context, regions [][]byte, sessionExpiration hlc.Timestamp,
) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestIdsToReclaim(t *testing.T) {
}
}

func TestGenerateAvailableInstanceRows(t *testing.T) {
func TestReclaimAndGenerateInstanceRows(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand Down
Loading

0 comments on commit 1d09763

Please sign in to comment.