From 8dfe8380b429c15aca48386d913a7fd0f20da32d Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 24 Apr 2023 11:59:54 +0800 Subject: [PATCH] Add retry mechanism for updating keyspace group Signed-off-by: JmPotato --- pkg/tso/keyspace_group_manager.go | 15 ++- pkg/tso/keyspace_group_manager_test.go | 126 ++++++++++++++++++------- 2 files changed, 105 insertions(+), 36 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index bc329f5552f..9370e8664d1 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -195,6 +195,9 @@ type KeyspaceGroupManager struct { loadKeyspaceGroupsTimeout time.Duration loadKeyspaceGroupsBatchSize int64 loadFromEtcdMaxRetryTimes int + + // groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry. + groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. @@ -228,6 +231,7 @@ func NewKeyspaceGroupManager( loadKeyspaceGroupsTimeout: defaultLoadKeyspaceGroupsTimeout, loadKeyspaceGroupsBatchSize: defaultLoadKeyspaceGroupsBatchSize, loadFromEtcdMaxRetryTimes: defaultLoadFromEtcdMaxRetryTimes, + groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup), } kgm.legacySvcStorage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) @@ -500,6 +504,11 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( } } } + // Retry the groups that are not initialized successfully before. + for id, group := range kgm.groupUpdateRetryList { + delete(kgm.groupUpdateRetryList, id) + kgm.updateKeyspaceGroup(group) + } revision = wresp.Header.Revision + 1 } @@ -566,13 +575,15 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro if group.IsSplitTarget() { splitSource := group.SplitSource() log.Info("keyspace group is in split", - zap.Uint32("keyspace-group-id", group.ID), + zap.Uint32("target", group.ID), zap.Uint32("source", splitSource)) splitSourceAM, _ := kgm.getKeyspaceGroupMeta(splitSource) if splitSourceAM == nil { - // TODO: guarantee that the split source keyspace group is initialized before. log.Error("the split source keyspace group is not initialized", + zap.Uint32("target", group.ID), zap.Uint32("source", splitSource)) + // Put the group into the retry list to retry later. + kgm.groupUpdateRetryList[group.ID] = group return } participant.SetCampaignChecker(func(leadership *election.Leadership) bool { diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index eb16a2f0534..3fa5bbd3fc6 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -252,49 +252,40 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges( events := []*etcdEvent{} // Assign keyspace group 0 to this host/pod/keyspace-group-manager. // final result: [0] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr})) + events = append(events, generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr})) // Assign keyspace group 1 to this host/pod/keyspace-group-manager. // final result: [0,1] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 1, []uint32{1}, []string{"unknown", svcAddr})) + events = append(events, generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{"unknown", svcAddr})) // Assign keyspace group 2 to other host/pod/keyspace-group-manager. // final result: [0,1] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 2, []uint32{2}, []string{"unknown"})) + events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{"unknown"})) // Assign keyspace group 3 to this host/pod/keyspace-group-manager. // final result: [0,1,3] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 3, []uint32{3}, []string{svcAddr})) + events = append(events, generateKeyspaceGroupPutEvent(3, []uint32{3}, []string{svcAddr})) // Delete keyspace group 0. Every tso node/pod now should initialize keyspace group 0. // final result: [0,1,3] - events = append(events, generateKeyspaceGroupEvent(mvccpb.DELETE, 0, []uint32{0}, []string{})) + events = append(events, generateKeyspaceGroupDeleteEvent(0)) // Put keyspace group 4 which doesn't belong to anyone. // final result: [0,1,3] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 4, []uint32{4}, []string{})) + events = append(events, generateKeyspaceGroupPutEvent(4, []uint32{4}, []string{})) // Put keyspace group 5 which doesn't belong to anyone. // final result: [0,1,3] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 5, []uint32{5}, []string{})) + events = append(events, generateKeyspaceGroupPutEvent(5, []uint32{5}, []string{})) // Assign keyspace group 2 to this host/pod/keyspace-group-manager. // final result: [0,1,2,3] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 2, []uint32{2}, []string{svcAddr})) + events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{svcAddr})) // Reassign keyspace group 3 to no one. // final result: [0,1,2] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 3, []uint32{3}, []string{})) + events = append(events, generateKeyspaceGroupPutEvent(3, []uint32{3}, []string{})) // Reassign keyspace group 4 to this host/pod/keyspace-group-manager. // final result: [0,1,2,4] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 4, []uint32{4}, []string{svcAddr})) + events = append(events, generateKeyspaceGroupPutEvent(4, []uint32{4}, []string{svcAddr})) // Eventually, this keyspace groups manager is expected to serve the following keyspace groups. expectedGroupIDs := []uint32{0, 1, 2, 4} // Apply the keyspace group assignment change events to etcd. - for _, event := range events { - switch event.eventType { - case mvccpb.PUT: - err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) - re.NoError(err) - case mvccpb.DELETE: - err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) - re.NoError(err) - } - } + suite.applyEtcdEvents(re, rootPath, events) // Verify the keyspace group assignment. testutil.Eventually(re, func() bool { @@ -332,7 +323,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // Config keyspace group 0 in the storage but assigned to no one. // final result: [] expectedGroupIDs = []uint32{} - event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{"unknown"}) + event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{"unknown"}) err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) testutil.Eventually(re, func() bool { @@ -342,7 +333,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // Config keyspace group 0 in the storage and assigned to this host/pod/keyspace-group-manager. // final result: [0] expectedGroupIDs = []uint32{0} - event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr}) + event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr}) err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) testutil.Eventually(re, func() bool { @@ -352,7 +343,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // Delete keyspace group 0. Every tso node/pod now should initialize keyspace group 0. // final result: [0] expectedGroupIDs = []uint32{0} - event = generateKeyspaceGroupEvent(mvccpb.DELETE, 0, []uint32{0}, []string{}) + event = generateKeyspaceGroupDeleteEvent(0) err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) re.NoError(err) testutil.Eventually(re, func() bool { @@ -362,7 +353,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // Config keyspace group 0 in the storage and assigned to this host/pod/keyspace-group-manager. // final result: [0] expectedGroupIDs = []uint32{0} - event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr}) + event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr}) err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) testutil.Eventually(re, func() bool { @@ -460,12 +451,12 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid) re.NotNil(am) - event = generateKeyspaceGroupEvent( - mvccpb.PUT, mcsutils.DefaultKeyspaceGroupID, []uint32{1, 2}, []string{svcAddr}) + event = generateKeyspaceGroupPutEvent( + mcsutils.DefaultKeyspaceGroupID, []uint32{1, 2}, []string{svcAddr}) err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) - event = generateKeyspaceGroupEvent( - mvccpb.PUT, 3, []uint32{mcsutils.DefaultKeyspaceID, 3, 4}, []string{svcAddr}) + event = generateKeyspaceGroupPutEvent( + 3, []uint32{mcsutils.DefaultKeyspaceID, 3, 4}, []string{svcAddr}) err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) @@ -515,24 +506,55 @@ type etcdEvent struct { ksg *endpoint.KeyspaceGroup } -func generateKeyspaceGroupEvent( - eventType mvccpb.Event_EventType, groupID uint32, keyspaces []uint32, addrs []string, +func generateKeyspaceGroupPutEvent( + groupID uint32, keyspaces []uint32, addrs []string, splitState ...*endpoint.SplitState, ) *etcdEvent { members := []endpoint.KeyspaceGroupMember{} for _, addr := range addrs { members = append(members, endpoint.KeyspaceGroupMember{Address: addr}) } + var ss *endpoint.SplitState + if len(splitState) > 0 { + ss = splitState[0] + } + + return &etcdEvent{ + eventType: mvccpb.PUT, + ksg: &endpoint.KeyspaceGroup{ + ID: groupID, + Members: members, + Keyspaces: keyspaces, + SplitState: ss, + }, + } +} +func generateKeyspaceGroupDeleteEvent(groupID uint32) *etcdEvent { return &etcdEvent{ - eventType: eventType, + eventType: mvccpb.DELETE, ksg: &endpoint.KeyspaceGroup{ - ID: groupID, - Members: members, - Keyspaces: keyspaces, + ID: groupID, }, } } +func (suite *keyspaceGroupManagerTestSuite) applyEtcdEvents( + re *require.Assertions, + rootPath string, + events []*etcdEvent, +) { + var err error + for _, event := range events { + switch event.eventType { + case mvccpb.PUT: + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + case mvccpb.DELETE: + err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) + } + re.NoError(err) + } +} + func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( tsoServiceID *discovery.ServiceRegistryEntry, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string, @@ -791,3 +813,39 @@ func verifyGlobalKeyspaceLookupTable( re.True(ok) } } + +func (suite *keyspaceGroupManagerTestSuite) TestGroupSplitUpdateRetry() { + re := suite.Require() + + // Start with the empty keyspace group assignment. + mgr := suite.newUniqueKeyspaceGroupManager(0) + re.NotNil(mgr) + defer mgr.Close() + err := mgr.Initialize() + re.NoError(err) + + rootPath := mgr.legacySvcRootPath + svcAddr := mgr.tsoServiceID.ServiceAddr + + events := []*etcdEvent{} + // Split target keyspace group event arrives first. + events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{svcAddr}, &endpoint.SplitState{ + SplitSource: 1, + })) + // Split source keyspace group event arrives later. + events = append(events, generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr}, &endpoint.SplitState{ + SplitSource: 1, + })) + + // Eventually, this keyspace groups manager is expected to serve the following keyspace groups. + expectedGroupIDs := []uint32{0, 1, 2} + + // Apply the keyspace group assignment change events to etcd. + suite.applyEtcdEvents(re, rootPath, events) + + // Verify the keyspace group assignment. + testutil.Eventually(re, func() bool { + assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) + return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs) + }) +}