Skip to content

Commit

Permalink
keyspace, tso, apiv2: impl the interface to merge all keyspace groups…
Browse files Browse the repository at this point in the history
… into the default (tikv#6757)

ref tikv#6756

Impl the interface to merge all keyspace groups into the default.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent 3ec2f20 commit bec6b81
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 35 deletions.
16 changes: 8 additions & 8 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ const (
UserKindKey = "user_kind"
// TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config.
TSOKeyspaceGroupIDKey = "tso_keyspace_group_id"
// maxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128.
// MaxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128.
// We use 120 here to leave some space for other operations.
// See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61
maxEtcdTxnOps = 120
MaxEtcdTxnOps = 120
)

// Config is the interface for keyspace config.
Expand Down Expand Up @@ -685,7 +685,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
zap.Duration("cost", time.Since(start)),
zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount),
zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Int("batch-size", MaxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
Expand All @@ -709,7 +709,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
if defaultKeyspaceGroup.IsMerging() {
return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID)
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps)
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, MaxEtcdTxnOps)
if err != nil {
return err
}
Expand All @@ -719,9 +719,9 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
currentStartID = keyspaces[0].GetId()
nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
}
// If there are less than `maxEtcdTxnOps` keyspaces or the next start ID reaches the end,
// If there are less than `MaxEtcdTxnOps` keyspaces or the next start ID reaches the end,
// there is no need to patrol again.
moreToPatrol = keyspaceNum == maxEtcdTxnOps
moreToPatrol = keyspaceNum == MaxEtcdTxnOps
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum)
Expand Down Expand Up @@ -760,7 +760,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Int("batch-size", maxEtcdTxnOps),
zap.Int("batch-size", MaxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
Expand All @@ -774,7 +774,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
if err != nil {
log.Error("[keyspace] failed to save default keyspace group meta during patrol",
zap.Int("batch-size", maxEtcdTxnOps),
zap.Int("batch-size", MaxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
Expand Down
18 changes: 9 additions & 9 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Expand All @@ -420,7 +420,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment.
Expand All @@ -430,15 +430,15 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
}

func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Expand All @@ -453,22 +453,22 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment with range [maxEtcdTxnOps/2, maxEtcdTxnOps/2+maxEtcdTxnOps+1]
// Patrol the keyspace assignment with range [MaxEtcdTxnOps/2, MaxEtcdTxnOps/2+MaxEtcdTxnOps+1]
// to make sure the range crossing the boundary of etcd transaction operation limit.
var (
startKeyspaceID = uint32(maxEtcdTxnOps / 2)
endKeyspaceID = startKeyspaceID + maxEtcdTxnOps + 1
startKeyspaceID = uint32(MaxEtcdTxnOps / 2)
endKeyspaceID = startKeyspaceID + MaxEtcdTxnOps + 1
)
err = suite.manager.PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID)
re.NoError(err)
// Check if only the keyspaces within the range are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
keyspaceID := uint32(i)
if keyspaceID >= startKeyspaceID && keyspaceID <= endKeyspaceID {
re.Contains(defaultKeyspaceGroup.Keyspaces, keyspaceID)
Expand Down
101 changes: 100 additions & 1 deletion pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin
// - Load and delete the keyspace groups in the merge list.
// - Load and update the target keyspace group.
// So we pre-check the number of operations to avoid exceeding the maximum number of etcd transaction.
if (mergeListNum+1)*2 > maxEtcdTxnOps {
if (mergeListNum+1)*2 > MaxEtcdTxnOps {
return ErrExceedMaxEtcdTxnOps
}
if slice.Contains(mergeList, utils.DefaultKeyspaceGroupID) {
Expand Down Expand Up @@ -1007,6 +1007,105 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
return nil
}

// MergeAllIntoDefaultKeyspaceGroup merges all other keyspace groups into the default keyspace group.
func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error {
defer logutil.LogPanic()
// Since we don't take the default keyspace group into account,
// the number of unmerged keyspace groups is -1.
unmergedGroupNum := -1
// Calculate the total number of keyspace groups to merge.
for _, groups := range m.groups {
unmergedGroupNum += groups.Len()
}
mergedGroupNum := 0
// Start to merge all keyspace groups into the default one.
for userKind, groups := range m.groups {
mergeNum := groups.Len()
log.Info("start to merge all keyspace groups into the default one",
zap.Stringer("user-kind", userKind),
zap.Int("merge-num", mergeNum),
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum))
if mergeNum == 0 {
continue
}
var (
maxBatchSize = MaxEtcdTxnOps/2 - 1
groupsToMerge = make([]uint32, 0, maxBatchSize)
)
for idx, group := range groups.GetAll() {
if group.ID == utils.DefaultKeyspaceGroupID {
continue
}
groupsToMerge = append(groupsToMerge, group.ID)
if len(groupsToMerge) < maxBatchSize && idx < mergeNum-1 {
continue
}
log.Info("merge keyspace groups into the default one",
zap.Int("index", idx),
zap.Int("batch-size", len(groupsToMerge)),
zap.Int("merge-num", mergeNum),
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum))
// Reach the batch size, merge them into the default keyspace group.
if err := m.MergeKeyspaceGroups(utils.DefaultKeyspaceGroupID, groupsToMerge); err != nil {
log.Error("failed to merge all keyspace groups into the default one",
zap.Int("index", idx),
zap.Int("batch-size", len(groupsToMerge)),
zap.Int("merge-num", mergeNum),
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum),
zap.Error(err))
return err
}
// Wait for the merge to finish.
ctx, cancel := context.WithTimeout(m.ctx, time.Minute)
ticker := time.NewTicker(time.Second)
checkLoop:
for {
select {
case <-ctx.Done():
log.Info("cancel merging all keyspace groups into the default one",
zap.Int("index", idx),
zap.Int("batch-size", len(groupsToMerge)),
zap.Int("merge-num", mergeNum),
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum))
cancel()
ticker.Stop()
return nil
case <-ticker.C:
kg, err := m.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
if err != nil {
log.Error("failed to check the default keyspace group merge state",
zap.Int("index", idx),
zap.Int("batch-size", len(groupsToMerge)),
zap.Int("merge-num", mergeNum),
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum),
zap.Error(err))
cancel()
ticker.Stop()
return err
}
if !kg.IsMergeTarget() {
break checkLoop
}
}
}
cancel()
ticker.Stop()
mergedGroupNum += len(groupsToMerge)
unmergedGroupNum -= len(groupsToMerge)
groupsToMerge = groupsToMerge[:0]
}
}
log.Info("finish merging all keyspace groups into the default one",
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum))
return nil
}

// GetKeyspaceGroupPrimaryByID returns the primary node of the keyspace group by ID.
func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
// check if the keyspace group exists
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
err = suite.kgm.MergeKeyspaceGroups(4, []uint32{5})
re.ErrorContains(err, ErrKeyspaceGroupNotExists(5).Error())
// merge with the number of keyspace groups exceeds the limit
err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, maxEtcdTxnOps/2))
err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, MaxEtcdTxnOps/2))
re.ErrorIs(err, ErrExceedMaxEtcdTxnOps)
// merge the default keyspace group
err = suite.kgm.MergeKeyspaceGroups(1, []uint32{utils.DefaultKeyspaceGroupID})
Expand Down
15 changes: 12 additions & 3 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ func FinishSplitKeyspaceByID(c *gin.Context) {

// MergeKeyspaceGroupsParams defines the params for merging the keyspace groups.
type MergeKeyspaceGroupsParams struct {
MergeList []uint32 `json:"merge-list"`
MergeList []uint32 `json:"merge-list"`
MergeAllIntoDefault bool `json:"merge-all-into-default"`
}

// MergeKeyspaceGroups merges the keyspace groups in the merge list into the target keyspace group.
Expand All @@ -292,10 +293,14 @@ func MergeKeyspaceGroups(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
if len(mergeParams.MergeList) == 0 {
if len(mergeParams.MergeList) == 0 && !mergeParams.MergeAllIntoDefault {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid empty merge list")
return
}
if len(mergeParams.MergeList) > 0 && mergeParams.MergeAllIntoDefault {
c.AbortWithStatusJSON(http.StatusBadRequest, "non-empty merge list when merge all into default")
return
}
for _, mergeID := range mergeParams.MergeList {
if !isValid(mergeID) {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
Expand All @@ -310,7 +315,11 @@ func MergeKeyspaceGroups(c *gin.Context) {
return
}
// Merge keyspace group.
err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList)
if mergeParams.MergeAllIntoDefault {
err = groupManager.MergeAllIntoDefaultKeyspaceGroup()
} else {
err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList)
}
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
Expand Down
70 changes: 57 additions & 13 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tso

import (
"context"
"math/rand"
"strings"
"sync"
"testing"
Expand All @@ -28,6 +29,7 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/keyspace"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -465,15 +467,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
re.Equal(uint32(0), kg.ID)
re.Equal([]uint32{0}, kg.Keyspaces)
re.False(kg.IsSplitting())
// wait for finishing alloc nodes
testutil.Eventually(re, func() bool {
kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
return len(kg.Members) == 2
})
waitFinishAllocNodes(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID)
testConfig := map[string]string{
"config": "1",
"tso_keyspace_group_id": "0",
Expand All @@ -483,15 +478,19 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() {
Name: "test_keyspace",
Config: testConfig,
})
kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
testutil.Eventually(re, func() bool {
kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
return len(kg.Members) == 2
})
waitFinishAllocNodes(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

func waitFinishAllocNodes(re *require.Assertions, server *tests.TestServer, groupID uint32) {
testutil.Eventually(re, func() bool {
kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, groupID)
re.Equal(groupID, kg.ID)
return len(kg.Members) == mcsutils.DefaultKeyspaceGroupReplicaCount
})
}

func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -749,3 +748,48 @@ func TestGetTSOImmediately(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller"))
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))

var (
keyspaceGroupNum = keyspace.MaxEtcdTxnOps
keyspaceGroups = make([]*endpoint.KeyspaceGroup, 0, keyspaceGroupNum)
keyspaces = make([]uint32, 0, keyspaceGroupNum)
)
for i := 1; i <= keyspaceGroupNum; i++ {
keyspaceGroups = append(keyspaceGroups, &endpoint.KeyspaceGroup{
ID: uint32(i),
UserKind: endpoint.UserKind(rand.Intn(int(endpoint.UserKindCount))).String(),
Keyspaces: []uint32{uint32(i)},
})
keyspaces = append(keyspaces, uint32(i))
if len(keyspaceGroups) < keyspace.MaxEtcdTxnOps/2 && i != keyspaceGroupNum {
continue
}
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: keyspaceGroups,
})
keyspaceGroups = keyspaceGroups[:0]
}
// Check if all the keyspace groups are created.
groups := handlersutil.MustLoadKeyspaceGroups(re, suite.pdLeaderServer, "0", "0")
re.Len(groups, keyspaceGroupNum+1)
// Wait for all the keyspace groups to be served.
svr := suite.tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(svr)
svr = suite.tsoCluster.WaitForPrimaryServing(re, uint32(keyspaceGroupNum), uint32(keyspaceGroupNum))
re.NotNil(svr)
// Merge all the keyspace groups into the default keyspace group.
handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
MergeAllIntoDefault: true,
})
// Wait for all the keyspace groups to be merged.
waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, keyspaces)
// Check if all the keyspace groups are merged.
groups = handlersutil.MustLoadKeyspaceGroups(re, suite.pdLeaderServer, "0", "0")
re.Len(groups, 1)

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

0 comments on commit bec6b81

Please sign in to comment.