diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index d9866b7a9db..25a832c5f48 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -244,10 +244,9 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMemb
 	return member, nil
 }
 
-// ResignPrimary resigns the primary of the given keyspace and keyspace group.
-func (s *Server) ResignPrimary() error {
-	member, err := s.keyspaceGroupManager.GetElectionMember(
-		mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
+// ResignPrimary resigns the primary of the given keyspace.
+func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
+	member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go
index 4c9e8dac4ca..0089e0d9bdc 100644
--- a/pkg/tso/keyspace_group_manager.go
+++ b/pkg/tso/keyspace_group_manager.go
@@ -812,7 +812,7 @@ func (kgm *KeyspaceGroupManager) FindGroupByKeyspaceID(
 	return curAM, curKeyspaceGroup, curKeyspaceGroupID, nil
 }
 
-// GetElectionMember returns the election member of the given keyspace group
+// GetElectionMember returns the election member of the keyspace group serving the given keyspace.
 func (kgm *KeyspaceGroupManager) GetElectionMember(
 	keyspaceID, keyspaceGroupID uint32,
 ) (ElectionMember, error) {
diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go
index 228f506454d..dbc9964b62b 100644
--- a/tests/integrations/mcs/cluster.go
+++ b/tests/integrations/mcs/cluster.go
@@ -16,6 +16,7 @@ package mcs
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/stretchr/testify/require"
@@ -92,12 +93,16 @@ func (tc *TestTSOCluster) DestroyServer(addr string) {
 }
 
 // ResignPrimary resigns the primary TSO server.
-func (tc *TestTSOCluster) ResignPrimary() {
-	tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary()
+func (tc *TestTSOCluster) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
+	primaryServer := tc.GetPrimaryServer(keyspaceID, keyspaceGroupID)
+	if primaryServer == nil {
+		return fmt.Errorf("no tso server serves this keyspace %d", keyspaceID)
+	}
+	return primaryServer.ResignPrimary(keyspaceID, keyspaceGroupID)
 }
 
-// GetPrimary returns the primary TSO server.
-func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
+// GetPrimaryServer returns the primary TSO server of the given keyspace
+func (tc *TestTSOCluster) GetPrimaryServer(keyspaceID, keyspaceGroupID uint32) *tso.Server {
 	for _, server := range tc.servers {
 		if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
 			return server
diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go
index f9c47aa56f0..3ca1ad39436 100644
--- a/tests/integrations/mcs/testutil.go
+++ b/tests/integrations/mcs/testutil.go
@@ -29,6 +29,7 @@ import (
 	tso "github.com/tikv/pd/pkg/mcs/tso/server"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/tikv/pd/pkg/utils/testutil"
+	"github.com/tikv/pd/pkg/utils/tsoutil"
 )
 
 var once sync.Once
@@ -48,13 +49,25 @@ func InitLogger(cfg *tso.Config) (err error) {
 	return err
 }
 
-// SetupClientWithKeyspace creates a TSO client for test.
-func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
+// SetupClientWithDefaultKeyspaceName creates a TSO client with default keyspace name for test.
+func SetupClientWithDefaultKeyspaceName(
+	ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption,
+) pd.Client {
 	cli, err := pd.NewClientWithKeyspaceName(ctx, "", endpoints, pd.SecurityOption{}, opts...)
 	re.NoError(err)
 	return cli
 }
 
+// SetupClientWithKeyspaceID creates a TSO client with the given keyspace id for test.
+func SetupClientWithKeyspaceID(
+	ctx context.Context, re *require.Assertions,
+	keyspaceID uint32, endpoints []string, opts ...pd.ClientOption,
+) pd.Client {
+	cli, err := pd.NewClientWithKeyspace(ctx, keyspaceID, endpoints, pd.SecurityOption{}, opts...)
+	re.NoError(err)
+	return cli
+}
+
 // StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
 func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) {
 	cfg := rm.NewConfig()
@@ -137,3 +150,73 @@ func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error {
 	}
 	return errors.WithStack(err)
 }
+
+// CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces.
+func CheckMultiKeyspacesTSO(
+	ctx context.Context, re *require.Assertions,
+	clients []pd.Client, parallelAct func(),
+) {
+	ctx, cancel := context.WithCancel(ctx)
+	wg := sync.WaitGroup{}
+	wg.Add(len(clients))
+
+	for _, client := range clients {
+		go func(cli pd.Client) {
+			defer wg.Done()
+			var ts, lastTS uint64
+			for {
+				select {
+				case <-ctx.Done():
+					// Make sure the lastTS is not empty
+					re.NotEmpty(lastTS)
+					return
+				default:
+				}
+				physical, logical, err := cli.GetTS(ctx)
+				// omit the error check since there are many kinds of errors
+				if err != nil {
+					continue
+				}
+				ts = tsoutil.ComposeTS(physical, logical)
+				re.Less(lastTS, ts)
+				lastTS = ts
+			}
+		}(client)
+	}
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		parallelAct()
+		cancel()
+	}()
+
+	wg.Wait()
+}
+
+// WaitForMultiKeyspacesTSOAvailable waits for the given keyspaces being served by the tso server side
+func WaitForMultiKeyspacesTSOAvailable(
+	ctx context.Context, re *require.Assertions,
+	keyspaceIDs []uint32, backendEndpoints []string,
+) []pd.Client {
+	wg := sync.WaitGroup{}
+	wg.Add(len(keyspaceIDs))
+
+	clients := make([]pd.Client, 0, len(keyspaceIDs))
+	for _, keyspaceID := range keyspaceIDs {
+		cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints)
+		re.NotNil(cli)
+		clients = append(clients, cli)
+
+		go func() {
+			defer wg.Done()
+			testutil.Eventually(re, func() bool {
+				_, _, err := cli.GetTS(ctx)
+				return err == nil
+			})
+		}()
+	}
+
+	wg.Wait()
+	return clients
+}
diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
index 3cc48e682b9..60cc8c1161d 100644
--- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go
+++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
@@ -83,7 +83,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() {
 }
 
 func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) {
-	for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") {
+	keyspaceGroups := handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0")
+	for _, group := range keyspaceGroups {
 		// Do not delete default keyspace group.
 		if group.ID == mcsutils.DefaultKeyspaceGroupID {
 			continue
@@ -130,6 +131,80 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp
 			}
 		}
 	}
+
+	keyspaceIDs := []uint32{0, 1, 2, 3, 1000}
+	clients := mcs.WaitForMultiKeyspacesTSOAvailable(
+		suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
+	re.Equal(len(keyspaceIDs), len(clients))
+	mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
+		time.Sleep(3 * time.Second)
+	})
+}
+
+func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroups() {
+	// Create multiple keyspace groups, and every keyspace should be served by one of them
+	// on a tso server.
+	re := suite.Require()
+
+	params := []struct {
+		keyspaceGroupID uint32
+		keyspaceIDs     []uint32
+	}{
+		{0, []uint32{0, 10}},
+		{1, []uint32{1, 11}},
+		{2, []uint32{2, 12}},
+	}
+
+	for _, param := range params {
+		if param.keyspaceGroupID == 0 {
+			// we have already created default keyspace group, so we can skip it.
+			// keyspace 10 isn't assigned to any keyspace group, so they will be
+			// served by default keyspace group.
+			continue
+		}
+		handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
+			KeyspaceGroups: []*endpoint.KeyspaceGroup{
+				{
+					ID:        param.keyspaceGroupID,
+					UserKind:  endpoint.Standard.String(),
+					Members:   suite.tsoCluster.GetKeyspaceGroupMember(),
+					Keyspaces: param.keyspaceIDs,
+				},
+			},
+		})
+	}
+
+	testutil.Eventually(re, func() bool {
+		for _, param := range params {
+			for _, keyspaceID := range param.keyspaceIDs {
+				served := false
+				for _, server := range suite.tsoCluster.GetServers() {
+					if server.IsKeyspaceServing(keyspaceID, param.keyspaceGroupID) {
+						tam, err := server.GetTSOAllocatorManager(param.keyspaceGroupID)
+						re.NoError(err)
+						re.NotNil(tam)
+						served = true
+					}
+				}
+				if !served {
+					return false
+				}
+			}
+		}
+		return true
+	}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))
+
+	keyspaceIDs := make([]uint32, 0)
+	for _, param := range params {
+		keyspaceIDs = append(keyspaceIDs, param.keyspaceIDs...)
+	}
+
+	clients := mcs.WaitForMultiKeyspacesTSOAvailable(
+		suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
+	re.Equal(len(keyspaceIDs), len(clients))
+	mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
+		time.Sleep(3 * time.Second)
+	})
 }
 
 func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
@@ -160,7 +235,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
 	})
 	ts.Physical += time.Hour.Milliseconds()
 	// Set the TSO of the keyspace group 1 to a large value.
-	err = suite.tsoCluster.GetPrimary(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
+	err = suite.tsoCluster.GetPrimaryServer(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
 	re.NoError(err)
 	// Split the keyspace group 1 to 2.
 	handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go
index da23ab1d1eb..d074c49a497 100644
--- a/tests/integrations/mcs/tso/server_test.go
+++ b/tests/integrations/mcs/tso/server_test.go
@@ -186,7 +186,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) {
 	_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc())
 	defer cleanup()
 
-	cli := mcs.SetupClientWithKeyspace(ctx, re, []string{backendEndpoints})
+	cli := mcs.SetupClientWithDefaultKeyspaceName(ctx, re, []string{backendEndpoints})
 	physical, logical, err := cli.GetTS(ctx)
 	re.NoError(err)
 	ts := tsoutil.ComposeTS(physical, logical)
@@ -349,13 +349,14 @@ func (suite *APIServerForwardTestSuite) checkAvailableTSO() {
 
 type CommonTestSuite struct {
 	suite.Suite
-	ctx              context.Context
-	cancel           context.CancelFunc
-	cluster          *tests.TestCluster
-	tsoCluster       *mcs.TestTSOCluster
-	pdLeader         *tests.TestServer
-	tsoPrimary       *tso.Server
-	backendEndpoints string
+	ctx        context.Context
+	cancel     context.CancelFunc
+	cluster    *tests.TestCluster
+	tsoCluster *mcs.TestTSOCluster
+	pdLeader   *tests.TestServer
+	// tsoDefaultPrimaryServer is the primary server of the default keyspace group
+	tsoDefaultPrimaryServer *tso.Server
+	backendEndpoints        string
 }
 
 func TestCommonTestSuite(t *testing.T) {
@@ -380,7 +381,7 @@ func (suite *CommonTestSuite) SetupSuite() {
 	suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
 	suite.NoError(err)
 	suite.tsoCluster.WaitForDefaultPrimaryServing(re)
-	suite.tsoPrimary = suite.tsoCluster.GetPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
+	suite.tsoDefaultPrimaryServer = suite.tsoCluster.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
 }
 
 func (suite *CommonTestSuite) TearDownSuite() {
@@ -401,14 +402,14 @@ func (suite *CommonTestSuite) TearDownSuite() {
 func (suite *CommonTestSuite) TestAdvertiseAddr() {
 	re := suite.Require()
 
-	conf := suite.tsoPrimary.GetConfig()
+	conf := suite.tsoDefaultPrimaryServer.GetConfig()
 	re.Equal(conf.GetListenAddr(), conf.GetAdvertiseListenAddr())
 }
 
 func (suite *CommonTestSuite) TestMetrics() {
 	re := suite.Require()
 
-	resp, err := http.Get(suite.tsoPrimary.GetConfig().GetAdvertiseListenAddr() + "/metrics")
+	resp, err := http.Get(suite.tsoDefaultPrimaryServer.GetConfig().GetAdvertiseListenAddr() + "/metrics")
 	re.NoError(err)
 	defer resp.Body.Close()
 	re.Equal(http.StatusOK, resp.StatusCode)
diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go
index 2d41aad4b84..80518798179 100644
--- a/tests/integrations/tso/client_test.go
+++ b/tests/integrations/tso/client_test.go
@@ -28,10 +28,14 @@ import (
 	"github.com/stretchr/testify/suite"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/testutil"
+	mcsutils "github.com/tikv/pd/pkg/mcs/utils"
+	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/utils/tempurl"
 	"github.com/tikv/pd/pkg/utils/tsoutil"
+	"github.com/tikv/pd/server/apiv2/handlers"
 	"github.com/tikv/pd/tests"
 	"github.com/tikv/pd/tests/integrations/mcs"
+	handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
 )
 
 var r = rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -44,12 +48,14 @@ type tsoClientTestSuite struct {
 	cancel context.CancelFunc
 	// The PD cluster.
 	cluster *tests.TestCluster
+	// pdLeaderServer is the leader server of the PD cluster.
+	pdLeaderServer *tests.TestServer
 	// The TSO service in microservice mode.
 	tsoCluster *mcs.TestTSOCluster
 
 	backendEndpoints string
-
-	client pd.TSOClient
+	keyspaceIDs      []uint32
+	clients          []pd.Client
 }
 
 func TestLegacyTSOClient(t *testing.T) {
@@ -78,16 +84,56 @@ func (suite *tsoClientTestSuite) SetupSuite() {
 	err = suite.cluster.RunInitialServers()
 	re.NoError(err)
 	leaderName := suite.cluster.WaitLeader()
-	pdLeader := suite.cluster.GetServer(leaderName)
-	re.NoError(pdLeader.BootstrapCluster())
-	suite.backendEndpoints = pdLeader.GetAddr()
+	suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
+	re.NoError(suite.pdLeaderServer.BootstrapCluster())
+	suite.backendEndpoints = suite.pdLeaderServer.GetAddr()
+	suite.keyspaceIDs = make([]uint32, 0)
+
 	if suite.legacy {
-		suite.client, err = pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{})
+		client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{})
 		re.NoError(err)
+		suite.keyspaceIDs = append(suite.keyspaceIDs, 0)
+		suite.clients = make([]pd.Client, 0)
+		suite.clients = append(suite.clients, client)
 	} else {
 		suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints)
 		re.NoError(err)
-		suite.client = mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ","))
+
+		params := []struct {
+			keyspaceGroupID uint32
+			keyspaceIDs     []uint32
+		}{
+			{0, []uint32{0, 10}},
+			{1, []uint32{1, 11}},
+			{2, []uint32{2}},
+		}
+
+		for _, param := range params {
+			if param.keyspaceGroupID == 0 {
+				// we have already created default keyspace group, so we can skip it.
+				// keyspace 10 isn't assigned to any keyspace group, so they will be
+				// served by default keyspace group.
+				continue
+			}
+			handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
+				KeyspaceGroups: []*endpoint.KeyspaceGroup{
+					{
+						ID:        param.keyspaceGroupID,
+						UserKind:  endpoint.Standard.String(),
+						Members:   suite.tsoCluster.GetKeyspaceGroupMember(),
+						Keyspaces: param.keyspaceIDs,
+					},
+				},
+			})
+		}
+
+		for _, param := range params {
+			suite.keyspaceIDs = append(suite.keyspaceIDs, param.keyspaceIDs...)
+		}
+
+		suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable(
+			suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ","))
+		re.Equal(len(suite.keyspaceIDs), len(suite.clients))
 	}
 }
 
@@ -101,42 +147,46 @@ func (suite *tsoClientTestSuite) TearDownSuite() {
 
 func (suite *tsoClientTestSuite) TestGetTS() {
 	var wg sync.WaitGroup
-	wg.Add(tsoRequestConcurrencyNumber)
+	wg.Add(tsoRequestConcurrencyNumber * len(suite.clients))
 	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
-		go func() {
-			defer wg.Done()
-			var lastTS uint64
-			for i := 0; i < tsoRequestRound; i++ {
-				physical, logical, err := suite.client.GetTS(suite.ctx)
-				suite.NoError(err)
-				ts := tsoutil.ComposeTS(physical, logical)
-				suite.Less(lastTS, ts)
-				lastTS = ts
-			}
-		}()
+		for _, client := range suite.clients {
+			go func(client pd.Client) {
+				defer wg.Done()
+				var lastTS uint64
+				for j := 0; j < tsoRequestRound; j++ {
+					physical, logical, err := client.GetTS(suite.ctx)
+					suite.NoError(err)
+					ts := tsoutil.ComposeTS(physical, logical)
+					suite.Less(lastTS, ts)
+					lastTS = ts
+				}
+			}(client)
+		}
 	}
 	wg.Wait()
 }
 
 func (suite *tsoClientTestSuite) TestGetTSAsync() {
 	var wg sync.WaitGroup
-	wg.Add(tsoRequestConcurrencyNumber)
+	wg.Add(tsoRequestConcurrencyNumber * len(suite.clients))
 	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
-		go func() {
-			defer wg.Done()
-			tsFutures := make([]pd.TSFuture, tsoRequestRound)
-			for i := range tsFutures {
-				tsFutures[i] = suite.client.GetTSAsync(suite.ctx)
-			}
-			var lastTS uint64 = math.MaxUint64
-			for i := len(tsFutures) - 1; i >= 0; i-- {
-				physical, logical, err := tsFutures[i].Wait()
-				suite.NoError(err)
-				ts := tsoutil.ComposeTS(physical, logical)
-				suite.Greater(lastTS, ts)
-				lastTS = ts
-			}
-		}()
+		for _, client := range suite.clients {
+			go func(client pd.Client) {
+				defer wg.Done()
+				tsFutures := make([]pd.TSFuture, tsoRequestRound)
+				for j := range tsFutures {
+					tsFutures[j] = client.GetTSAsync(suite.ctx)
+				}
+				var lastTS uint64 = math.MaxUint64
+				for j := len(tsFutures) - 1; j >= 0; j-- {
+					physical, logical, err := tsFutures[j].Wait()
+					suite.NoError(err)
+					ts := tsoutil.ComposeTS(physical, logical)
+					suite.Greater(lastTS, ts)
+					lastTS = ts
+				}
+			}(client)
+		}
 	}
 	wg.Wait()
 }
@@ -147,33 +197,36 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
 	ctx, cancel := context.WithCancel(suite.ctx)
 	defer cancel()
 
-	testutil.Eventually(re, func() bool {
-		_, _, err := suite.client.GetTS(ctx)
-		return err == nil
-	})
-	// Transfer leader to trigger the TSO resetting.
-	re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
-	oldLeaderName := suite.cluster.WaitLeader()
-	err := suite.cluster.GetServer(oldLeaderName).ResignLeader()
-	re.NoError(err)
-	re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"))
-	newLeaderName := suite.cluster.WaitLeader()
-	re.NotEqual(oldLeaderName, newLeaderName)
-	// Request a new TSO.
-	testutil.Eventually(re, func() bool {
-		_, _, err := suite.client.GetTS(ctx)
-		return err == nil
-	})
-	// Transfer leader back.
-	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp", `return(true)`))
-	err = suite.cluster.GetServer(newLeaderName).ResignLeader()
-	re.NoError(err)
-	// Should NOT panic here.
-	testutil.Eventually(re, func() bool {
-		_, _, err := suite.client.GetTS(ctx)
-		return err == nil
-	})
-	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp"))
+	for i := 0; i < len(suite.clients); i++ {
+		client := suite.clients[i]
+		testutil.Eventually(re, func() bool {
+			_, _, err := client.GetTS(ctx)
+			return err == nil
+		})
+		// Transfer leader to trigger the TSO resetting.
+		re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
+		oldLeaderName := suite.cluster.WaitLeader()
+		err := suite.cluster.GetServer(oldLeaderName).ResignLeader()
+		re.NoError(err)
+		re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"))
+		newLeaderName := suite.cluster.WaitLeader()
+		re.NotEqual(oldLeaderName, newLeaderName)
+		// Request a new TSO.
+		testutil.Eventually(re, func() bool {
+			_, _, err := client.GetTS(ctx)
+			return err == nil
+		})
+		// Transfer leader back.
+		re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp", `return(true)`))
+		err = suite.cluster.GetServer(newLeaderName).ResignLeader()
+		re.NoError(err)
+		// Should NOT panic here.
+		testutil.Eventually(re, func() bool {
+			_, _, err := client.GetTS(ctx)
+			return err == nil
+		})
+		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp"))
+	}
 }
 
 func (suite *tsoClientTestSuite) TestRandomResignLeader() {
@@ -181,29 +234,37 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() {
 	re := suite.Require()
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
 	defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
-	ctx, cancel := context.WithCancel(suite.ctx)
-	var wg sync.WaitGroup
-	checkTSO(ctx, re, &wg, suite.backendEndpoints)
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
+	parallelAct := func() {
 		// After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here.
 		// currently, the time to discover tso service is usually a little longer than 1s, compared
 		// to the previous time taken < 1s.
 		n := r.Intn(2) + 3
 		time.Sleep(time.Duration(n) * time.Second)
 		if !suite.legacy {
-			suite.tsoCluster.ResignPrimary()
-			suite.tsoCluster.WaitForDefaultPrimaryServing(re)
+			wg := sync.WaitGroup{}
+			// Select the default keyspace and a randomly picked keyspace to test
+			keyspaceIDs := []uint32{mcsutils.DefaultKeyspaceID}
+			selectIdx := uint32(r.Intn(len(suite.keyspaceIDs)-1) + 1)
+			keyspaceIDs = append(keyspaceIDs, suite.keyspaceIDs[selectIdx])
+			wg.Add(len(keyspaceIDs))
+			for _, keyspaceID := range keyspaceIDs {
+				go func(keyspaceID uint32) {
+					defer wg.Done()
+					err := suite.tsoCluster.ResignPrimary(keyspaceID, mcsutils.DefaultKeyspaceGroupID)
+					re.NoError(err)
+					suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, 0)
+				}(keyspaceID)
+			}
+			wg.Wait()
 		} else {
 			err := suite.cluster.ResignLeader()
 			re.NoError(err)
 			suite.cluster.WaitLeader()
 		}
 		time.Sleep(time.Duration(n) * time.Second)
-		cancel()
-	}()
-	wg.Wait()
+	}
+
+	mcs.CheckMultiKeyspacesTSO(suite.ctx, re, suite.clients, parallelAct)
 }
 
 func (suite *tsoClientTestSuite) TestRandomShutdown() {
@@ -211,12 +272,7 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
 	re := suite.Require()
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
 	defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
-	ctx, cancel := context.WithCancel(suite.ctx)
-	var wg sync.WaitGroup
-	checkTSO(ctx, re, &wg, suite.backendEndpoints)
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
+	parallelAct := func() {
 		// After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here.
 		// currently, the time to discover tso service is usually a little longer than 1s, compared
 		// to the previous time taken < 1s.
@@ -228,9 +284,9 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
 			suite.cluster.GetServer(suite.cluster.GetLeader()).GetServer().Close()
 		}
 		time.Sleep(time.Duration(n) * time.Second)
-		cancel()
-	}()
-	wg.Wait()
+	}
+
+	mcs.CheckMultiKeyspacesTSO(suite.ctx, re, suite.clients, parallelAct)
 	suite.TearDownSuite()
 	suite.SetupSuite()
 }
@@ -286,7 +342,7 @@ func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, b
 	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
 		go func() {
 			defer wg.Done()
-			cli := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ","))
+			cli := mcs.SetupClientWithDefaultKeyspaceName(ctx, re, strings.Split(backendEndpoints, ","))
 			var ts, lastTS uint64
 			for {
 				select {
diff --git a/tests/server/tso/common_test.go b/tests/server/tso/common_test.go
index f528103db84..877fcb10982 100644
--- a/tests/server/tso/common_test.go
+++ b/tests/server/tso/common_test.go
@@ -28,7 +28,7 @@ import (
 )
 
 const (
-	tsoRequestConcurrencyNumber = 5
+	tsoRequestConcurrencyNumber = 3
 	tsoRequestRound             = 30
 	tsoCount                    = 10
 )
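
For reference, the sketch below (not part of the patch) shows how the helpers introduced above are intended to compose: WaitForMultiKeyspacesTSOAvailable builds one TSO client per keyspace, CheckMultiKeyspacesTSO keeps requesting timestamps on every client and asserts they stay monotonic while a parallel action runs, and ResignPrimary/WaitForPrimaryServing exercise primary failover per keyspace. The wrapper function name and its arguments are illustrative; the helper calls follow the signatures added in testutil.go and cluster.go in this diff.

package tso_test

import (
	"context"

	"github.com/stretchr/testify/require"
	mcsutils "github.com/tikv/pd/pkg/mcs/utils"
	"github.com/tikv/pd/tests/integrations/mcs"
)

// checkTSOAcrossKeyspaces is an illustrative helper (not in the patch) showing the
// intended usage of the new multi-keyspace test utilities. It assumes a PD test
// cluster is already running behind backendEndpoints and that tsoCluster was
// created with mcs.NewTestTSOCluster elsewhere in the suite.
func checkTSOAcrossKeyspaces(ctx context.Context, re *require.Assertions,
	tsoCluster *mcs.TestTSOCluster, keyspaceIDs []uint32, backendEndpoints []string) {
	// Wait until every keyspace is served and obtain one TSO client per keyspace.
	clients := mcs.WaitForMultiKeyspacesTSOAvailable(ctx, re, keyspaceIDs, backendEndpoints)

	// Request timestamps on all clients concurrently and assert they stay monotonic
	// while the parallel action below disturbs the primaries.
	mcs.CheckMultiKeyspacesTSO(ctx, re, clients, func() {
		for _, keyspaceID := range keyspaceIDs {
			// Resign the primary serving this keyspace and wait for re-election.
			re.NoError(tsoCluster.ResignPrimary(keyspaceID, mcsutils.DefaultKeyspaceGroupID))
			tsoCluster.WaitForPrimaryServing(re, keyspaceID, mcsutils.DefaultKeyspaceGroupID)
		}
	})
}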