Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace, tso: check the replica count before the split #6382

Merged
merged 5 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
if splitSourceKg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
// Check if the source keyspace group has enough replicas.
if len(splitSourceKg.Members) < utils.KeyspaceGroupDefaultReplicaCount {
return ErrKeyspaceGroupNotEnoughReplicas
}
// Check if the new keyspace group already exists.
splitTargetKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetID)
if err != nil {
Expand Down Expand Up @@ -687,6 +691,9 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
if kg == nil {
return ErrKeyspaceGroupNotExists
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
exists := make(map[string]struct{})
for _, member := range kg.Members {
exists[member.Address] = struct{}{}
Expand Down Expand Up @@ -737,6 +744,9 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
if kg == nil {
return ErrKeyspaceGroupNotExists
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
Expand Down
5 changes: 5 additions & 0 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/mock/mockconfig"
"github.com/tikv/pd/pkg/mock/mockid"
Expand Down Expand Up @@ -242,10 +243,14 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
ID: uint32(2),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
},
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
re.NoError(err)
// split the keyspace group 1 to 4
err = suite.kgm.SplitKeyspaceGroupByID(1, 4, []uint32{333})
re.ErrorIs(err, ErrKeyspaceGroupNotEnoughReplicas)
// split the keyspace group 2 to 4
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{333})
re.NoError(err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ var (
ErrKeyspaceGroupNotInSplit = errors.New("keyspace group is not in split state")
// ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group.
ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group")
// ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group.
ErrKeyspaceGroupNotEnoughReplicas = errors.New("not enough replicas in the keyspace group")
// ErrNoAvailableNode is used to indicate no available node in the keyspace group.
ErrNoAvailableNode = errors.New("no available node")
errModifyDefault = errors.New("cannot modify default keyspace's state")
Expand Down
36 changes: 31 additions & 5 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
log.Info("keyspace group is in split",
zap.Uint32("target", group.ID),
zap.Uint32("source", splitSource))
splitSourceAM, _ := kgm.getKeyspaceGroupMeta(splitSource)
if splitSourceAM == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("target", group.ID),
zap.Uint32("source", splitSource))
splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource)
if !validateSplit(splitSourceAM, group, splitSourceGroup) {
// Put the group into the retry list to retry later.
kgm.groupUpdateRetryList[group.ID] = group
return
Expand Down Expand Up @@ -616,6 +613,35 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
kgm.Unlock()
}

// validateSplit checks whether the meta info of split keyspace group
// to ensure that the split process could be continued.
func validateSplit(
sourceAM *AllocatorManager,
targetGroup, sourceGroup *endpoint.KeyspaceGroup,
) bool {
splitSourceID := targetGroup.SplitSource()
// Make sure that the split source keyspace group has been initialized.
if sourceAM == nil || sourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("target", targetGroup.ID),
zap.Uint32("source", splitSourceID))
return false
}
// Since the target group is derived from the source group and both of them
// could not be modified during the split process, so we can only check the
// member count of the source group here.
memberCount := len(sourceGroup.Members)
if memberCount < mcsutils.KeyspaceGroupDefaultReplicaCount {
log.Error("the split source keyspace group does not have enough members",
zap.Uint32("target", targetGroup.ID),
zap.Uint32("source", splitSourceID),
zap.Int("member-count", memberCount),
zap.Int("replica-count", mcsutils.KeyspaceGroupDefaultReplicaCount))
return false
}
return true
}

// updateKeyspaceGroupMembership updates the keyspace lookup table for the given keyspace group.
func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
oldGroup, newGroup *endpoint.KeyspaceGroup, updateWithLock bool,
Expand Down
4 changes: 2 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,11 +875,11 @@ func (suite *keyspaceGroupManagerTestSuite) TestGroupSplitUpdateRetry() {

events := []*etcdEvent{}
// Split target keyspace group event arrives first.
events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{svcAddr}, &endpoint.SplitState{
events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2} /* Mock 2 replicas */, []string{svcAddr, svcAddr}, &endpoint.SplitState{
SplitSource: 1,
}))
// Split source keyspace group event arrives later.
events = append(events, generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr}, &endpoint.SplitState{
events = append(events, generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr, svcAddr}, &endpoint.SplitState{
SplitSource: 1,
}))

Expand Down
1 change: 1 addition & 0 deletions tests/server/apiv2/handlers/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (suite *keyspaceGroupTestSuite) TestSplitKeyspaceGroup() {
ID: uint32(1),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
},
}}

Expand Down