Skip to content

Commit

Permalink
Add retry mechanism for updating keyspace group
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Apr 24, 2023
1 parent 7350cd2 commit 8dfe838
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 36 deletions.
15 changes: 13 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ type KeyspaceGroupManager struct {
loadKeyspaceGroupsTimeout time.Duration
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int

// groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry.
groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand Down Expand Up @@ -228,6 +231,7 @@ func NewKeyspaceGroupManager(
loadKeyspaceGroupsTimeout: defaultLoadKeyspaceGroupsTimeout,
loadKeyspaceGroupsBatchSize: defaultLoadKeyspaceGroupsBatchSize,
loadFromEtcdMaxRetryTimes: defaultLoadFromEtcdMaxRetryTimes,
groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup),
}
kgm.legacySvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
Expand Down Expand Up @@ -500,6 +504,11 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
}
}
}
// Retry the groups that are not initialized successfully before.
for id, group := range kgm.groupUpdateRetryList {
delete(kgm.groupUpdateRetryList, id)
kgm.updateKeyspaceGroup(group)
}
revision = wresp.Header.Revision + 1
}

Expand Down Expand Up @@ -566,13 +575,15 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
if group.IsSplitTarget() {
splitSource := group.SplitSource()
log.Info("keyspace group is in split",
zap.Uint32("keyspace-group-id", group.ID),
zap.Uint32("target", group.ID),
zap.Uint32("source", splitSource))
splitSourceAM, _ := kgm.getKeyspaceGroupMeta(splitSource)
if splitSourceAM == nil {
// TODO: guarantee that the split source keyspace group is initialized before.
log.Error("the split source keyspace group is not initialized",
zap.Uint32("target", group.ID),
zap.Uint32("source", splitSource))
// Put the group into the retry list to retry later.
kgm.groupUpdateRetryList[group.ID] = group
return
}
participant.SetCampaignChecker(func(leadership *election.Leadership) bool {
Expand Down
126 changes: 92 additions & 34 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,49 +252,40 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges(
events := []*etcdEvent{}
// Assign keyspace group 0 to this host/pod/keyspace-group-manager.
// final result: [0]
events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr}))
events = append(events, generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr}))
// Assign keyspace group 1 to this host/pod/keyspace-group-manager.
// final result: [0,1]
events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 1, []uint32{1}, []string{"unknown", svcAddr}))
events = append(events, generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{"unknown", svcAddr}))
// Assign keyspace group 2 to other host/pod/keyspace-group-manager.
// final result: [0,1]
events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 2, []uint32{2}, []string{"unknown"}))
events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{"unknown"}))
// Assign keyspace group 3 to this host/pod/keyspace-group-manager.
// final result: [0,1,3]
events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 3, []uint32{3}, []string{svcAddr}))
events = append(events, generateKeyspaceGroupPutEvent(3, []uint32{3}, []string{svcAddr}))
// Delete keyspace group 0. Every tso node/pod now should initialize keyspace group 0.
// final result: [0,1,3]
events = append(events, generateKeyspaceGroupEvent(mvccpb.DELETE, 0, []uint32{0}, []string{}))
events = append(events, generateKeyspaceGroupDeleteEvent(0))
// Put keyspace group 4 which doesn't belong to anyone.
// final result: [0,1,3]
events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 4, []uint32{4}, []string{}))
events = append(events, generateKeyspaceGroupPutEvent(4, []uint32{4}, []string{}))
// Put keyspace group 5 which doesn't belong to anyone.
// final result: [0,1,3]
events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 5, []uint32{5}, []string{}))
events = append(events, generateKeyspaceGroupPutEvent(5, []uint32{5}, []string{}))
// Assign keyspace group 2 to this host/pod/keyspace-group-manager.
// final result: [0,1,2,3]
events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 2, []uint32{2}, []string{svcAddr}))
events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{svcAddr}))
// Reassign keyspace group 3 to no one.
// final result: [0,1,2]
events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 3, []uint32{3}, []string{}))
events = append(events, generateKeyspaceGroupPutEvent(3, []uint32{3}, []string{}))
// Reassign keyspace group 4 to this host/pod/keyspace-group-manager.
// final result: [0,1,2,4]
events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 4, []uint32{4}, []string{svcAddr}))
events = append(events, generateKeyspaceGroupPutEvent(4, []uint32{4}, []string{svcAddr}))

// Eventually, this keyspace groups manager is expected to serve the following keyspace groups.
expectedGroupIDs := []uint32{0, 1, 2, 4}

// Apply the keyspace group assignment change events to etcd.
for _, event := range events {
switch event.eventType {
case mvccpb.PUT:
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)
case mvccpb.DELETE:
err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID)
re.NoError(err)
}
}
suite.applyEtcdEvents(re, rootPath, events)

// Verify the keyspace group assignment.
testutil.Eventually(re, func() bool {
Expand Down Expand Up @@ -332,7 +323,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() {
// Config keyspace group 0 in the storage but assigned to no one.
// final result: []
expectedGroupIDs = []uint32{}
event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{"unknown"})
event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{"unknown"})
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)
testutil.Eventually(re, func() bool {
Expand All @@ -342,7 +333,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() {
// Config keyspace group 0 in the storage and assigned to this host/pod/keyspace-group-manager.
// final result: [0]
expectedGroupIDs = []uint32{0}
event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr})
event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr})
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)
testutil.Eventually(re, func() bool {
Expand All @@ -352,7 +343,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() {
// Delete keyspace group 0. Every tso node/pod now should initialize keyspace group 0.
// final result: [0]
expectedGroupIDs = []uint32{0}
event = generateKeyspaceGroupEvent(mvccpb.DELETE, 0, []uint32{0}, []string{})
event = generateKeyspaceGroupDeleteEvent(0)
err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID)
re.NoError(err)
testutil.Eventually(re, func() bool {
Expand All @@ -362,7 +353,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() {
// Config keyspace group 0 in the storage and assigned to this host/pod/keyspace-group-manager.
// final result: [0]
expectedGroupIDs = []uint32{0}
event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr})
event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr})
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)
testutil.Eventually(re, func() bool {
Expand Down Expand Up @@ -460,12 +451,12 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() {
re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid)
re.NotNil(am)

event = generateKeyspaceGroupEvent(
mvccpb.PUT, mcsutils.DefaultKeyspaceGroupID, []uint32{1, 2}, []string{svcAddr})
event = generateKeyspaceGroupPutEvent(
mcsutils.DefaultKeyspaceGroupID, []uint32{1, 2}, []string{svcAddr})
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)
event = generateKeyspaceGroupEvent(
mvccpb.PUT, 3, []uint32{mcsutils.DefaultKeyspaceID, 3, 4}, []string{svcAddr})
event = generateKeyspaceGroupPutEvent(
3, []uint32{mcsutils.DefaultKeyspaceID, 3, 4}, []string{svcAddr})
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)

Expand Down Expand Up @@ -515,24 +506,55 @@ type etcdEvent struct {
ksg *endpoint.KeyspaceGroup
}

func generateKeyspaceGroupEvent(
eventType mvccpb.Event_EventType, groupID uint32, keyspaces []uint32, addrs []string,
func generateKeyspaceGroupPutEvent(
groupID uint32, keyspaces []uint32, addrs []string, splitState ...*endpoint.SplitState,
) *etcdEvent {
members := []endpoint.KeyspaceGroupMember{}
for _, addr := range addrs {
members = append(members, endpoint.KeyspaceGroupMember{Address: addr})
}
var ss *endpoint.SplitState
if len(splitState) > 0 {
ss = splitState[0]
}

return &etcdEvent{
eventType: mvccpb.PUT,
ksg: &endpoint.KeyspaceGroup{
ID: groupID,
Members: members,
Keyspaces: keyspaces,
SplitState: ss,
},
}
}

func generateKeyspaceGroupDeleteEvent(groupID uint32) *etcdEvent {
return &etcdEvent{
eventType: eventType,
eventType: mvccpb.DELETE,
ksg: &endpoint.KeyspaceGroup{
ID: groupID,
Members: members,
Keyspaces: keyspaces,
ID: groupID,
},
}
}

func (suite *keyspaceGroupManagerTestSuite) applyEtcdEvents(
re *require.Assertions,
rootPath string,
events []*etcdEvent,
) {
var err error
for _, event := range events {
switch event.eventType {
case mvccpb.PUT:
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
case mvccpb.DELETE:
err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID)
}
re.NoError(err)
}
}

func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager(
tsoServiceID *discovery.ServiceRegistryEntry,
electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string,
Expand Down Expand Up @@ -791,3 +813,39 @@ func verifyGlobalKeyspaceLookupTable(
re.True(ok)
}
}

func (suite *keyspaceGroupManagerTestSuite) TestGroupSplitUpdateRetry() {
re := suite.Require()

// Start with the empty keyspace group assignment.
mgr := suite.newUniqueKeyspaceGroupManager(0)
re.NotNil(mgr)
defer mgr.Close()
err := mgr.Initialize()
re.NoError(err)

rootPath := mgr.legacySvcRootPath
svcAddr := mgr.tsoServiceID.ServiceAddr

events := []*etcdEvent{}
// Split target keyspace group event arrives first.
events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{svcAddr}, &endpoint.SplitState{
SplitSource: 1,
}))
// Split source keyspace group event arrives later.
events = append(events, generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr}, &endpoint.SplitState{
SplitSource: 1,
}))

// Eventually, this keyspace groups manager is expected to serve the following keyspace groups.
expectedGroupIDs := []uint32{0, 1, 2}

// Apply the keyspace group assignment change events to etcd.
suite.applyEtcdEvents(re, rootPath, events)

// Verify the keyspace group assignment.
testutil.Eventually(re, func() bool {
assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr)
return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs)
})
}

0 comments on commit 8dfe838

Please sign in to comment.