Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

instancestorage: load all regions from the system DB when reclaiming IDs #92248

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/ccl/multiregionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
Expand All @@ -73,6 +74,7 @@ go_test(
"//pkg/sql/parser",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlinstance/instancestorage",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slstorage",
"//pkg/sql/sqltestutils",
Expand All @@ -91,6 +93,7 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
Expand Down
73 changes: 70 additions & 3 deletions pkg/ccl/multiregionccl/multiregion_system_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
gosql "database/sql"
"testing"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -28,14 +29,17 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -123,10 +127,13 @@ func TestMrSystemDatabase(t *testing.T) {
defer log.Scope(t).Close(t)
defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")()

ctx := context.Background()

// Enable settings required for configuring a tenant's system database as multi-region.
cs := cluster.MakeTestingClusterSettings()
sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(context.Background(), &cs.SV, true)
sql.SecondaryTenantZoneConfigsEnabled.Override(context.Background(), &cs.SV, true)
sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(ctx, &cs.SV, true)
sql.SecondaryTenantZoneConfigsEnabled.Override(ctx, &cs.SV, true)
instancestorage.ReclaimLoopInterval.Override(ctx, &cs.SV, 150*time.Millisecond)

cluster, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(t, 3, base.TestingKnobs{}, multiregionccltestutils.WithSettings(cs))
defer cleanup()
Expand All @@ -147,7 +154,6 @@ func TestMrSystemDatabase(t *testing.T) {
tDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east2"`)
tDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east3"`)

ctx := context.Background()
executor := tenantServer.ExecutorConfig().(sql.ExecutorConfig)

// Changing the type of the crdb_region field is required to modify the
Expand Down Expand Up @@ -233,6 +239,67 @@ func TestMrSystemDatabase(t *testing.T) {
}
}
require.NoError(t, rows.Close())

query = `
SELECT count(id), crdb_region
FROM system.sql_instances
WHERE session_id IS NULL GROUP BY crdb_region
`
preallocatedCount := instancestorage.PreallocatedCount.Get(&cs.SV)
testutils.SucceedsSoon(t, func() error {
rows := tDB.Query(t, query)
require.True(t, rows.Next())

countMap := map[string]int{}
for {
var count int
var crdb_region string

require.NoError(t, rows.Scan(&count, &crdb_region))
countMap[crdb_region] = count

if !rows.Next() {
break
}
}
require.NoError(t, rows.Close())
if len(countMap) != 3 {
return errors.New("some regions have not been preallocated")
}
for _, r := range []string{"us-east1", "us-east2", "us-east3"} {
c, ok := countMap[r]
require.True(t, ok)
if c != int(preallocatedCount) {
return errors.Newf("require %d, but got %d", preallocatedCount, c)
}
}
return nil
})
})

t.Run("Reclaim", func(t *testing.T) {
	// Build two session IDs sharing one UUID. Nothing visible in this test
	// registers these sessions, so the rows below are presumably treated as
	// belonging to expired sessions — TODO confirm against slstorage.
	id := uuid.MakeV4()
	s1, err := slstorage.MakeSessionID(make([]byte, 100), id)
	require.NoError(t, err)
	s2, err := slstorage.MakeSessionID(make([]byte, 200), id)
	require.NoError(t, err)

	// Insert expired entries into sql_instances.
	tDB.Exec(t, `INSERT INTO system.sql_instances (id, addr, session_id, locality, crdb_region) VALUES
		(100, NULL, $1, NULL, 'us-east2'),
		(200, NULL, $2, NULL, 'us-east3')`, s1.UnsafeBytes(), s2.UnsafeBytes())

	// Count the rows inserted above (ids 100 and 200) that still hold a
	// session. The reclaim loop either marks an expired row as available or
	// deletes it as surplus; both outcomes drive this count to zero. The
	// query must target the inserted ids: checking an id that was never
	// inserted would make the wait below succeed immediately.
	query := `SELECT count(*) FROM system.sql_instances WHERE id IN (100, 200) AND session_id IS NOT NULL`

	// Wait until the expired entries have been reclaimed or removed.
	testutils.SucceedsSoon(t, func() error {
		var rowCount int
		tDB.QueryRow(t, query).Scan(&rowCount)
		if rowCount != 0 {
			return errors.New("some regions have not been reclaimed")
		}
		return nil
	})
})
})
}
Expand Down
48 changes: 41 additions & 7 deletions pkg/ccl/multiregionccl/region_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -43,47 +44,80 @@ func TestGetLocalityRegionEnumPhysicalRepresentation(t *testing.T) {

s0 := tc.ServerTyped(0)
ief := s0.InternalExecutorFactory().(descs.TxnManager)
dbID := sqlutils.QueryDatabaseID(t, sqlDB, "foo")
dbID := descpb.ID(sqlutils.QueryDatabaseID(t, sqlDB, "foo"))

t.Run("with locality that exists", func(t *testing.T) {
regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{
ctx, ief, s0.DB(), dbID, roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: "us-east2"}},
},
)
require.NoError(t, err)

enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
enumMembers := getEnumMembers(t, ctx, tc.Server(0), dbID)
require.NotEmpty(t, enumMembers)
require.Equal(t, enumMembers["us-east2"], regionEnum)
})

t.Run("with non-existent locality", func(t *testing.T) {
regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{
ctx, ief, s0.DB(), dbID, roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}},
},
)
require.NoError(t, err)

// Fallback to primary region if the locality is provided, but non-existent.
enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
enumMembers := getEnumMembers(t, ctx, tc.Server(0), dbID)
require.NotEmpty(t, enumMembers)
require.Equal(t, enumMembers["us-east1"], regionEnum)
})

t.Run("without locality", func(t *testing.T) {
regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{})
ctx, ief, s0.DB(), dbID, roachpb.Locality{})
require.NoError(t, err)

// Fall back to the primary region if locality information is missing.
enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
enumMembers := getEnumMembers(t, ctx, tc.Server(0), dbID)
require.NotEmpty(t, enumMembers)
require.Equal(t, enumMembers["us-east1"], regionEnum)
})
}

// TestGetRegionEnumRepresentations creates a three-region database on a
// three-server multi-region test cluster and checks that
// sql.GetRegionEnumRepresentations reports "us-east1" as the primary region
// and returns, for every region, the same representation that getEnumMembers
// yields for that database.
func TestGetRegionEnumRepresentations(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const numServers = 3
	ctx := context.Background()
	tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
		t, numServers, base.TestingKnobs{},
	)
	defer cleanup()

	runner := sqlutils.MakeSQLRunner(sqlDB)
	runner.Exec(t, `CREATE DATABASE foo PRIMARY REGION "us-east1" REGIONS "us-east1", "us-east2", "us-east3"`)
	dbID := descpb.ID(sqlutils.QueryDatabaseID(t, sqlDB, "foo"))

	// verify runs inside a descriptor transaction and compares the function
	// under test against the enum members read via getEnumMembers.
	verify := func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
		got, primary, err := sql.GetRegionEnumRepresentations(ctx, txn, dbID, col)
		require.NoError(t, err)

		require.Equal(t, catpb.RegionName("us-east1"), primary)
		require.Len(t, got, 3)

		want := getEnumMembers(t, ctx, tc.Server(0), dbID)
		require.Equal(t, len(want), len(got))

		for name, wantRep := range want {
			gotRep, found := got[catpb.RegionName(name)]
			require.True(t, found)
			require.Equal(t, wantRep, gotRep)
		}
		return nil
	}
	require.NoError(t, sql.TestingDescsTxn(ctx, tc.Server(0), verify))
}

func getEnumMembers(
t *testing.T, ctx context.Context, ts serverutils.TestServerInterface, dbID descpb.ID,
) map[string][]byte {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,7 @@ func (s *SQLServer) preStart(
}
// Start instance ID reclaim loop.
if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
ctx, stopper, timeutil.DefaultTimeSource{}, session.Expiration,
ctx, stopper, timeutil.DefaultTimeSource{}, s.internalExecutorFactory, session.Expiration,
); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/systemschema/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ CREATE TABLE system.sql_instances (
addr STRING,
session_id BYTES,
locality JSONB,
crdb_region BYTES NOT NULL,
crdb_region BYTES NOT NULL,
CONSTRAINT "primary" PRIMARY KEY (crdb_region, id),
FAMILY "primary" (crdb_region, id, addr, session_id, locality)
)`
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/region_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,7 @@ func GetLocalityRegionEnumPhysicalRepresentation(
) error {
enumReps, primaryRegion = nil, "" // reset for retry
var err error
enumReps, primaryRegion, err = getRegionEnumRepresentations(ctx, txn, dbID, descsCol)
enumReps, primaryRegion, err = GetRegionEnumRepresentations(ctx, txn, dbID, descsCol)
return err
}); err != nil {
return nil, err
Expand All @@ -1534,11 +1534,11 @@ func GetLocalityRegionEnumPhysicalRepresentation(
return nil, errors.AssertionFailedf("primary region not found")
}

// getRegionEnumRepresentations returns representations stored in the
// GetRegionEnumRepresentations returns representations stored in the
// multi-region enum type associated with dbID, and the primary region of it.
// An ErrNotMultiRegionDatabase error will be returned if the database isn't
// multi-region.
func getRegionEnumRepresentations(
func GetRegionEnumRepresentations(
ctx context.Context, txn *kv.Txn, dbID descpb.ID, descsCol *descs.Collection,
) (enumReps map[catpb.RegionName][]byte, primaryRegion catpb.RegionName, err error) {
dbDesc, regionEnumDesc, err := getDBAndRegionEnumDescs(
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sqlinstance/instancestorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/enum",
"//pkg/sql/rowenc/valueside",
Expand Down Expand Up @@ -65,6 +67,7 @@ go_test(
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/enum",
"//pkg/sql/sqlinstance",
Expand Down
49 changes: 37 additions & 12 deletions pkg/sql/sqlinstance/instancestorage/instancestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
Expand Down Expand Up @@ -235,9 +237,6 @@ func (s *Storage) CreateInstance(
// getAvailableInstanceIDForRegion retrieves an available instance ID for the
// current region associated with Storage s, and returns errNoPreallocatedRows
// if there are no available rows.
//
// TODO(jaylim-crl): Store current region enum in s once we implement regional
// by row for the sql_instances table.
func (s *Storage) getAvailableInstanceIDForRegion(
ctx context.Context, region []byte, txn *kv.Txn,
) (base.SQLInstanceID, error) {
Expand Down Expand Up @@ -387,10 +386,35 @@ func (s *Storage) RunInstanceIDReclaimLoop(
ctx context.Context,
stopper *stop.Stopper,
ts timeutil.TimeSource,
internalExecutorFactory descs.TxnManager,
sessionExpirationFn func() hlc.Timestamp,
) error {
// TODO(jeffswenson): load regions from the system database enum.
regions := [][]byte{enum.One}
// loadRegions returns the physical representations of all regions in the
// system database's multi-region enum. When the system database is not
// multi-region (sql.ErrNotMultiRegionDatabase), it returns a single-element
// slice containing enum.One instead.
//
// NOTE(review): this closure captures the ctx passed to
// RunInstanceIDReclaimLoop, not a context supplied by its caller — confirm
// that this is the context the lookup is meant to run under.
loadRegions := func() ([][]byte, error) {
	// Load regions from the system DB.
	var regions [][]byte
	if err := internalExecutorFactory.DescsTxn(ctx, s.db, func(
		ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
	) error {
		// The transaction closure can run more than once; drop anything
		// collected by an earlier attempt so regions are not duplicated.
		regions = nil // reset for retry
		enumReps, _, err := sql.GetRegionEnumRepresentations(ctx, txn, keys.SystemDatabaseID, descsCol)
		if err != nil {
			if errors.Is(err, sql.ErrNotMultiRegionDatabase) {
				// Not an error: the fallback below handles this case.
				return nil
			}
			return err
		}
		regions = make([][]byte, 0, len(enumReps))
		for _, r := range enumReps {
			regions = append(regions, r)
		}
		return nil
	}); err != nil {
		return nil, err
	}
	// The system database isn't multi-region.
	if len(regions) == 0 {
		regions = [][]byte{enum.One}
	}
	return regions, nil
}

return stopper.RunAsyncTask(ctx, "instance-id-reclaim-loop", func(ctx context.Context) {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
Expand All @@ -411,6 +435,14 @@ func (s *Storage) RunInstanceIDReclaimLoop(
case <-timer.Ch():
timer.MarkRead()

// Load the regions each time we attempt to generate rows since
// regions can be added/removed to/from the system DB.
regions, err := loadRegions()
if err != nil {
log.Warningf(ctx, "failed to load regions from the system DB: %v", err)
continue
}

// Mark instances that belong to expired sessions as available
// and delete surplus IDs. Cleaning up surplus IDs is necessary
// to avoid ID exhaustion.
Expand All @@ -432,13 +464,6 @@ func (s *Storage) RunInstanceIDReclaimLoop(
// generateAvailableInstanceRows allocates available instance IDs, and stores
// them in the sql_instances table. When instance IDs are pre-allocated, all
// other fields in that row will be NULL.
//
// TODO(jaylim-crl): Handle multiple regions in this logic. encodeRow has to be
// updated with crdb_region. When we handle multiple regions, we have to figure
// out where to get the list of regions, and ensure that we don't do a global
// read for each region assuming that the number of pre-allocated entries is
// insufficient. One global KV read and write would be sufficient for **all**
// regions.
func (s *Storage) generateAvailableInstanceRows(
ctx context.Context, regions [][]byte, sessionExpiration hlc.Timestamp,
) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestIdsToReclaim(t *testing.T) {
}
}

func TestGenerateAvailableInstanceRows(t *testing.T) {
func TestReclaimAndGenerateInstanceRows(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand Down
Loading