Skip to content

Commit

Permalink
keyspace, tso: check the replica count before the split (#6382)
Browse files Browse the repository at this point in the history
ref #6233

Check the replica count before the split.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: lhy1024 <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Apr 26, 2023
1 parent cb61bb2 commit d8a3541
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 7 deletions.
10 changes: 10 additions & 0 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
if splitSourceKg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
// Check if the source keyspace group has enough replicas.
if len(splitSourceKg.Members) < utils.KeyspaceGroupDefaultReplicaCount {
return ErrKeyspaceGroupNotEnoughReplicas
}
// Check if the new keyspace group already exists.
splitTargetKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetID)
if err != nil {
Expand Down Expand Up @@ -687,6 +691,9 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
if kg == nil {
return ErrKeyspaceGroupNotExists
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
exists := make(map[string]struct{})
for _, member := range kg.Members {
exists[member.Address] = struct{}{}
Expand Down Expand Up @@ -737,6 +744,9 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
if kg == nil {
return ErrKeyspaceGroupNotExists
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
Expand Down
5 changes: 5 additions & 0 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/mock/mockconfig"
"github.com/tikv/pd/pkg/mock/mockid"
Expand Down Expand Up @@ -242,10 +243,14 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
ID: uint32(2),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
},
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
re.NoError(err)
// split the keyspace group 1 to 4
err = suite.kgm.SplitKeyspaceGroupByID(1, 4, []uint32{333})
re.ErrorIs(err, ErrKeyspaceGroupNotEnoughReplicas)
// split the keyspace group 2 to 4
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{333})
re.NoError(err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ var (
ErrKeyspaceGroupNotInSplit = errors.New("keyspace group is not in split state")
// ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group.
ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group")
// ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group.
ErrKeyspaceGroupNotEnoughReplicas = errors.New("not enough replicas in the keyspace group")
// ErrNoAvailableNode is used to indicate no available node in the keyspace group.
ErrNoAvailableNode = errors.New("no available node")
errModifyDefault = errors.New("cannot modify default keyspace's state")
Expand Down
36 changes: 31 additions & 5 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
log.Info("keyspace group is in split",
zap.Uint32("target", group.ID),
zap.Uint32("source", splitSource))
splitSourceAM, _ := kgm.getKeyspaceGroupMeta(splitSource)
if splitSourceAM == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("target", group.ID),
zap.Uint32("source", splitSource))
splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource)
if !validateSplit(splitSourceAM, group, splitSourceGroup) {
// Put the group into the retry list to retry later.
kgm.groupUpdateRetryList[group.ID] = group
return
Expand Down Expand Up @@ -616,6 +613,35 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
kgm.Unlock()
}

// validateSplit checks whether the meta info of split keyspace group
// to ensure that the split process could be continued.
func validateSplit(
sourceAM *AllocatorManager,
targetGroup, sourceGroup *endpoint.KeyspaceGroup,
) bool {
splitSourceID := targetGroup.SplitSource()
// Make sure that the split source keyspace group has been initialized.
if sourceAM == nil || sourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("target", targetGroup.ID),
zap.Uint32("source", splitSourceID))
return false
}
// Since the target group is derived from the source group and both of them
// could not be modified during the split process, so we can only check the
// member count of the source group here.
memberCount := len(sourceGroup.Members)
if memberCount < mcsutils.KeyspaceGroupDefaultReplicaCount {
log.Error("the split source keyspace group does not have enough members",
zap.Uint32("target", targetGroup.ID),
zap.Uint32("source", splitSourceID),
zap.Int("member-count", memberCount),
zap.Int("replica-count", mcsutils.KeyspaceGroupDefaultReplicaCount))
return false
}
return true
}

// updateKeyspaceGroupMembership updates the keyspace lookup table for the given keyspace group.
func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
oldGroup, newGroup *endpoint.KeyspaceGroup, updateWithLock bool,
Expand Down
4 changes: 2 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,11 +875,11 @@ func (suite *keyspaceGroupManagerTestSuite) TestGroupSplitUpdateRetry() {

events := []*etcdEvent{}
// Split target keyspace group event arrives first.
events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{svcAddr}, &endpoint.SplitState{
events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2} /* Mock 2 replicas */, []string{svcAddr, svcAddr}, &endpoint.SplitState{
SplitSource: 1,
}))
// Split source keyspace group event arrives later.
events = append(events, generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr}, &endpoint.SplitState{
events = append(events, generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr, svcAddr}, &endpoint.SplitState{
SplitSource: 1,
}))

Expand Down
1 change: 1 addition & 0 deletions tests/server/apiv2/handlers/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (suite *keyspaceGroupTestSuite) TestSplitKeyspaceGroup() {
ID: uint32(1),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
},
}}

Expand Down

0 comments on commit d8a3541

Please sign in to comment.