Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace, api2: fix the keyspace assignment patrol consistency #6397

Merged
merged 5 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 84 additions & 4 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package keyspace

import (
"context"
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/slice"
Expand Down Expand Up @@ -72,7 +74,8 @@ type Manager struct {
ctx context.Context
// config is the configurations of the manager.
config Config
kgm *GroupManager
// kgm is the keyspace group manager of the server.
kgm *GroupManager
}

// CreateKeyspaceRequest represents necessary arguments to create a keyspace.
Expand All @@ -86,7 +89,9 @@ type CreateKeyspaceRequest struct {
}

// NewKeyspaceManager creates a Manager of keyspace related data.
func NewKeyspaceManager(store endpoint.KeyspaceStorage,
func NewKeyspaceManager(
ctx context.Context,
store endpoint.KeyspaceStorage,
cluster schedule.Cluster,
idAllocator id.Allocator,
config Config,
Expand All @@ -97,7 +102,7 @@ func NewKeyspaceManager(store endpoint.KeyspaceStorage,
idAllocator: idAllocator,
store: store,
cluster: cluster,
ctx: context.TODO(),
ctx: ctx,
config: config,
kgm: kgm,
}
Expand Down Expand Up @@ -540,7 +545,18 @@ func (manager *Manager) LoadRangeKeyspace(startID uint32, limit int) ([]*keyspac
if startID > spaceIDMax {
return nil, errors.Errorf("startID of the scan %d exceeds spaceID Max %d", startID, spaceIDMax)
}
return manager.store.LoadRangeKeyspace(startID, limit)
var (
keyspaces []*keyspacepb.KeyspaceMeta
err error
)
err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
keyspaces, err = manager.store.LoadRangeKeyspace(txn, startID, limit)
return err
})
if err != nil {
return nil, err
}
return keyspaces, nil
}

// allocID allocate a new keyspace id.
Expand All @@ -555,3 +571,67 @@ func (manager *Manager) allocID() (uint32, error) {
}
return id32, nil
}

// PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (manager *Manager) PatrolKeyspaceAssignment() error {
// TODO: since the number of keyspaces might be large, we should consider to assign them in batches.
return manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only worry is that the transaction could be too large to accepted by etcd if we have too many keyspaces. Do we need to do it in batches?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks good for me for now. We can revisit the comment above after we collect some rough data.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the recent discussion, it looks like we need to process it in batch now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to add txn in other operators?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean?

defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID)
if err != nil {
return err
}
if defaultKeyspaceGroup == nil {
return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, DefaultKeyspaceID, 0)
if err != nil {
return err
}
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, len(keyspaces))
)
defer func() {
for _, id := range keyspaceIDsToUnlock {
manager.metaLock.Unlock(id)
}
}()
for _, ks := range keyspaces {
if ks == nil {
continue
}
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
ks.Config = make(map[string]string, 1)
} else {
// If the keyspace already has a group ID, skip it.
_, ok := ks.Config[TSOKeyspaceGroupIDKey]
if ok {
manager.metaLock.Unlock(ks.Id)
continue
}
}
// Unlock the keyspace meta lock after the whole txn.
keyspaceIDsToUnlock = append(keyspaceIDsToUnlock, ks.Id)
// If the keyspace doesn't have a group ID, assign it to the default keyspace group.
if !slice.Contains(defaultKeyspaceGroup.Keyspaces, ks.Id) {
defaultKeyspaceGroup.Keyspaces = append(defaultKeyspaceGroup.Keyspaces, ks.Id)
assigned = true
}
ks.Config[TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(utils.DefaultKeyspaceGroupID), 10)
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Uint32("ID", ks.Id), zap.Error(err))
return err
}
}
if assigned {
return manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
}
return nil
})
}
30 changes: 29 additions & 1 deletion pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.manager = NewKeyspaceManager(store, nil, allocator, &mockConfig{}, kgm)
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(kgm.Bootstrap())
suite.NoError(suite.manager.Bootstrap())
}
Expand Down Expand Up @@ -354,3 +355,30 @@ func updateKeyspaceConfig(re *require.Assertions, manager *Manager, name string,
oldMeta = updatedMeta
}
}

func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
re := suite.Require()
// Create a keyspace without any keyspace group.
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: 111,
Name: "111",
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
// Check if the keyspace is not attached to the default group.
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(111))
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
re.NoError(err)
// Check if the keyspace is attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(111))
}
51 changes: 3 additions & 48 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,9 @@ type GroupManager struct {
// groups is the cache of keyspace group related information.
// user kind -> keyspace group
groups map[endpoint.UserKind]*indexedHeap
// patrolKeyspaceAssignmentOnce is used to indicate whether we have patrolled all keyspaces
// and assign them to the keyspace groups.
patrolKeyspaceAssignmentOnce bool

// store is the storage for keyspace group related information.
store interface {
endpoint.KeyspaceGroupStorage
endpoint.KeyspaceStorage
}
store endpoint.KeyspaceGroupStorage

client *clientv3.Client

Expand All @@ -88,10 +82,7 @@ type GroupManager struct {
// NewKeyspaceGroupManager creates a Manager of keyspace group related data.
func NewKeyspaceGroupManager(
ctx context.Context,
store interface {
endpoint.KeyspaceGroupStorage
endpoint.KeyspaceStorage
},
store endpoint.KeyspaceGroupStorage,
client *clientv3.Client,
clusterID uint64,
) *GroupManager {
Expand Down Expand Up @@ -170,38 +161,6 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

// patrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (m *GroupManager) patrolKeyspaceAssignment() error {
m.Lock()
defer m.Unlock()
if m.patrolKeyspaceAssignmentOnce {
return nil
}
keyspaces, err := m.store.LoadRangeKeyspace(utils.DefaultKeyspaceID, 0)
if err != nil {
return err
}
config, err := m.getKeyspaceConfigByKindLocked(endpoint.Basic)
if err != nil {
return err
}
for _, ks := range keyspaces {
if ks == nil {
continue
}
groupID, err := strconv.ParseUint(config[TSOKeyspaceGroupIDKey], 10, 64)
if err != nil {
return err
}
err = m.updateKeyspaceForGroupLocked(endpoint.Basic, groupID, ks.GetId(), opAdd)
if err != nil {
return err
}
}
m.patrolKeyspaceAssignmentOnce = true
return nil
}

func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
defer logutil.LogPanic()
defer m.wg.Done()
Expand Down Expand Up @@ -602,14 +561,10 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error {
err := m.patrolKeyspaceAssignment()
if err != nil {
return err
}
var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
if err = m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the old keyspace group first.
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitSourceID)
if err != nil {
Expand Down
31 changes: 1 addition & 30 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"
"time"

"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mock/mockcluster"
Expand Down Expand Up @@ -48,7 +47,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
idAllocator := mockid.NewIDAllocator()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
suite.kg = NewKeyspaceManager(store, cluster, idAllocator, &mockConfig{}, suite.kgm)
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
suite.NoError(suite.kgm.Bootstrap())
}

Expand Down Expand Up @@ -321,31 +320,3 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444})
re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup)
}

func (suite *keyspaceGroupTestSuite) TestPatrolKeyspaceAssignment() {
re := suite.Require()
// Force the patrol to run once.
suite.kgm.patrolKeyspaceAssignmentOnce = false
// Create a keyspace group without any keyspace.
err := suite.kgm.CreateKeyspaceGroups([]*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Basic.String(),
Members: make([]endpoint.KeyspaceGroupMember, 2),
},
})
re.NoError(err)
// Create a keyspace without any keyspace group.
now := time.Now().Unix()
err = suite.kg.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: 111,
Name: "111",
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
// Split to see if the keyspace is attached to the group.
err = suite.kgm.SplitKeyspaceGroupByID(1, 2, []uint32{111})
re.NoError(err)
}
6 changes: 3 additions & 3 deletions pkg/storage/endpoint/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type KeyspaceStorage interface {
SaveKeyspaceID(txn kv.Txn, id uint32, name string) error
LoadKeyspaceID(txn kv.Txn, name string) (bool, uint32, error)
// LoadRangeKeyspace loads no more than limit keyspaces starting at startID.
LoadRangeKeyspace(startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error)
LoadRangeKeyspace(txn kv.Txn, startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error)
RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error
}

Expand Down Expand Up @@ -104,10 +104,10 @@ func (se *StorageEndpoint) RunInTxn(ctx context.Context, f func(txn kv.Txn) erro

// LoadRangeKeyspace loads keyspaces starting at startID.
// limit specifies the limit of loaded keyspaces.
func (se *StorageEndpoint) LoadRangeKeyspace(startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) {
func (se *StorageEndpoint) LoadRangeKeyspace(txn kv.Txn, startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) {
startKey := KeyspaceMetaPath(startID)
endKey := clientv3.GetPrefixRangeEnd(KeyspaceMetaPrefix())
keys, values, err := se.LoadRange(startKey, endKey, limit)
keys, values, err := txn.LoadRange(startKey, endKey, limit)
if err != nil {
return nil, err
}
Expand Down
30 changes: 17 additions & 13 deletions pkg/storage/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,31 +72,35 @@ func TestSaveLoadKeyspace(t *testing.T) {
func TestLoadRangeKeyspaces(t *testing.T) {
re := require.New(t)
storage := NewStorageWithMemoryBackend()

// Store test keyspace meta.
keyspaces := makeTestKeyspaces()
// Store test keyspace meta.
err := storage.RunInTxn(context.TODO(), func(txn kv.Txn) error {
for _, keyspace := range keyspaces {
re.NoError(storage.SaveKeyspaceMeta(txn, keyspace))
}
return nil
})
re.NoError(err)
// Test load range keyspaces.
err = storage.RunInTxn(context.TODO(), func(txn kv.Txn) error {
// Load all keyspaces.
loadedKeyspaces, err := storage.LoadRangeKeyspace(txn, keyspaces[0].GetId(), 0)
re.NoError(err)
re.ElementsMatch(keyspaces, loadedKeyspaces)

// Load all keyspaces.
loadedKeyspaces, err := storage.LoadRangeKeyspace(keyspaces[0].GetId(), 0)
re.NoError(err)
re.ElementsMatch(keyspaces, loadedKeyspaces)
// Load keyspaces with id >= second test keyspace's id.
loadedKeyspaces2, err := storage.LoadRangeKeyspace(txn, keyspaces[1].GetId(), 0)
re.NoError(err)
re.ElementsMatch(keyspaces[1:], loadedKeyspaces2)

// Load keyspaces with id >= second test keyspace's id.
loadedKeyspaces2, err := storage.LoadRangeKeyspace(keyspaces[1].GetId(), 0)
re.NoError(err)
re.ElementsMatch(keyspaces[1:], loadedKeyspaces2)
// Load keyspace with the smallest id.
loadedKeyspace3, err := storage.LoadRangeKeyspace(txn, 1, 1)
re.NoError(err)
re.ElementsMatch(keyspaces[:1], loadedKeyspace3)

// Load keyspace with the smallest id.
loadedKeyspace3, err := storage.LoadRangeKeyspace(1, 1)
return nil
})
re.NoError(err)
re.ElementsMatch(keyspaces[:1], loadedKeyspace3)
}

func makeTestKeyspaces() []*keyspacepb.KeyspaceMeta {
Expand Down
Loading