diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go index af269cc95da7..87ce20b8e37f 100644 --- a/pkg/mcs/tso/server/apis/v1/api.go +++ b/pkg/mcs/tso/server/apis/v1/api.go @@ -22,16 +22,22 @@ import ( "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" "github.com/joho/godotenv" + "github.com/pingcap/kvproto/pkg/tsopb" + "github.com/pingcap/log" tsoserver "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/unrolled/render" + "go.uber.org/zap" ) -// APIPathPrefix is the prefix of the API path. -const APIPathPrefix = "/tso/api/v1" +const ( + // APIPathPrefix is the prefix of the API path. + APIPathPrefix = "/tso/api/v1" +) var ( once sync.Once @@ -46,14 +52,14 @@ var ( func init() { tsoserver.SetUpRestHandler = func(srv *tsoserver.Service) (http.Handler, apiutil.APIServiceGroup) { s := NewService(srv) - return s.handler(), apiServiceGroup + return s.apiHandlerEngine, apiServiceGroup } } // Service is the tso service. type Service struct { apiHandlerEngine *gin.Engine - baseEndpoint *gin.RouterGroup + root *gin.RouterGroup srv *tsoserver.Service rd *render.Render @@ -77,30 +83,64 @@ func NewService(srv *tsoserver.Service) *Service { apiHandlerEngine.Use(cors.Default()) apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) apiHandlerEngine.Use(func(c *gin.Context) { - c.Set("service", srv) + c.Set(multiservicesapi.ServiceContextKey, srv) c.Next() }) apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) apiHandlerEngine.GET("metrics", utils.PromHandler()) - endpoint := apiHandlerEngine.Group(APIPathPrefix) + root := apiHandlerEngine.Group(APIPathPrefix) s := &Service{ srv: srv, apiHandlerEngine: apiHandlerEngine, - baseEndpoint: endpoint, + root: root, rd: createIndentRender(), } - s.RegisterRouter() + s.RegisterAdminRouter() + s.RegisterKeyspaceGroupRouter() return s } -// RegisterRouter registers the router of the service. -func (s *Service) RegisterRouter() { +// RegisterAdminRouter registers the router of the TSO admin handler. +func (s *Service) RegisterAdminRouter() { + router := s.root.Group("admin") tsoAdminHandler := tso.NewAdminHandler(s.srv.GetHandler(), s.rd) - s.baseEndpoint.POST("/admin/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS)) + router.POST("/admin/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS)) } -func (s *Service) handler() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - s.apiHandlerEngine.ServeHTTP(w, r) - }) +// RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler. +func (s *Service) RegisterKeyspaceGroupRouter() { + router := s.root.Group("keyspace-groups") + router.GET("/members", GetKeyspaceGroupMembers) +} + +// KeyspaceGroupMember contains the keyspace group and its member information. +type KeyspaceGroupMember struct { + Group *endpoint.KeyspaceGroup + Member *tsopb.Participant + IsPrimary bool `json:"is_primary"` + PrimaryID uint64 `json:"primary_id"` +} + +// GetKeyspaceGroupMembers gets the keyspace group members that the TSO service is serving. +func GetKeyspaceGroupMembers(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) + kgm := svr.GetKeyspaceGroupManager() + keyspaceGroups := kgm.GetKeyspaceGroups() + members := make(map[uint32]*KeyspaceGroupMember, len(keyspaceGroups)) + for id, group := range keyspaceGroups { + am, err := kgm.GetAllocatorManager(id) + if err != nil { + log.Error("failed to get allocator manager", + zap.Uint32("keyspace-group-id", id), zap.Error(err)) + continue + } + member := am.GetMember() + members[id] = &KeyspaceGroupMember{ + Group: group, + Member: member.GetMember().(*tsopb.Participant), + IsPrimary: member.IsLeader(), + PrimaryID: member.GetLeaderID(), + } + } + c.IndentedJSON(http.StatusOK, members) } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 36207ebf4e0e..801602815596 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -274,6 +274,11 @@ func (s *Server) IsClosed() bool { return atomic.LoadInt64(&s.isRunning) == 0 } +// GetKeyspaceGroupManager returns the manager of keyspace group. +func (s *Server) GetKeyspaceGroupManager() *tso.KeyspaceGroupManager { + return s.keyspaceGroupManager +} + // GetTSOAllocatorManager returns the manager of TSO Allocator. func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) { return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index bc329f5552f2..14347e6214ab 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -195,6 +195,9 @@ type KeyspaceGroupManager struct { loadKeyspaceGroupsTimeout time.Duration loadKeyspaceGroupsBatchSize int64 loadFromEtcdMaxRetryTimes int + + // groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry. + groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. @@ -228,6 +231,7 @@ func NewKeyspaceGroupManager( loadKeyspaceGroupsTimeout: defaultLoadKeyspaceGroupsTimeout, loadKeyspaceGroupsBatchSize: defaultLoadKeyspaceGroupsBatchSize, loadFromEtcdMaxRetryTimes: defaultLoadFromEtcdMaxRetryTimes, + groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup), } kgm.legacySvcStorage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) @@ -500,6 +504,11 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( } } } + // Retry the groups that are not initialized successfully before. + for id, group := range kgm.groupUpdateRetryList { + delete(kgm.groupUpdateRetryList, id) + kgm.updateKeyspaceGroup(group) + } revision = wresp.Header.Revision + 1 } @@ -566,13 +575,15 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro if group.IsSplitTarget() { splitSource := group.SplitSource() log.Info("keyspace group is in split", - zap.Uint32("keyspace-group-id", group.ID), + zap.Uint32("target", group.ID), zap.Uint32("source", splitSource)) splitSourceAM, _ := kgm.getKeyspaceGroupMeta(splitSource) if splitSourceAM == nil { - // TODO: guarantee that the split source keyspace group is initialized before. log.Error("the split source keyspace group is not initialized", + zap.Uint32("target", group.ID), zap.Uint32("source", splitSource)) + // Put the group into the retry list to retry later. + kgm.groupUpdateRetryList[group.ID] = group return } participant.SetCampaignChecker(func(leadership *election.Leadership) bool { @@ -731,6 +742,20 @@ func (kgm *KeyspaceGroupManager) GetElectionMember( return am.GetMember(), nil } +// GetKeyspaceGroups returns all keyspace groups managed by the current keyspace group manager. +func (kgm *KeyspaceGroupManager) GetKeyspaceGroups() map[uint32]*endpoint.KeyspaceGroup { + kgm.RLock() + defer kgm.RUnlock() + keyspaceGroups := make(map[uint32]*endpoint.KeyspaceGroup) + for _, keyspaceGroupID := range kgm.keyspaceLookupTable { + if _, ok := keyspaceGroups[keyspaceGroupID]; ok { + continue + } + keyspaceGroups[keyspaceGroupID] = kgm.kgs[keyspaceGroupID] + } + return keyspaceGroups +} + // HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of the given keyspace group. func (kgm *KeyspaceGroupManager) HandleTSORequest( keyspaceID, keyspaceGroupID uint32, diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index eb16a2f05348..3fa5bbd3fc65 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -252,49 +252,40 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges( events := []*etcdEvent{} // Assign keyspace group 0 to this host/pod/keyspace-group-manager. // final result: [0] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr})) + events = append(events, generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr})) // Assign keyspace group 1 to this host/pod/keyspace-group-manager. // final result: [0,1] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 1, []uint32{1}, []string{"unknown", svcAddr})) + events = append(events, generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{"unknown", svcAddr})) // Assign keyspace group 2 to other host/pod/keyspace-group-manager. // final result: [0,1] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 2, []uint32{2}, []string{"unknown"})) + events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{"unknown"})) // Assign keyspace group 3 to this host/pod/keyspace-group-manager. // final result: [0,1,3] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 3, []uint32{3}, []string{svcAddr})) + events = append(events, generateKeyspaceGroupPutEvent(3, []uint32{3}, []string{svcAddr})) // Delete keyspace group 0. Every tso node/pod now should initialize keyspace group 0. // final result: [0,1,3] - events = append(events, generateKeyspaceGroupEvent(mvccpb.DELETE, 0, []uint32{0}, []string{})) + events = append(events, generateKeyspaceGroupDeleteEvent(0)) // Put keyspace group 4 which doesn't belong to anyone. // final result: [0,1,3] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 4, []uint32{4}, []string{})) + events = append(events, generateKeyspaceGroupPutEvent(4, []uint32{4}, []string{})) // Put keyspace group 5 which doesn't belong to anyone. // final result: [0,1,3] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 5, []uint32{5}, []string{})) + events = append(events, generateKeyspaceGroupPutEvent(5, []uint32{5}, []string{})) // Assign keyspace group 2 to this host/pod/keyspace-group-manager. // final result: [0,1,2,3] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 2, []uint32{2}, []string{svcAddr})) + events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{svcAddr})) // Reassign keyspace group 3 to no one. // final result: [0,1,2] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 3, []uint32{3}, []string{})) + events = append(events, generateKeyspaceGroupPutEvent(3, []uint32{3}, []string{})) // Reassign keyspace group 4 to this host/pod/keyspace-group-manager. // final result: [0,1,2,4] - events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 4, []uint32{4}, []string{svcAddr})) + events = append(events, generateKeyspaceGroupPutEvent(4, []uint32{4}, []string{svcAddr})) // Eventually, this keyspace groups manager is expected to serve the following keyspace groups. expectedGroupIDs := []uint32{0, 1, 2, 4} // Apply the keyspace group assignment change events to etcd. - for _, event := range events { - switch event.eventType { - case mvccpb.PUT: - err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) - re.NoError(err) - case mvccpb.DELETE: - err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) - re.NoError(err) - } - } + suite.applyEtcdEvents(re, rootPath, events) // Verify the keyspace group assignment. testutil.Eventually(re, func() bool { @@ -332,7 +323,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // Config keyspace group 0 in the storage but assigned to no one. // final result: [] expectedGroupIDs = []uint32{} - event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{"unknown"}) + event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{"unknown"}) err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) testutil.Eventually(re, func() bool { @@ -342,7 +333,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // Config keyspace group 0 in the storage and assigned to this host/pod/keyspace-group-manager. // final result: [0] expectedGroupIDs = []uint32{0} - event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr}) + event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr}) err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) testutil.Eventually(re, func() bool { @@ -352,7 +343,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // Delete keyspace group 0. Every tso node/pod now should initialize keyspace group 0. // final result: [0] expectedGroupIDs = []uint32{0} - event = generateKeyspaceGroupEvent(mvccpb.DELETE, 0, []uint32{0}, []string{}) + event = generateKeyspaceGroupDeleteEvent(0) err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) re.NoError(err) testutil.Eventually(re, func() bool { @@ -362,7 +353,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // Config keyspace group 0 in the storage and assigned to this host/pod/keyspace-group-manager. // final result: [0] expectedGroupIDs = []uint32{0} - event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr}) + event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr}) err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) testutil.Eventually(re, func() bool { @@ -460,12 +451,12 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid) re.NotNil(am) - event = generateKeyspaceGroupEvent( - mvccpb.PUT, mcsutils.DefaultKeyspaceGroupID, []uint32{1, 2}, []string{svcAddr}) + event = generateKeyspaceGroupPutEvent( + mcsutils.DefaultKeyspaceGroupID, []uint32{1, 2}, []string{svcAddr}) err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) - event = generateKeyspaceGroupEvent( - mvccpb.PUT, 3, []uint32{mcsutils.DefaultKeyspaceID, 3, 4}, []string{svcAddr}) + event = generateKeyspaceGroupPutEvent( + 3, []uint32{mcsutils.DefaultKeyspaceID, 3, 4}, []string{svcAddr}) err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) @@ -515,24 +506,55 @@ type etcdEvent struct { ksg *endpoint.KeyspaceGroup } -func generateKeyspaceGroupEvent( - eventType mvccpb.Event_EventType, groupID uint32, keyspaces []uint32, addrs []string, +func generateKeyspaceGroupPutEvent( + groupID uint32, keyspaces []uint32, addrs []string, splitState ...*endpoint.SplitState, ) *etcdEvent { members := []endpoint.KeyspaceGroupMember{} for _, addr := range addrs { members = append(members, endpoint.KeyspaceGroupMember{Address: addr}) } + var ss *endpoint.SplitState + if len(splitState) > 0 { + ss = splitState[0] + } + + return &etcdEvent{ + eventType: mvccpb.PUT, + ksg: &endpoint.KeyspaceGroup{ + ID: groupID, + Members: members, + Keyspaces: keyspaces, + SplitState: ss, + }, + } +} +func generateKeyspaceGroupDeleteEvent(groupID uint32) *etcdEvent { return &etcdEvent{ - eventType: eventType, + eventType: mvccpb.DELETE, ksg: &endpoint.KeyspaceGroup{ - ID: groupID, - Members: members, - Keyspaces: keyspaces, + ID: groupID, }, } } +func (suite *keyspaceGroupManagerTestSuite) applyEtcdEvents( + re *require.Assertions, + rootPath string, + events []*etcdEvent, +) { + var err error + for _, event := range events { + switch event.eventType { + case mvccpb.PUT: + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + case mvccpb.DELETE: + err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) + } + re.NoError(err) + } +} + func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( tsoServiceID *discovery.ServiceRegistryEntry, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string, @@ -791,3 +813,39 @@ func verifyGlobalKeyspaceLookupTable( re.True(ok) } } + +func (suite *keyspaceGroupManagerTestSuite) TestGroupSplitUpdateRetry() { + re := suite.Require() + + // Start with the empty keyspace group assignment. + mgr := suite.newUniqueKeyspaceGroupManager(0) + re.NotNil(mgr) + defer mgr.Close() + err := mgr.Initialize() + re.NoError(err) + + rootPath := mgr.legacySvcRootPath + svcAddr := mgr.tsoServiceID.ServiceAddr + + events := []*etcdEvent{} + // Split target keyspace group event arrives first. + events = append(events, generateKeyspaceGroupPutEvent(2, []uint32{2}, []string{svcAddr}, &endpoint.SplitState{ + SplitSource: 1, + })) + // Split source keyspace group event arrives later. + events = append(events, generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr}, &endpoint.SplitState{ + SplitSource: 1, + })) + + // Eventually, this keyspace groups manager is expected to serve the following keyspace groups. + expectedGroupIDs := []uint32{0, 1, 2} + + // Apply the keyspace group assignment change events to etcd. + suite.applyEtcdEvents(re, rootPath, events) + + // Verify the keyspace group assignment. + testutil.Eventually(re, func() bool { + assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) + return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs) + }) +} diff --git a/pkg/utils/apiutil/multiservicesapi/middleware.go b/pkg/utils/apiutil/multiservicesapi/middleware.go index 4f889b52573a..ed34ecc6afb3 100644 --- a/pkg/utils/apiutil/multiservicesapi/middleware.go +++ b/pkg/utils/apiutil/multiservicesapi/middleware.go @@ -26,16 +26,19 @@ import ( "go.uber.org/zap" ) -// HTTP headers. const ( + // ServiceAllowDirectHandle is the header key to allow direct handle. ServiceAllowDirectHandle = "service-allow-direct-handle" - ServiceRedirectorHeader = "service-redirector" + // ServiceRedirectorHeader is the header key to indicate the request is redirected. + ServiceRedirectorHeader = "service-redirector" + // ServiceContextKey is the key to get service server from gin.Context. + ServiceContextKey = "service" ) // ServiceRedirector is a middleware to redirect the request to the right place. func ServiceRedirector() gin.HandlerFunc { return func(c *gin.Context) { - svr := c.MustGet("service").(bs.Server) + svr := c.MustGet(ServiceContextKey).(bs.Server) allowDirectHandle := len(c.Request.Header.Get(ServiceAllowDirectHandle)) > 0 isServing := svr.IsServing() if allowDirectHandle || isServing { diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go new file mode 100644 index 000000000000..e5dbcd7998c5 --- /dev/null +++ b/tests/integrations/mcs/tso/api_test.go @@ -0,0 +1,106 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +import ( + "context" + "encoding/json" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + tso "github.com/tikv/pd/pkg/mcs/tso/server" + apis "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/integrations/mcs" +) + +const ( + tsoKeyspaceGroupsPrefix = "/tso/api/v1/keyspace-groups" +) + +// dialClient used to dial http request. +var dialClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, +} + +type tsoAPITestSuite struct { + suite.Suite + ctx context.Context + cancel context.CancelFunc + pdCluster *tests.TestCluster + tsoCluster *mcs.TestTSOCluster +} + +func TestTSOAPI(t *testing.T) { + suite.Run(t, new(tsoAPITestSuite)) +} + +func (suite *tsoAPITestSuite) SetupTest() { + re := suite.Require() + + var err error + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.pdCluster, err = tests.NewTestAPICluster(suite.ctx, 1) + re.NoError(err) + err = suite.pdCluster.RunInitialServers() + re.NoError(err) + leaderName := suite.pdCluster.WaitLeader() + pdLeaderServer := suite.pdCluster.GetServer(leaderName) + re.NoError(pdLeaderServer.BootstrapCluster()) + suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr()) + re.NoError(err) +} + +func (suite *tsoAPITestSuite) TearDownTest() { + suite.cancel() + suite.tsoCluster.Destroy() + suite.pdCluster.Destroy() +} + +func (suite *tsoAPITestSuite) TestGetKeyspaceGroupMembers() { + re := suite.Require() + + primary := suite.tsoCluster.WaitForDefaultPrimaryServing(re) + re.NotNil(primary) + members := mustGetKeyspaceGroupMembers(re, primary) + re.Len(members, 1) + defaultGroupMember := members[mcsutils.DefaultKeyspaceGroupID] + re.NotNil(defaultGroupMember) + re.Equal(mcsutils.DefaultKeyspaceGroupID, defaultGroupMember.Group.ID) + re.True(defaultGroupMember.IsPrimary) + primaryMember, err := primary.GetMember(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) + re.NoError(err) + re.Equal(primaryMember.GetLeaderID(), defaultGroupMember.PrimaryID) +} + +func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember { + httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+tsoKeyspaceGroupsPrefix+"/members", nil) + re.NoError(err) + httpResp, err := dialClient.Do(httpReq) + re.NoError(err) + defer httpResp.Body.Close() + data, err := io.ReadAll(httpResp.Body) + re.NoError(err) + re.Equal(http.StatusOK, httpResp.StatusCode, string(data)) + var resp map[uint32]*apis.KeyspaceGroupMember + re.NoError(json.Unmarshal(data, &resp)) + return resp +}