diff --git a/errors.toml b/errors.toml index b6f6621957f..85178c926ec 100644 --- a/errors.toml +++ b/errors.toml @@ -481,6 +481,11 @@ error = ''' init file log error, %s ''' +["PD:member:ErrCheckCampaign"] +error = ''' +check campaign failed +''' + ["PD:member:ErrEtcdLeaderNotFound"] error = ''' etcd leader not found @@ -491,11 +496,6 @@ error = ''' marshal leader failed ''' -["PD:member:ErrPreCheckCampaign"] -error = ''' -pre-check campaign failed -''' - ["PD:netstat:ErrNetstatTCPSocks"] error = ''' TCP socks error diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index eb22d7584b2..fc55af7a23a 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -60,7 +60,7 @@ var ( var ( ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound")) ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader")) - ErrPreCheckCampaign = errors.Normalize("pre-check campaign failed", errors.RFCCodeText("PD:member:ErrPreCheckCampaign")) + ErrCheckCampaign = errors.Normalize("check campaign failed", errors.RFCCodeText("PD:member:ErrCheckCampaign")) ) // core errors diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 54e6266df5d..67498409371 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -235,6 +235,15 @@ func (s *Server) GetLeaderListenUrls() []string { return member.GetLeaderListenUrls() } +// GetMember returns the election member of the given keyspace and keyspace group. +func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMember, error) { + member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID) + if err != nil { + return nil, err + } + return member, nil +} + // AddServiceReadyCallback implements basicserver. // It adds callbacks when it's ready for providing tso service. func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 8ed4d8bdb28..102bfcbce5f 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -48,9 +48,9 @@ type Participant struct { // leader key when this participant is successfully elected as the leader of // the group. Every write will use it to check the leadership. memberValue string - // preCampaignChecker is called before the campaign. If it returns false, the - // campaign will be skipped. - preCampaignChecker leadershipCheckFunc + // campaignChecker is used to check whether the additional constraints for a + // campaign are satisfied. If it returns false, the campaign will fail. + campaignChecker atomic.Value // Store as leadershipCheckFunc // lastLeaderUpdatedTime is the last time when the leader is updated. lastLeaderUpdatedTime atomic.Value } @@ -112,7 +112,7 @@ func (m *Participant) Client() *clientv3.Client { // IsLeader returns whether the participant is the leader or not by checking its leadership's // lease and leader info. func (m *Participant) IsLeader() bool { - return m.leadership.Check() && m.GetLeader().GetId() == m.member.GetId() + return m.leadership.Check() && m.GetLeader().GetId() == m.member.GetId() && m.campaignCheck() } // IsLeaderElected returns true if the leader exists; otherwise false @@ -181,8 +181,8 @@ func (m *Participant) GetLeadership() *election.Leadership { // CampaignLeader is used to campaign the leadership and make it become a leader. func (m *Participant) CampaignLeader(leaseTimeout int64) error { - if m.preCampaignChecker != nil && !m.preCampaignChecker(m.leadership) { - return errs.ErrPreCheckCampaign + if !m.campaignCheck() { + return errs.ErrCheckCampaign } return m.leadership.Campaign(leaseTimeout, m.MemberValue()) } @@ -351,7 +351,19 @@ func (m *Participant) GetLeaderPriority(id uint64) (int, error) { return int(priority), nil } -// SetPreCampaignChecker sets the pre-campaign checker. -func (m *Participant) SetPreCampaignChecker(checker leadershipCheckFunc) { - m.preCampaignChecker = checker +func (m *Participant) campaignCheck() bool { + checker := m.campaignChecker.Load() + if checker == nil { + return true + } + checkerFunc, ok := checker.(leadershipCheckFunc) + if !ok || checkerFunc == nil { + return true + } + return checkerFunc(m.leadership) +} + +// SetCampaignChecker sets the pre-campaign checker. +func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) { + m.campaignChecker.Store(checker) } diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 4aa1b02bc4e..68a034e7b19 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -296,7 +296,8 @@ func (am *AllocatorManager) close() { log.Info("closed the allocator manager") } -func (am *AllocatorManager) getMember() ElectionMember { +// GetMember returns the ElectionMember of this AllocatorManager. +func (am *AllocatorManager) GetMember() ElectionMember { return am.member } diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index c7c9460a8c1..18608c1d5d7 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -507,7 +507,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { if errors.Is(err, errs.ErrEtcdTxnConflict) { log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully", zap.String("campaign-tso-primary-name", gta.member.Name())) - } else if errors.Is(err, errs.ErrPreCheckCampaign) { + } else if errors.Is(err, errs.ErrCheckCampaign) { log.Info("campaign tso primary meets error due to pre-check campaign failed, the tso keyspace group may be in split", zap.String("campaign-tso-primary-name", gta.member.Name())) } else { diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index febbbdabe41..49529998ba9 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -575,8 +575,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro zap.Uint32("source", splitSource)) return } - participant.SetPreCampaignChecker(func(leadership *election.Leadership) bool { - return splitSourceAM.getMember().IsLeader() + participant.SetCampaignChecker(func(leadership *election.Leadership) bool { + return splitSourceAM.GetMember().IsLeader() }) } // Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp. @@ -673,7 +673,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( } // Check if the split is completed. if oldGroup.IsSplitTarget() && !newGroup.IsSplitting() { - kgm.ams[groupID].getMember().(*member.Participant).SetPreCampaignChecker(nil) + kgm.ams[groupID].GetMember().(*member.Participant).SetCampaignChecker(nil) } kgm.kgs[groupID] = newGroup } @@ -728,7 +728,7 @@ func (kgm *KeyspaceGroupManager) GetElectionMember( if err != nil { return nil, err } - return am.getMember(), nil + return am.GetMember(), nil } // HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of the given keyspace group. diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go index fd44fd4d439..baad22c9148 100644 --- a/tests/integrations/mcs/cluster.go +++ b/tests/integrations/mcs/cluster.go @@ -21,12 +21,13 @@ import ( "github.com/stretchr/testify/require" tso "github.com/tikv/pd/pkg/mcs/tso/server" mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" ) -// TestCluster is a test cluster for TSO. -type TestCluster struct { +// TestTSOCluster is a test cluster for TSO. +type TestTSOCluster struct { ctx context.Context backendEndpoints string @@ -35,8 +36,8 @@ type TestCluster struct { } // NewTestTSOCluster creates a new TSO test cluster. -func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestCluster, err error) { - tc = &TestCluster{ +func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestTSOCluster, err error) { + tc = &TestTSOCluster{ ctx: ctx, backendEndpoints: backendEndpoints, servers: make(map[string]*tso.Server, initialServerCount), @@ -52,7 +53,7 @@ func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpo } // AddServer adds a new TSO server to the test cluster. -func (tc *TestCluster) AddServer(addr string) error { +func (tc *TestTSOCluster) AddServer(addr string) error { cfg := tso.NewConfig() cfg.BackendEndpoints = tc.backendEndpoints cfg.ListenAddr = addr @@ -75,7 +76,7 @@ func (tc *TestCluster) AddServer(addr string) error { } // Destroy stops and destroy the test cluster. -func (tc *TestCluster) Destroy() { +func (tc *TestTSOCluster) Destroy() { for _, cleanup := range tc.cleanupFuncs { cleanup() } @@ -84,14 +85,14 @@ func (tc *TestCluster) Destroy() { } // DestroyServer stops and destroy the test server by the given address. -func (tc *TestCluster) DestroyServer(addr string) { +func (tc *TestTSOCluster) DestroyServer(addr string) { tc.cleanupFuncs[addr]() delete(tc.cleanupFuncs, addr) delete(tc.servers, addr) } // GetPrimary returns the primary TSO server. -func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server { +func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server { for _, server := range tc.servers { if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) { return server @@ -101,12 +102,12 @@ func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Serve } // WaitForPrimaryServing waits for one of servers being elected to be the primary/leader of the given keyspace. -func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) string { - var primary string +func (tc *TestTSOCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) *tso.Server { + var primary *tso.Server testutil.Eventually(re, func() bool { - for name, s := range tc.servers { - if s.IsKeyspaceServing(keyspaceID, keyspaceGroupID) { - primary = name + for _, server := range tc.servers { + if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) { + primary = server return true } } @@ -117,12 +118,12 @@ func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, } // WaitForDefaultPrimaryServing waits for one of servers being elected to be the primary/leader of the default keyspace. -func (tc *TestCluster) WaitForDefaultPrimaryServing(re *require.Assertions) string { +func (tc *TestTSOCluster) WaitForDefaultPrimaryServing(re *require.Assertions) *tso.Server { return tc.WaitForPrimaryServing(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) } // GetServer returns the TSO server by the given address. -func (tc *TestCluster) GetServer(addr string) *tso.Server { +func (tc *TestTSOCluster) GetServer(addr string) *tso.Server { for srvAddr, server := range tc.servers { if srvAddr == addr { return server @@ -132,6 +133,16 @@ func (tc *TestCluster) GetServer(addr string) *tso.Server { } // GetServers returns all TSO servers. -func (tc *TestCluster) GetServers() map[string]*tso.Server { +func (tc *TestTSOCluster) GetServers() map[string]*tso.Server { return tc.servers } + +// GetKeyspaceGroupMember converts the TSO servers to KeyspaceGroupMember and returns. +func (tc *TestTSOCluster) GetKeyspaceGroupMember() (members []endpoint.KeyspaceGroupMember) { + for _, server := range tc.servers { + members = append(members, endpoint.KeyspaceGroupMember{ + Address: server.GetAddr(), + }) + } + return +} diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 159887c1cd2..81b85d314f9 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -16,24 +16,24 @@ package tso import ( "context" + "sync" "testing" "time" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/kvproto/pkg/tsopb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/testutil" - tso "github.com/tikv/pd/pkg/mcs/tso/server" + "github.com/tikv/pd/pkg/election" + "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" tsopkg "github.com/tikv/pd/pkg/tso" - "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/integrations/mcs" handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers" - "google.golang.org/grpc" ) type tsoKeyspaceGroupManagerTestSuite struct { @@ -46,12 +46,8 @@ type tsoKeyspaceGroupManagerTestSuite struct { cluster *tests.TestCluster // pdLeaderServer is the leader server of the PD cluster. pdLeaderServer *tests.TestServer - // tsoServer is the TSO service provider. - tsoServer *tso.Server - tsoServerCleanup func() - tsoClientConn *grpc.ClientConn - - tsoClient tsopb.TSOClient + // tsoCluster is the TSO service cluster. + tsoCluster *mcs.TestTSOCluster } func TestTSOKeyspaceGroupManager(t *testing.T) { @@ -70,29 +66,35 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() { leaderName := suite.cluster.WaitLeader() suite.pdLeaderServer = suite.cluster.GetServer(leaderName) re.NoError(suite.pdLeaderServer.BootstrapCluster()) - backendEndpoints := suite.pdLeaderServer.GetAddr() - suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) - suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr()) + suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr()) + re.NoError(err) } func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() { suite.cancel() - suite.tsoClientConn.Close() - suite.tsoServerCleanup() + suite.tsoCluster.Destroy() suite.cluster.Destroy() } +func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() { + cleanupKeyspaceGroups(suite.Require(), suite.pdLeaderServer) +} + +func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) { + for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") { + handlersutil.MustDeleteKeyspaceGroup(re, server, group.ID) + } +} + func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { re := suite.Require() - ctx, cancel := context.WithCancel(suite.ctx) - defer cancel() // Create the keyspace group 1 with keyspaces [111, 222, 333]. handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { ID: 1, UserKind: endpoint.Standard.String(), - Members: []endpoint.KeyspaceGroupMember{{Address: suite.tsoServer.GetAddr()}}, + Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222, 333}, }, }, @@ -102,15 +104,17 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsSplitting()) // Get a TSO from the keyspace group 1. - var ts *pdpb.Timestamp + var ( + ts pdpb.Timestamp + err error + ) testutil.Eventually(re, func() bool { - resp, err := request(re, ctx, suite.tsoClient, 1, suite.pdLeaderServer.GetClusterID(), 222, 1) - ts = resp.GetTimestamp() - return err == nil && tsoutil.CompareTimestamp(ts, &pdpb.Timestamp{}) > 0 + ts, err = suite.requestTSO(re, 1, 222, 1) + return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0 }) ts.Physical += time.Hour.Milliseconds() // Set the TSO of the keyspace group 1 to a large value. - err := suite.tsoServer.GetHandler().ResetTS(tsoutil.GenerateTS(ts), false, true, 1) + err = suite.tsoCluster.GetPrimary(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) re.NoError(err) // Split the keyspace group 1 to 2. handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ @@ -122,32 +126,143 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { re.Equal([]uint32{222, 333}, kg2.Keyspaces) re.True(kg2.IsSplitTarget()) // Check the split TSO from keyspace group 2. - var splitTS *pdpb.Timestamp + var splitTS pdpb.Timestamp testutil.Eventually(re, func() bool { - resp, err := request(re, ctx, suite.tsoClient, 1, suite.pdLeaderServer.GetClusterID(), 222, 2) - splitTS = resp.GetTimestamp() - return err == nil && tsoutil.CompareTimestamp(splitTS, &pdpb.Timestamp{}) > 0 + splitTS, err = suite.requestTSO(re, 1, 222, 2) + return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0 }) - re.Greater(tsoutil.CompareTimestamp(splitTS, ts), 0) + re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0) + // Finish the split. + handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2) } -func request( +func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( re *require.Assertions, - ctx context.Context, client tsopb.TSOClient, count uint32, - clusterID uint64, keyspaceID, keyspaceGroupID uint32, -) (ts *tsopb.TsoResponse, err error) { - req := &tsopb.TsoRequest{ - Header: &tsopb.RequestHeader{ - ClusterId: clusterID, - KeyspaceId: keyspaceID, - KeyspaceGroupId: keyspaceGroupID, + count, keyspaceID, keyspaceGroupID uint32, +) (pdpb.Timestamp, error) { + primary := suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID) + tam, err := primary.GetTSOAllocatorManager(keyspaceGroupID) + re.NoError(err) + re.NotNil(tam) + return tam.HandleRequest(tsopkg.GlobalDCLocation, count) +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() { + re := suite.Require() + // Create the keyspace group 1 with keyspaces [111, 222, 333]. + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: 1, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{111, 222, 333}, + }, }, - DcLocation: tsopkg.GlobalDCLocation, - Count: count, - } - tsoClient, err := client.Tso(ctx) + }) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) + re.False(kg1.IsSplitting()) + // Split the keyspace group 1 to 2. + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: 2, + Keyspaces: []uint32{222, 333}, + }) + kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) + re.Equal(uint32(2), kg2.ID) + re.Equal([]uint32{222, 333}, kg2.Keyspaces) + re.True(kg2.IsSplitTarget()) + // Check the leadership. + member1, err := suite.tsoCluster.WaitForPrimaryServing(re, 111, 1).GetMember(111, 1) + re.NoError(err) + re.NotNil(member1) + member2, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 2).GetMember(222, 2) re.NoError(err) - defer tsoClient.CloseSend() - re.NoError(tsoClient.Send(req)) - return tsoClient.Recv() + re.NotNil(member2) + // Wait for the leader of the keyspace group 1 and 2 to be elected. + testutil.Eventually(re, func() bool { + return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0 + }) + // Check if the leader of the keyspace group 1 and 2 are the same. + re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls()) + // Resign and block the leader of the keyspace group 1 from being elected. + member1.(*member.Participant).SetCampaignChecker(func(*election.Leadership) bool { + return false + }) + member1.ResetLeader() + // The leader of the keyspace group 2 should be resigned also. + testutil.Eventually(re, func() bool { + return member2.IsLeader() == false + }) + // Check if the leader of the keyspace group 1 and 2 are the same again. + member1.(*member.Participant).SetCampaignChecker(nil) + testutil.Eventually(re, func() bool { + return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0 + }) + re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls()) + // Finish the split. + handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2) +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() { + // TODO: remove the skip after the client is able to support multi-keyspace-group. + suite.T().SkipNow() + + re := suite.Require() + // Create the keyspace group 1 with keyspaces [111, 222, 333]. + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: 1, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{111, 222, 333}, + }, + }, + }) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) + re.False(kg1.IsSplitting()) + // Prepare the client for keyspace 222. + var tsoClient pd.TSOClient + tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 222, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) + re.NoError(err) + re.NotNil(tsoClient) + // Request the TSO for keyspace 222 concurrently. + var ( + wg sync.WaitGroup + ctx, cancel = context.WithCancel(suite.ctx) + lastPhysical, lastLogical int64 + ) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + } + physical, logical, err := tsoClient.GetTS(ctx) + re.NoError(err) + re.Greater(physical, lastPhysical) + re.Greater(logical, lastLogical) + lastPhysical, lastLogical = physical, logical + } + }() + // Split the keyspace group 1 to 2. + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: 2, + Keyspaces: []uint32{222, 333}, + }) + kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) + re.Equal(uint32(2), kg2.ID) + re.Equal([]uint32{222, 333}, kg2.Keyspaces) + re.True(kg2.IsSplitTarget()) + // Finish the split. + handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2) + cancel() + wg.Wait() } diff --git a/tests/server/apiv2/handlers/testutil.go b/tests/server/apiv2/handlers/testutil.go index 53e1cb945db..3ab41bfb0cc 100644 --- a/tests/server/apiv2/handlers/testutil.go +++ b/tests/server/apiv2/handlers/testutil.go @@ -131,7 +131,8 @@ func mustLoadKeyspaces(re *require.Assertions, server *tests.TestServer, name st return meta.KeyspaceMeta } -func sendLoadKeyspaceGroupRequest(re *require.Assertions, server *tests.TestServer, token, limit string) []*endpoint.KeyspaceGroup { +// MustLoadKeyspaceGroups loads all keyspace groups from the server. +func MustLoadKeyspaceGroups(re *require.Assertions, server *tests.TestServer, token, limit string) []*endpoint.KeyspaceGroup { // Construct load range request. httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+keyspaceGroupsPrefix, nil) re.NoError(err) @@ -143,16 +144,15 @@ func sendLoadKeyspaceGroupRequest(re *require.Assertions, server *tests.TestServ httpResp, err := dialClient.Do(httpReq) re.NoError(err) defer httpResp.Body.Close() - re.Equal(http.StatusOK, httpResp.StatusCode) - // Receive & decode response. data, err := io.ReadAll(httpResp.Body) re.NoError(err) + re.Equal(http.StatusOK, httpResp.StatusCode, string(data)) var resp []*endpoint.KeyspaceGroup re.NoError(json.Unmarshal(data, &resp)) return resp } -func tryCreateKeyspaceGroup(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceGroupParams) int { +func tryCreateKeyspaceGroup(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceGroupParams) (int, string) { data, err := json.Marshal(request) re.NoError(err) httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspaceGroupsPrefix, bytes.NewBuffer(data)) @@ -160,7 +160,9 @@ func tryCreateKeyspaceGroup(re *require.Assertions, server *tests.TestServer, re resp, err := dialClient.Do(httpReq) re.NoError(err) defer resp.Body.Close() - return resp.StatusCode + data, err = io.ReadAll(resp.Body) + re.NoError(err) + return resp.StatusCode, string(data) } // MustLoadKeyspaceGroupByID loads the keyspace group by ID with HTTP API. @@ -170,9 +172,9 @@ func MustLoadKeyspaceGroupByID(re *require.Assertions, server *tests.TestServer, resp, err := dialClient.Do(httpReq) re.NoError(err) defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) data, err := io.ReadAll(resp.Body) re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode, string(data)) var kg endpoint.KeyspaceGroup re.NoError(json.Unmarshal(data, &kg)) return &kg @@ -180,14 +182,26 @@ func MustLoadKeyspaceGroupByID(re *require.Assertions, server *tests.TestServer, // MustCreateKeyspaceGroup creates a keyspace group with HTTP API. func MustCreateKeyspaceGroup(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceGroupParams) { - code := tryCreateKeyspaceGroup(re, server, request) - re.Equal(http.StatusOK, code) + code, data := tryCreateKeyspaceGroup(re, server, request) + re.Equal(http.StatusOK, code, data) } // FailCreateKeyspaceGroupWithCode fails to create a keyspace group with HTTP API. func FailCreateKeyspaceGroupWithCode(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceGroupParams, expect int) { - code := tryCreateKeyspaceGroup(re, server, request) - re.Equal(expect, code) + code, data := tryCreateKeyspaceGroup(re, server, request) + re.Equal(expect, code, data) +} + +// MustDeleteKeyspaceGroup deletes a keyspace group with HTTP API. +func MustDeleteKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id uint32) { + httpReq, err := http.NewRequest(http.MethodDelete, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d", id), nil) + re.NoError(err) + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + data, err := io.ReadAll(resp.Body) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode, string(data)) } // MustSplitKeyspaceGroup updates a keyspace group with HTTP API. @@ -200,7 +214,9 @@ func MustSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id resp, err := dialClient.Do(httpReq) re.NoError(err) defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) + data, err = io.ReadAll(resp.Body) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode, string(data)) } // MustFinishSplitKeyspaceGroup finishes a keyspace group split with HTTP API. @@ -211,5 +227,7 @@ func MustFinishSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServ resp, err := dialClient.Do(httpReq) re.NoError(err) defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) + data, err := io.ReadAll(resp.Body) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode, string(data)) } diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go index 4a79eff7fe6..a0408711e32 100644 --- a/tests/server/apiv2/handlers/tso_keyspace_group_test.go +++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go @@ -126,7 +126,7 @@ func (suite *keyspaceGroupTestSuite) TestLoadKeyspaceGroup() { }} MustCreateKeyspaceGroup(re, suite.server, kgs) - resp := sendLoadKeyspaceGroupRequest(re, suite.server, "0", "0") + resp := MustLoadKeyspaceGroups(re, suite.server, "0", "0") re.Len(resp, 3) } @@ -141,13 +141,13 @@ func (suite *keyspaceGroupTestSuite) TestSplitKeyspaceGroup() { }} MustCreateKeyspaceGroup(re, suite.server, kgs) - resp := sendLoadKeyspaceGroupRequest(re, suite.server, "0", "0") + resp := MustLoadKeyspaceGroups(re, suite.server, "0", "0") re.Len(resp, 2) MustSplitKeyspaceGroup(re, suite.server, 1, &handlers.SplitKeyspaceGroupByIDParams{ NewID: uint32(2), Keyspaces: []uint32{111, 222}, }) - resp = sendLoadKeyspaceGroupRequest(re, suite.server, "0", "0") + resp = MustLoadKeyspaceGroups(re, suite.server, "0", "0") re.Len(resp, 3) // Check keyspace group 1. kg1 := MustLoadKeyspaceGroupByID(re, suite.server, 1)