Skip to content

Commit

Permalink
tests: add more tests for multiple keyspace groups (#6395)
Browse files Browse the repository at this point in the history
ref #5895

Add CheckMultiKeyspacesTSO() and WaitForMultiKeyspacesTSOAvailable in test utility. Add TestTSOKeyspaceGroupManager/TestKeyspacesServedByNonDefaultKeyspaceGroup. Cover TestGetTS, TestGetTSAsync, TestUpdateAfterResetTSO in TestMicroserviceTSOClient for multiple keyspace groups.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored May 4, 2023
1 parent a0613d8 commit a4f9af3
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 108 deletions.
7 changes: 3 additions & 4 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMemb
return member, nil
}

// ResignPrimary resigns the primary of the given keyspace and keyspace group.
func (s *Server) ResignPrimary() error {
member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
// ResignPrimary resigns the primary of the given keyspace.
func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (kgm *KeyspaceGroupManager) FindGroupByKeyspaceID(
return curAM, curKeyspaceGroup, curKeyspaceGroupID, nil
}

// GetElectionMember returns the election member of the given keyspace group
// GetElectionMember returns the election member of the keyspace group serving the given keyspace.
func (kgm *KeyspaceGroupManager) GetElectionMember(
keyspaceID, keyspaceGroupID uint32,
) (ElectionMember, error) {
Expand Down
13 changes: 9 additions & 4 deletions tests/integrations/mcs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mcs

import (
"context"
"fmt"
"time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -92,12 +93,16 @@ func (tc *TestTSOCluster) DestroyServer(addr string) {
}

// ResignPrimary resigns the primary TSO server.
func (tc *TestTSOCluster) ResignPrimary() {
tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary()
func (tc *TestTSOCluster) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
primaryServer := tc.GetPrimaryServer(keyspaceID, keyspaceGroupID)
if primaryServer == nil {
return fmt.Errorf("no tso server serves this keyspace %d", keyspaceID)
}
return primaryServer.ResignPrimary(keyspaceID, keyspaceGroupID)
}

// GetPrimary returns the primary TSO server.
func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
// GetPrimaryServer returns the primary TSO server of the given keyspace
func (tc *TestTSOCluster) GetPrimaryServer(keyspaceID, keyspaceGroupID uint32) *tso.Server {
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
return server
Expand Down
87 changes: 85 additions & 2 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
)

var once sync.Once
Expand All @@ -48,13 +49,25 @@ func InitLogger(cfg *tso.Config) (err error) {
return err
}

// SetupClientWithKeyspace creates a TSO client for test.
func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
// SetupClientWithDefaultKeyspaceName creates a TSO client with default keyspace name for test.
func SetupClientWithDefaultKeyspaceName(
ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption,
) pd.Client {
cli, err := pd.NewClientWithKeyspaceName(ctx, "", endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}

// SetupClientWithKeyspaceID creates a TSO client with the given keyspace id for test.
func SetupClientWithKeyspaceID(
ctx context.Context, re *require.Assertions,
keyspaceID uint32, endpoints []string, opts ...pd.ClientOption,
) pd.Client {
cli, err := pd.NewClientWithKeyspace(ctx, keyspaceID, endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}

// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) {
cfg := rm.NewConfig()
Expand Down Expand Up @@ -137,3 +150,73 @@ func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error {
}
return errors.WithStack(err)
}

// CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces.
func CheckMultiKeyspacesTSO(
ctx context.Context, re *require.Assertions,
clients []pd.Client, parallelAct func(),
) {
ctx, cancel := context.WithCancel(ctx)
wg := sync.WaitGroup{}
wg.Add(len(clients))

for _, client := range clients {
go func(cli pd.Client) {
defer wg.Done()
var ts, lastTS uint64
for {
select {
case <-ctx.Done():
// Make sure the lastTS is not empty
re.NotEmpty(lastTS)
return
default:
}
physical, logical, err := cli.GetTS(ctx)
// omit the error check since there are many kinds of errors
if err != nil {
continue
}
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
}(client)
}

wg.Add(1)
go func() {
defer wg.Done()
parallelAct()
cancel()
}()

wg.Wait()
}

// WaitForMultiKeyspacesTSOAvailable waits for the given keyspaces being served by the tso server side
func WaitForMultiKeyspacesTSOAvailable(
ctx context.Context, re *require.Assertions,
keyspaceIDs []uint32, backendEndpoints []string,
) []pd.Client {
wg := sync.WaitGroup{}
wg.Add(len(keyspaceIDs))

clients := make([]pd.Client, 0, len(keyspaceIDs))
for _, keyspaceID := range keyspaceIDs {
cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints)
re.NotNil(cli)
clients = append(clients, cli)

go func() {
defer wg.Done()
testutil.Eventually(re, func() bool {
_, _, err := cli.GetTS(ctx)
return err == nil
})
}()
}

wg.Wait()
return clients
}
79 changes: 77 additions & 2 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() {
}

func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) {
for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") {
keyspaceGroups := handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0")
for _, group := range keyspaceGroups {
// Do not delete default keyspace group.
if group.ID == mcsutils.DefaultKeyspaceGroupID {
continue
Expand Down Expand Up @@ -130,6 +131,80 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp
}
}
}

keyspaceIDs := []uint32{0, 1, 2, 3, 1000}
clients := mcs.WaitForMultiKeyspacesTSOAvailable(
suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
re.Equal(len(keyspaceIDs), len(clients))
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
time.Sleep(3 * time.Second)
})
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroups() {
// Create multiple keyspace groups, and every keyspace should be served by one of them
// on a tso server.
re := suite.Require()

params := []struct {
keyspaceGroupID uint32
keyspaceIDs []uint32
}{
{0, []uint32{0, 10}},
{1, []uint32{1, 11}},
{2, []uint32{2, 12}},
}

for _, param := range params {
if param.keyspaceGroupID == 0 {
// we have already created default keyspace group, so we can skip it.
// keyspace 10 isn't assigned to any keyspace group, so they will be
// served by default keyspace group.
continue
}
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: param.keyspaceGroupID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: param.keyspaceIDs,
},
},
})
}

testutil.Eventually(re, func() bool {
for _, param := range params {
for _, keyspaceID := range param.keyspaceIDs {
served := false
for _, server := range suite.tsoCluster.GetServers() {
if server.IsKeyspaceServing(keyspaceID, param.keyspaceGroupID) {
tam, err := server.GetTSOAllocatorManager(param.keyspaceGroupID)
re.NoError(err)
re.NotNil(tam)
served = true
}
}
if !served {
return false
}
}
}
return true
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

keyspaceIDs := make([]uint32, 0)
for _, param := range params {
keyspaceIDs = append(keyspaceIDs, param.keyspaceIDs...)
}

clients := mcs.WaitForMultiKeyspacesTSOAvailable(
suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
re.Equal(len(keyspaceIDs), len(clients))
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
time.Sleep(3 * time.Second)
})
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
Expand Down Expand Up @@ -160,7 +235,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
})
ts.Physical += time.Hour.Milliseconds()
// Set the TSO of the keyspace group 1 to a large value.
err = suite.tsoCluster.GetPrimary(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
err = suite.tsoCluster.GetPrimaryServer(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
re.NoError(err)
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
Expand Down
23 changes: 12 additions & 11 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) {
_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc())
defer cleanup()

cli := mcs.SetupClientWithKeyspace(ctx, re, []string{backendEndpoints})
cli := mcs.SetupClientWithDefaultKeyspaceName(ctx, re, []string{backendEndpoints})
physical, logical, err := cli.GetTS(ctx)
re.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -349,13 +349,14 @@ func (suite *APIServerForwardTestSuite) checkAvailableTSO() {

type CommonTestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
cluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
pdLeader *tests.TestServer
tsoPrimary *tso.Server
backendEndpoints string
ctx context.Context
cancel context.CancelFunc
cluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
pdLeader *tests.TestServer
// tsoDefaultPrimaryServer is the primary server of the default keyspace group
tsoDefaultPrimaryServer *tso.Server
backendEndpoints string
}

func TestCommonTestSuite(t *testing.T) {
Expand All @@ -380,7 +381,7 @@ func (suite *CommonTestSuite) SetupSuite() {
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
suite.NoError(err)
suite.tsoCluster.WaitForDefaultPrimaryServing(re)
suite.tsoPrimary = suite.tsoCluster.GetPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
suite.tsoDefaultPrimaryServer = suite.tsoCluster.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
}

func (suite *CommonTestSuite) TearDownSuite() {
Expand All @@ -401,14 +402,14 @@ func (suite *CommonTestSuite) TearDownSuite() {
func (suite *CommonTestSuite) TestAdvertiseAddr() {
re := suite.Require()

conf := suite.tsoPrimary.GetConfig()
conf := suite.tsoDefaultPrimaryServer.GetConfig()
re.Equal(conf.GetListenAddr(), conf.GetAdvertiseListenAddr())
}

func (suite *CommonTestSuite) TestMetrics() {
re := suite.Require()

resp, err := http.Get(suite.tsoPrimary.GetConfig().GetAdvertiseListenAddr() + "/metrics")
resp, err := http.Get(suite.tsoDefaultPrimaryServer.GetConfig().GetAdvertiseListenAddr() + "/metrics")
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusOK, resp.StatusCode)
Expand Down
Loading

0 comments on commit a4f9af3

Please sign in to comment.