From f4be4a3348a693f7ff3a4f7a8458a5117959747b Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 5 Jul 2023 17:58:49 +0800 Subject: [PATCH 1/6] Impl the interface to merge all keyspace groups into the default Signed-off-by: JmPotato --- pkg/keyspace/keyspace.go | 16 ++-- pkg/keyspace/keyspace_test.go | 18 ++--- pkg/keyspace/tso_keyspace_group.go | 76 ++++++++++++++++++- pkg/keyspace/tso_keyspace_group_test.go | 2 +- server/apiv2/handlers/tso_keyspace_group.go | 15 +++- .../mcs/tso/keyspace_group_manager_test.go | 40 ++++++++++ 6 files changed, 145 insertions(+), 22 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 64dd1ba8622..ad61fe0a3fa 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -49,10 +49,10 @@ const ( UserKindKey = "user_kind" // TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config. TSOKeyspaceGroupIDKey = "tso_keyspace_group_id" - // maxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128. + // MaxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128. // We use 120 here to leave some space for other operations. // See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61 - maxEtcdTxnOps = 120 + MaxEtcdTxnOps = 120 ) // Config is the interface for keyspace config. @@ -681,7 +681,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID zap.Duration("cost", time.Since(start)), zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount), zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount), - zap.Int("batch-size", maxEtcdTxnOps), + zap.Int("batch-size", MaxEtcdTxnOps), zap.Uint32("start-keyspace-id", startKeyspaceID), zap.Uint32("end-keyspace-id", endKeyspaceID), zap.Uint32("current-start-id", currentStartID), @@ -705,7 +705,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID if defaultKeyspaceGroup.IsMerging() { return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID) } - keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps) + keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, MaxEtcdTxnOps) if err != nil { return err } @@ -715,9 +715,9 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID currentStartID = keyspaces[0].GetId() nextStartID = keyspaces[keyspaceNum-1].GetId() + 1 } - // If there are less than `maxEtcdTxnOps` keyspaces or the next start ID reaches the end, + // If there are less than `MaxEtcdTxnOps` keyspaces or the next start ID reaches the end, // there is no need to patrol again. - moreToPatrol = keyspaceNum == maxEtcdTxnOps + moreToPatrol = keyspaceNum == MaxEtcdTxnOps var ( assigned = false keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum) @@ -756,7 +756,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID err = manager.store.SaveKeyspaceMeta(txn, ks) if err != nil { log.Error("[keyspace] failed to save keyspace meta during patrol", - zap.Int("batch-size", maxEtcdTxnOps), + zap.Int("batch-size", MaxEtcdTxnOps), zap.Uint32("start-keyspace-id", startKeyspaceID), zap.Uint32("end-keyspace-id", endKeyspaceID), zap.Uint32("current-start-id", currentStartID), @@ -770,7 +770,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup) if err != nil { log.Error("[keyspace] failed to save default keyspace group meta during patrol", - zap.Int("batch-size", maxEtcdTxnOps), + zap.Int("batch-size", MaxEtcdTxnOps), zap.Uint32("start-keyspace-id", startKeyspaceID), zap.Uint32("end-keyspace-id", endKeyspaceID), zap.Uint32("current-start-id", currentStartID), diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index b06921e48db..27e7de359ee 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -405,7 +405,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() { func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { re := suite.Require() // Create some keyspaces without any keyspace group. - for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { now := time.Now().Unix() err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ Id: uint32(i), @@ -420,7 +420,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } // Patrol the keyspace assignment. @@ -430,7 +430,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } } @@ -438,7 +438,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() { re := suite.Require() // Create some keyspaces without any keyspace group. - for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { now := time.Now().Unix() err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ Id: uint32(i), @@ -453,14 +453,14 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() { defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } - // Patrol the keyspace assignment with range [maxEtcdTxnOps/2, maxEtcdTxnOps/2+maxEtcdTxnOps+1] + // Patrol the keyspace assignment with range [MaxEtcdTxnOps/2, MaxEtcdTxnOps/2+MaxEtcdTxnOps+1] // to make sure the range crossing the boundary of etcd transaction operation limit. var ( - startKeyspaceID = uint32(maxEtcdTxnOps / 2) - endKeyspaceID = startKeyspaceID + maxEtcdTxnOps + 1 + startKeyspaceID = uint32(MaxEtcdTxnOps / 2) + endKeyspaceID = startKeyspaceID + MaxEtcdTxnOps + 1 ) err = suite.manager.PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID) re.NoError(err) @@ -468,7 +468,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() { defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { keyspaceID := uint32(i) if keyspaceID >= startKeyspaceID && keyspaceID <= endKeyspaceID { re.Contains(defaultKeyspaceGroup.Keyspaces, keyspaceID) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index d319798738b..de2baf0648d 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -889,7 +889,7 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin // - Load and delete the keyspace groups in the merge list. // - Load and update the target keyspace group. // So we pre-check the number of operations to avoid exceeding the maximum number of etcd transaction. - if (mergeListNum+1)*2 > maxEtcdTxnOps { + if (mergeListNum+1)*2 > MaxEtcdTxnOps { return ErrExceedMaxEtcdTxnOps } if slice.Contains(mergeList, utils.DefaultKeyspaceGroupID) { @@ -1013,6 +1013,80 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { return nil } +// MergeAllIntoDefaultKeyspaceGroup merges all other keyspace groups into the default keyspace group. +func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { + mergedGroupNum := 0 + for i := 0; i < int(endpoint.UserKindCount); i++ { + userKind := endpoint.UserKind(i) + log.Info("start to merge all keyspace groups into the default one", + zap.Stringer("user-kind", userKind)) + groups, ok := m.groups[userKind] + if !ok || groups.Len() == 0 { + continue + } + var ( + maxBatchSize = MaxEtcdTxnOps/2 - 1 + groupsToMerge = make([]uint32, 0, maxBatchSize) + ) + for idx, group := range groups.GetAll() { + if group.ID == utils.DefaultKeyspaceGroupID { + continue + } + groupsToMerge = append(groupsToMerge, group.ID) + if len(groupsToMerge) < maxBatchSize && idx < groups.Len()-1 { + continue + } + log.Info("merge keyspace groups into the default one", + zap.Int("index", idx), + zap.Int("batch-size", len(groupsToMerge)), + zap.Int("merged-group-num", mergedGroupNum)) + // Reach the batch size, merge them into the default keyspace group. + if err := m.MergeKeyspaceGroups(utils.DefaultKeyspaceGroupID, groupsToMerge); err != nil { + log.Error("failed to merge all keyspace groups into the default one", + zap.Int("index", idx), + zap.Int("batch-size", len(groupsToMerge)), + zap.Int("merged-group-num", mergedGroupNum), + zap.Error(err)) + return err + } + // Wait for the merge to finish. + ctx, cancel := context.WithTimeout(m.ctx, time.Minute) + checkLoop: + for { + select { + case <-ctx.Done(): + log.Info("cancel merging all keyspace groups into the default one", + zap.Int("index", idx), + zap.Int("batch-size", len(groupsToMerge)), + zap.Int("merged-group-num", mergedGroupNum)) + cancel() + return nil + case <-time.After(time.Second): + kg, err := m.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) + if err != nil { + log.Error("failed to check the default keyspace group merge state", + zap.Int("index", idx), + zap.Int("batch-size", len(groupsToMerge)), + zap.Int("merged-group-num", mergedGroupNum), + zap.Error(err)) + cancel() + return err + } + if !kg.IsMergeTarget() { + break checkLoop + } + } + } + cancel() + mergedGroupNum += len(groupsToMerge) + groupsToMerge = groupsToMerge[:0] + } + } + log.Info("finish merging all keyspace groups into the default one", + zap.Int("merged-group-num", mergedGroupNum)) + return nil +} + // GetKeyspaceGroupPrimaryByID returns the primary node of the keyspace group by ID. func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) { // check if the keyspace group exists diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index e8a40a839c8..5f01146eb96 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -448,7 +448,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() { err = suite.kgm.MergeKeyspaceGroups(4, []uint32{5}) re.ErrorContains(err, ErrKeyspaceGroupNotExists(5).Error()) // merge with the number of keyspace groups exceeds the limit - err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, maxEtcdTxnOps/2)) + err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, MaxEtcdTxnOps/2)) re.ErrorIs(err, ErrExceedMaxEtcdTxnOps) // merge the default keyspace group err = suite.kgm.MergeKeyspaceGroups(1, []uint32{utils.DefaultKeyspaceGroupID}) diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index 7030c332406..bde700e6ef7 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -276,7 +276,8 @@ func FinishSplitKeyspaceByID(c *gin.Context) { // MergeKeyspaceGroupsParams defines the params for merging the keyspace groups. type MergeKeyspaceGroupsParams struct { - MergeList []uint32 `json:"merge-list"` + MergeList []uint32 `json:"merge-list"` + MergeAllIntoDefault bool `json:"merge-all-into-default"` } // MergeKeyspaceGroups merges the keyspace groups in the merge list into the target keyspace group. @@ -292,10 +293,14 @@ func MergeKeyspaceGroups(c *gin.Context) { c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause()) return } - if len(mergeParams.MergeList) == 0 { + if len(mergeParams.MergeList) == 0 && !mergeParams.MergeAllIntoDefault { c.AbortWithStatusJSON(http.StatusBadRequest, "invalid empty merge list") return } + if len(mergeParams.MergeList) > 0 && mergeParams.MergeAllIntoDefault { + c.AbortWithStatusJSON(http.StatusBadRequest, "non-empty merge list when merge all into default") + return + } for _, mergeID := range mergeParams.MergeList { if !isValid(mergeID) { c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") @@ -310,7 +315,11 @@ func MergeKeyspaceGroups(c *gin.Context) { return } // Merge keyspace group. - err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList) + if mergeParams.MergeAllIntoDefault { + err = groupManager.MergeAllIntoDefaultKeyspaceGroup() + } else { + err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList) + } if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index ed4a1b964b0..4a8066dcf55 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -16,6 +16,7 @@ package tso import ( "context" + "math/rand" "strings" "sync" "testing" @@ -28,6 +29,7 @@ import ( pd "github.com/tikv/pd/client" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/keyspace" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" @@ -749,3 +751,41 @@ func TestGetTSOImmediately(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller")) } + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + + var ( + keyspaceGroupNum = keyspace.MaxEtcdTxnOps + keyspaceGroups = make([]*endpoint.KeyspaceGroup, 0, keyspaceGroupNum) + ) + for i := 1; i <= keyspaceGroupNum; i++ { + keyspaceGroups = append(keyspaceGroups, &endpoint.KeyspaceGroup{ + ID: uint32(i), + UserKind: endpoint.UserKind(rand.Intn(int(endpoint.UserKindCount))).String(), + Keyspaces: []uint32{uint32(i)}, + }) + if len(keyspaceGroups) < keyspace.MaxEtcdTxnOps/2 && i != keyspaceGroupNum { + continue + } + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: keyspaceGroups, + }) + keyspaceGroups = keyspaceGroups[:0] + } + // Check if all the keyspace groups are created. + groups := handlersutil.MustLoadKeyspaceGroups(re, suite.pdLeaderServer, "0", "0") + re.Len(groups, keyspaceGroupNum+1) + // Wait for all the keyspace groups to be served. + svr := suite.tsoCluster.WaitForDefaultPrimaryServing(re) + re.NotNil(svr) + svr = suite.tsoCluster.WaitForPrimaryServing(re, uint32(keyspaceGroupNum), uint32(keyspaceGroupNum)) + re.NotNil(svr) + // Merge all the keyspace groups into the default keyspace group. + handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ + MergeAllIntoDefault: true, + }) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) +} From 8371084c3b91c73c5fbc7840f077af626f118702 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 7 Jul 2023 13:59:45 +0800 Subject: [PATCH 2/6] Refine the test Signed-off-by: JmPotato --- tests/integrations/mcs/tso/keyspace_group_manager_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 4a8066dcf55..c9f25067bb8 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -759,6 +759,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault var ( keyspaceGroupNum = keyspace.MaxEtcdTxnOps keyspaceGroups = make([]*endpoint.KeyspaceGroup, 0, keyspaceGroupNum) + keyspaces = make([]uint32, 0, keyspaceGroupNum) ) for i := 1; i <= keyspaceGroupNum; i++ { keyspaceGroups = append(keyspaceGroups, &endpoint.KeyspaceGroup{ @@ -766,6 +767,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault UserKind: endpoint.UserKind(rand.Intn(int(endpoint.UserKindCount))).String(), Keyspaces: []uint32{uint32(i)}, }) + keyspaces = append(keyspaces, uint32(i)) if len(keyspaceGroups) < keyspace.MaxEtcdTxnOps/2 && i != keyspaceGroupNum { continue } @@ -786,6 +788,11 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ MergeAllIntoDefault: true, }) + // Wait for all the keyspace groups to be merged. + waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, keyspaces) + // Check if all the keyspace groups are merged. + groups = handlersutil.MustLoadKeyspaceGroups(re, suite.pdLeaderServer, "0", "0") + re.Len(groups, 1) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) } From 90f2e8b3df1aca7f3710ef6d0c35fc26892e6d2b Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 7 Jul 2023 14:23:40 +0800 Subject: [PATCH 3/6] Fix TestTSOKeyspaceGroupMembers Signed-off-by: JmPotato --- .../mcs/tso/keyspace_group_manager_test.go | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index c9f25067bb8..5a20211c7e9 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -467,15 +467,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) - kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) - re.Equal(uint32(0), kg.ID) - re.Equal([]uint32{0}, kg.Keyspaces) - re.False(kg.IsSplitting()) // wait for finishing alloc nodes - testutil.Eventually(re, func() bool { - kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) - return len(kg.Members) == 2 - }) + waitFinishAllocNodes(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) testConfig := map[string]string{ "config": "1", "tso_keyspace_group_id": "0", @@ -485,15 +478,19 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { Name: "test_keyspace", Config: testConfig, }) - kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) - testutil.Eventually(re, func() bool { - kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) - return len(kg.Members) == 2 - }) + waitFinishAllocNodes(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) } +func waitFinishAllocNodes(re *require.Assertions, server *tests.TestServer, groupID uint32) { + testutil.Eventually(re, func() bool { + kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, groupID) + re.Equal(groupID, kg.ID) + return len(kg.Members) == mcsutils.DefaultKeyspaceGroupReplicaCount + }) +} + func TestTwiceSplitKeyspaceGroup(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) From 31a1f03be39b8135fae17d0a5139c06b64cd25e2 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 7 Jul 2023 14:25:57 +0800 Subject: [PATCH 4/6] Use time ticker Signed-off-by: JmPotato --- pkg/keyspace/tso_keyspace_group.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index de2baf0648d..b522a978098 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -1051,6 +1051,7 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { } // Wait for the merge to finish. ctx, cancel := context.WithTimeout(m.ctx, time.Minute) + ticker := time.NewTicker(time.Second) checkLoop: for { select { @@ -1060,8 +1061,9 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { zap.Int("batch-size", len(groupsToMerge)), zap.Int("merged-group-num", mergedGroupNum)) cancel() + ticker.Stop() return nil - case <-time.After(time.Second): + case <-ticker.C: kg, err := m.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) if err != nil { log.Error("failed to check the default keyspace group merge state", @@ -1070,6 +1072,7 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { zap.Int("merged-group-num", mergedGroupNum), zap.Error(err)) cancel() + ticker.Stop() return err } if !kg.IsMergeTarget() { @@ -1078,6 +1081,7 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { } } cancel() + ticker.Stop() mergedGroupNum += len(groupsToMerge) groupsToMerge = groupsToMerge[:0] } From 418ef5ce092dcc284d89288d6e4282577a1df8e9 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 10 Jul 2023 11:02:21 +0800 Subject: [PATCH 5/6] Add logutil.LogPanic() Signed-off-by: JmPotato --- pkg/keyspace/tso_keyspace_group.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index b522a978098..41fe7285a7f 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -1015,6 +1015,7 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { // MergeAllIntoDefaultKeyspaceGroup merges all other keyspace groups into the default keyspace group. func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { + defer logutil.LogPanic() mergedGroupNum := 0 for i := 0; i < int(endpoint.UserKindCount); i++ { userKind := endpoint.UserKind(i) From e63ebe939b907f8f3c142748400b8b4d08cb19fd Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 10 Jul 2023 14:30:06 +0800 Subject: [PATCH 6/6] Add more info about the merged and unmerged group num Signed-off-by: JmPotato --- pkg/keyspace/tso_keyspace_group.go | 38 +++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 41fe7285a7f..e88055ed86d 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -1016,13 +1016,23 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { // MergeAllIntoDefaultKeyspaceGroup merges all other keyspace groups into the default keyspace group. func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { defer logutil.LogPanic() + // Since we don't take the default keyspace group into account, + // the number of unmerged keyspace groups is -1. + unmergedGroupNum := -1 + // Calculate the total number of keyspace groups to merge. + for _, groups := range m.groups { + unmergedGroupNum += groups.Len() + } mergedGroupNum := 0 - for i := 0; i < int(endpoint.UserKindCount); i++ { - userKind := endpoint.UserKind(i) + // Start to merge all keyspace groups into the default one. + for userKind, groups := range m.groups { + mergeNum := groups.Len() log.Info("start to merge all keyspace groups into the default one", - zap.Stringer("user-kind", userKind)) - groups, ok := m.groups[userKind] - if !ok || groups.Len() == 0 { + zap.Stringer("user-kind", userKind), + zap.Int("merge-num", mergeNum), + zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum)) + if mergeNum == 0 { continue } var ( @@ -1034,19 +1044,23 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { continue } groupsToMerge = append(groupsToMerge, group.ID) - if len(groupsToMerge) < maxBatchSize && idx < groups.Len()-1 { + if len(groupsToMerge) < maxBatchSize && idx < mergeNum-1 { continue } log.Info("merge keyspace groups into the default one", zap.Int("index", idx), zap.Int("batch-size", len(groupsToMerge)), - zap.Int("merged-group-num", mergedGroupNum)) + zap.Int("merge-num", mergeNum), + zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum)) // Reach the batch size, merge them into the default keyspace group. if err := m.MergeKeyspaceGroups(utils.DefaultKeyspaceGroupID, groupsToMerge); err != nil { log.Error("failed to merge all keyspace groups into the default one", zap.Int("index", idx), zap.Int("batch-size", len(groupsToMerge)), + zap.Int("merge-num", mergeNum), zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum), zap.Error(err)) return err } @@ -1060,7 +1074,9 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { log.Info("cancel merging all keyspace groups into the default one", zap.Int("index", idx), zap.Int("batch-size", len(groupsToMerge)), - zap.Int("merged-group-num", mergedGroupNum)) + zap.Int("merge-num", mergeNum), + zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum)) cancel() ticker.Stop() return nil @@ -1070,7 +1086,9 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { log.Error("failed to check the default keyspace group merge state", zap.Int("index", idx), zap.Int("batch-size", len(groupsToMerge)), + zap.Int("merge-num", mergeNum), zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum), zap.Error(err)) cancel() ticker.Stop() @@ -1084,11 +1102,13 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { cancel() ticker.Stop() mergedGroupNum += len(groupsToMerge) + unmergedGroupNum -= len(groupsToMerge) groupsToMerge = groupsToMerge[:0] } } log.Info("finish merging all keyspace groups into the default one", - zap.Int("merged-group-num", mergedGroupNum)) + zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum)) return nil }