Skip to content

Commit

Permalink
Refine the split and merge details
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Jun 28, 2023
1 parent 6f0c63b commit 8093b64
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 31 deletions.
8 changes: 4 additions & 4 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,9 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key
// Changing the state of default keyspace is not allowed.
if name == utils.DefaultKeyspaceName {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
zap.Error(ErrModifyDefaultKeyspace),
)
return nil, errModifyDefault
return nil, ErrModifyDefaultKeyspace
}
var meta *keyspacepb.KeyspaceMeta
err := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
Expand Down Expand Up @@ -567,9 +567,9 @@ func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.K
// Changing the state of default keyspace is not allowed.
if id == utils.DefaultKeyspaceID {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
zap.Error(ErrModifyDefaultKeyspace),
)
return nil, errModifyDefault
return nil, ErrModifyDefaultKeyspace
}
var meta *keyspacepb.KeyspaceMeta
var err error
Expand Down
65 changes: 40 additions & 25 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,16 @@ func (m *GroupManager) SplitKeyspaceGroupByID(
if splitSourceKg.IsMerging() {
return ErrKeyspaceGroupInMerging(splitSourceID)
}
// Build the new keyspace groups for split source and target.
var startKeyspaceID, endKeyspaceID uint32
if len(keyspaceIDRange) >= 2 {
startKeyspaceID, endKeyspaceID = keyspaceIDRange[0], keyspaceIDRange[1]
}
splitSourceKeyspaces, splitTargetKeyspaces, err := buildSplitKeyspaces(
splitSourceKg.Keyspaces, keyspaces, startKeyspaceID, endKeyspaceID)
if err != nil {
return err
}
// Check if the source keyspace group has enough replicas.
if len(splitSourceKg.Members) < utils.DefaultKeyspaceGroupReplicaCount {
return ErrKeyspaceGroupNotEnoughReplicas
Expand All @@ -548,15 +558,6 @@ func (m *GroupManager) SplitKeyspaceGroupByID(
if splitTargetKg != nil {
return ErrKeyspaceGroupExists
}
var startKeyspaceID, endKeyspaceID uint32
if len(keyspaceIDRange) >= 2 {
startKeyspaceID, endKeyspaceID = keyspaceIDRange[0], keyspaceIDRange[1]
}
splitSourceKeyspaces, splitTargetKeyspaces, err := buildSplitKeyspaces(
splitSourceKg.Keyspaces, keyspaces, startKeyspaceID, endKeyspaceID)
if err != nil {
return err
}
// Update the old keyspace group.
splitSourceKg.Keyspaces = splitSourceKeyspaces
splitSourceKg.SplitState = &endpoint.SplitState{
Expand Down Expand Up @@ -606,6 +607,9 @@ func buildSplitKeyspaces(
oldKeyspaceMap[keyspace] = struct{}{}
}
for _, keyspace := range new {
if keyspace == utils.DefaultKeyspaceID {
return nil, nil, ErrModifyDefaultKeyspace
}
if _, ok := oldKeyspaceMap[keyspace]; !ok {
return nil, nil, ErrKeyspaceNotInKeyspaceGroup
}
Expand All @@ -618,15 +622,7 @@ func buildSplitKeyspaces(
oldSplit = append(oldSplit, keyspace)
}
}
// Dedup new keyspaces if it's necessary.
if newNum == len(newKeyspaceMap) {
return oldSplit, new, nil
}
newSplit := make([]uint32, 0, len(newKeyspaceMap))
for keyspace := range newKeyspaceMap {
newSplit = append(newSplit, keyspace)
}
return oldSplit, newSplit, nil
return oldSplit, new, nil
}
// Split according to the start and end keyspace ID.
if startKeyspaceID == 0 && endKeyspaceID == 0 {
Expand All @@ -637,6 +633,9 @@ func buildSplitKeyspaces(
newKeyspaceMap = make(map[uint32]struct{}, newNum)
)
for _, keyspace := range old {
if keyspace == utils.DefaultKeyspaceID {
return nil, nil, ErrModifyDefaultKeyspace
}
if startKeyspaceID <= keyspace && keyspace <= endKeyspaceID {
newSplit = append(newSplit, keyspace)
newKeyspaceMap[keyspace] = struct{}{}
Expand Down Expand Up @@ -770,7 +769,9 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
return nil, err
}
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
log.Info("alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", id), zap.Reflect("nodes", nodes))
log.Info("alloc nodes for keyspace group",
zap.Uint32("keyspace-group-id", id),
zap.Reflect("nodes", nodes))
return nodes, nil
}

Expand Down Expand Up @@ -906,20 +907,17 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin
}
groups[kgID] = kg
}
// Build the new keyspaces for the merge target keyspace group.
mergeTargetKg = groups[mergeTargetID]
keyspaces := make(map[uint32]struct{})
for _, keyspace := range mergeTargetKg.Keyspaces {
keyspaces[keyspace] = struct{}{}
}
// Delete the keyspace groups in merge list and move the keyspaces in it to the target keyspace group.
for _, kgID := range mergeList {
kg := groups[kgID]
for _, keyspace := range kg.Keyspaces {
keyspaces[keyspace] = struct{}{}
}
if err := m.store.DeleteKeyspaceGroup(txn, kg.ID); err != nil {
return err
}
}
mergedKeyspaces := make([]uint32, 0, len(keyspaces))
for keyspace := range keyspaces {
Expand All @@ -933,7 +931,17 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin
mergeTargetKg.MergeState = &endpoint.MergeState{
MergeList: mergeList,
}
return m.store.SaveKeyspaceGroup(txn, mergeTargetKg)
err = m.store.SaveKeyspaceGroup(txn, mergeTargetKg)
if err != nil {
return err
}
// Delete the keyspace groups in merge list and move the keyspaces in it to the target keyspace group.
for _, kgID := range mergeList {
if err := m.store.DeleteKeyspaceGroup(txn, kgID); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
Expand All @@ -948,7 +956,10 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin

// FinishMergeKeyspaceByID finishes the merging keyspace group by the merge target ID.
func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
var mergeTargetKg *endpoint.KeyspaceGroup
var (
mergeTargetKg *endpoint.KeyspaceGroup
mergeList []uint32
)
m.Lock()
defer m.Unlock()
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
Expand All @@ -974,12 +985,16 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
return ErrKeyspaceGroupNotInMerging(kgID)
}
}
mergeList = mergeTargetKg.MergeState.MergeList
mergeTargetKg.MergeState = nil
return m.store.SaveKeyspaceGroup(txn, mergeTargetKg)
}); err != nil {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg)
log.Info("finish merge keyspace group",
zap.Uint32("merge-target-id", mergeTargetKg.ID),
zap.Reflect("merge-list", mergeList))
return nil
}
3 changes: 3 additions & 0 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
re.NoError(err)
// split the default keyspace
err = suite.kgm.SplitKeyspaceGroupByID(0, 4, []uint32{utils.DefaultKeyspaceID})
re.ErrorIs(err, ErrModifyDefaultKeyspace)
// split the keyspace group 1 to 4
err = suite.kgm.SplitKeyspaceGroupByID(1, 4, []uint32{333})
re.ErrorIs(err, ErrKeyspaceGroupNotEnoughReplicas)
Expand Down
5 changes: 3 additions & 2 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ var (
ErrNoAvailableNode = errors.New("no available node")
// ErrExceedMaxEtcdTxnOps is used to indicate the number of etcd txn operations exceeds the limit.
ErrExceedMaxEtcdTxnOps = errors.New("exceed max etcd txn operations")
errModifyDefault = errors.New("cannot modify default keyspace's state")
errIllegalOperation = errors.New("unknown operation")
// ErrModifyDefaultKeyspace is used to indicate that default keyspace cannot be modified.
ErrModifyDefaultKeyspace = errors.New("cannot modify default keyspace's state")
errIllegalOperation = errors.New("unknown operation")

// stateTransitionTable lists all allowed next state for the given current state.
// Note that transit from any state to itself is allowed for idempotence.
Expand Down

0 comments on commit 8093b64

Please sign in to comment.