From 787509b10af20e3d6a28aaa4c25c57de98a85f4d Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 5 May 2023 02:01:09 +0800 Subject: [PATCH] use mcs/utils and use failpoint test Signed-off-by: lhy1024 --- pkg/keyspace/keyspace.go | 15 +-- pkg/keyspace/keyspace_test.go | 5 +- pkg/keyspace/util.go | 5 +- pkg/keyspace/util_test.go | 5 +- pkg/mcs/utils/constant.go | 3 + server/grpc_service.go | 19 +++- server/server.go | 6 + tests/integrations/mcs/tso/server_test.go | 113 ++++++++++++++++++- tests/server/apiv2/handlers/keyspace_test.go | 10 +- 9 files changed, 154 insertions(+), 27 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 7bd8de7bc28..cf549fdf57a 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" @@ -38,10 +39,6 @@ const ( AllocStep = uint64(100) // AllocLabel is used to label keyspace idAllocator's metrics. AllocLabel = "keyspace-idAlloc" - // DefaultKeyspaceName is the name reserved for default keyspace. - DefaultKeyspaceName = "DEFAULT" - // DefaultKeyspaceID is the id of default keyspace. - DefaultKeyspaceID = uint32(0) // regionLabelIDPrefix is used to prefix the keyspace region label. regionLabelIDPrefix = "keyspaces/" // regionLabelKey is the key for keyspace id in keyspace region label. @@ -106,13 +103,13 @@ func NewKeyspaceManager(store endpoint.KeyspaceStorage, // Bootstrap saves default keyspace info. func (manager *Manager) Bootstrap() error { // Split Keyspace Region for default keyspace. - if err := manager.splitKeyspaceRegion(DefaultKeyspaceID); err != nil { + if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID); err != nil { return err } now := time.Now().Unix() defaultKeyspaceMeta := &keyspacepb.KeyspaceMeta{ - Id: DefaultKeyspaceID, - Name: DefaultKeyspaceName, + Id: utils.DefaultKeyspaceID, + Name: utils.DefaultKeyspaceName, State: keyspacepb.KeyspaceState_ENABLED, CreatedAt: now, StateChangedAt: now, @@ -425,7 +422,7 @@ func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation) // It returns error if saving failed, operation not allowed, or if keyspace not exists. func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) { // Changing the state of default keyspace is not allowed. - if name == DefaultKeyspaceName { + if name == utils.DefaultKeyspaceName { log.Warn("[keyspace] failed to update keyspace config", zap.Error(errModifyDefault), ) @@ -477,7 +474,7 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key // It returns error if saving failed, operation not allowed, or if keyspace not exists. func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) { // Changing the state of default keyspace is not allowed. - if id == DefaultKeyspaceID { + if id == utils.DefaultKeyspaceID { log.Warn("[keyspace] failed to update keyspace config", zap.Error(errModifyDefault), ) diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index f1ef85711fd..5c458684635 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" @@ -163,7 +164,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfig() { re.Error(err) } // Changing config of DEFAULT keyspace is allowed. - updated, err := manager.UpdateKeyspaceConfig(DefaultKeyspaceName, mutations) + updated, err := manager.UpdateKeyspaceConfig(utils.DefaultKeyspaceName, mutations) re.NoError(err) // remove auto filled fields delete(updated.Config, TSOKeyspaceGroupIDKey) @@ -203,7 +204,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() { _, err = manager.UpdateKeyspaceState(createRequest.Name, keyspacepb.KeyspaceState_ENABLED, newTime) re.Error(err) // Changing state of DEFAULT keyspace is not allowed. - _, err = manager.UpdateKeyspaceState(DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime) + _, err = manager.UpdateKeyspaceState(utils.DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime) re.Error(err) } } diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index 69c1e776f04..b739fea8898 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/tikv/pd/pkg/codec" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/storage/endpoint" ) @@ -77,7 +78,7 @@ func validateID(id uint32) error { if id > spaceIDMax { return errors.Errorf("illegal keyspace id %d, larger than spaceID Max %d", id, spaceIDMax) } - if id == DefaultKeyspaceID { + if id == utils.DefaultKeyspaceID { return errors.Errorf("illegal keyspace id %d, collides with default keyspace id", id) } return nil @@ -94,7 +95,7 @@ func validateName(name string) error { if !isValid { return errors.Errorf("illegal keyspace name %s, should contain only alphanumerical and underline", name) } - if name == DefaultKeyspaceName { + if name == utils.DefaultKeyspaceName { return errors.Errorf("illegal keyspace name %s, collides with default keyspace name", name) } return nil diff --git a/pkg/keyspace/util_test.go b/pkg/keyspace/util_test.go index 40277e298b6..02da4814d50 100644 --- a/pkg/keyspace/util_test.go +++ b/pkg/keyspace/util_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/codec" "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/mcs/utils" ) func TestValidateID(t *testing.T) { @@ -30,7 +31,7 @@ func TestValidateID(t *testing.T) { id uint32 hasErr bool }{ - {DefaultKeyspaceID, true}, // Reserved id should result in error. + {utils.DefaultKeyspaceID, true}, // Reserved id should result in error. {100, false}, {spaceIDMax - 1, false}, {spaceIDMax, false}, @@ -48,7 +49,7 @@ func TestValidateName(t *testing.T) { name string hasErr bool }{ - {DefaultKeyspaceName, true}, // Reserved name should result in error. + {utils.DefaultKeyspaceName, true}, // Reserved name should result in error. {"keyspaceName1", false}, {"keyspace_name_1", false}, {"10", false}, diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index e29aa6a5008..fba1881b4c4 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -35,6 +35,9 @@ const ( // LeaderTickInterval is the interval to check leader LeaderTickInterval = 50 * time.Millisecond + // DefaultKeyspaceName is the name reserved for default keyspace. + DefaultKeyspaceName = "DEFAULT" + // DefaultKeyspaceID is the default key space id. // Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215) // ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap diff --git a/server/grpc_service.go b/server/grpc_service.go index aac69e4b8c2..080a150e553 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -51,6 +51,7 @@ import ( const ( heartbeatSendTimeout = 5 * time.Second maxRetryTimesGetGlobalTSOFromTSOServer = 3 + retryIntervalGetGlobalTSOFromTSOServer = 500 * time.Millisecond ) // gRPC errors @@ -1774,10 +1775,6 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan } func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timestamp, error) { - forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName) - if !ok || forwardedHost == "" { - return pdpb.Timestamp{}, ErrNotFoundTSOAddr - } request := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: s.clusterID, @@ -1787,11 +1784,16 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest Count: 1, } var ( + forwardedHost string forwardStream tsopb.TSO_TsoClient ts *tsopb.TsoResponse err error ) for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ { + forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName) + if !ok || forwardedHost == "" { + return pdpb.Timestamp{}, ErrNotFoundTSOAddr + } forwardStream, err = s.getTSOForwardStream(forwardedHost) if err != nil { return pdpb.Timestamp{}, err @@ -1799,6 +1801,15 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest forwardStream.Send(request) ts, err = forwardStream.Recv() if err != nil { + if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) { + select { + case s.updateServicePrimaryAddrCh <- struct{}{}: + log.Info("update service primary address when meet not leader error") + default: + } + time.Sleep(retryIntervalGetGlobalTSOFromTSOServer) + continue + } if strings.Contains(err.Error(), codes.Unavailable.String()) { s.tsoClientPool.Lock() delete(s.tsoClientPool.clients, forwardedHost) diff --git a/server/server.go b/server/server.go index e83831364c7..6e0ba68d9ae 100644 --- a/server/server.go +++ b/server/server.go @@ -1840,6 +1840,12 @@ func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int6 return revision, nil } +// SetServicePrimaryAddr sets the primary address directly. +// Note: This function is only used for test. +func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { + s.servicePrimaryMap.Store(serviceName, addr) +} + func (s *Server) servicePrimaryKey(serviceName string) string { return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary") } diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 9eae238056a..ed6973ec9f2 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -224,11 +224,11 @@ func TestAPIServerForwardTestSuite(t *testing.T) { suite.Run(t, new(APIServerForwardTestSuite)) } -func (suite *APIServerForwardTestSuite) SetupSuite() { +func (suite *APIServerForwardTestSuite) SetupTest() { var err error re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 3) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -246,7 +246,7 @@ func (suite *APIServerForwardTestSuite) SetupSuite() { suite.NoError(err) } -func (suite *APIServerForwardTestSuite) TearDownSuite() { +func (suite *APIServerForwardTestSuite) TearDownTest() { suite.pdClient.Close() etcdClient := suite.pdLeader.GetEtcdClient() @@ -308,6 +308,113 @@ func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() { suite.checkAvailableTSO() } +func (suite *APIServerForwardTestSuite) TestResignTSOPrimaryForward() { + // TODO: test random kill primary with 3 nodes + re := suite.Require() + + tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForDefaultPrimaryServing(re) + + for j := 0; j < 10; j++ { + tc.ResignPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID) + tc.WaitForDefaultPrimaryServing(re) + var err error + for i := 0; i < 3; i++ { // try 3 times + _, _, err = suite.pdClient.GetTS(suite.ctx) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + suite.NoError(err) + suite.checkAvailableTSO() + } +} + +func (suite *APIServerForwardTestSuite) TestResignAPIPrimaryForward() { + re := suite.Require() + + tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForDefaultPrimaryServing(re) + + for j := 0; j < 10; j++ { + suite.pdLeader.ResignLeader() + suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader()) + suite.backendEndpoints = suite.pdLeader.GetAddr() + _, _, err = suite.pdClient.GetTS(suite.ctx) + suite.NoError(err) + } +} + +func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower1() { + suite.checkForwardTSOUnexpectedToFollower(func() { + // unary call will retry internally + // try to update gc safe point + min, err := suite.pdClient.UpdateServiceGCSafePoint(context.Background(), "a", 1000, 1) + suite.NoError(err) + suite.Equal(uint64(0), min) + + }) +} + +func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower2() { + suite.checkForwardTSOUnexpectedToFollower(func() { + // unary call will retry internally + // try to set external ts + ts, err := suite.pdClient.GetExternalTimestamp(suite.ctx) + suite.NoError(err) + err = suite.pdClient.SetExternalTimestamp(suite.ctx, ts+1) + suite.NoError(err) + }) +} + +func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower3() { + suite.checkForwardTSOUnexpectedToFollower(func() { + _, _, err := suite.pdClient.GetTS(suite.ctx) + suite.Error(err) + }) +} + +func (suite *APIServerForwardTestSuite) checkForwardTSOUnexpectedToFollower(checkTSO func()) { + re := suite.Require() + tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + tc.WaitForDefaultPrimaryServing(re) + + // get follower's address + servers := tc.GetServers() + oldPrimary := tc.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID).GetAddr() + var follower string + for addr := range servers { + if addr != oldPrimary { + follower = addr + break + } + } + re.NotEmpty(follower) + + // write follower's address to cache to simulate cache is not updated. + suite.pdLeader.GetServer().SetServicePrimaryAddr(utils.TSOServiceName, follower) + errorAddr, ok := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName) + suite.True(ok) + suite.Equal(follower, errorAddr) + + // test tso request + checkTSO() + + // test tso request will success after cache is updated + suite.checkAvailableTSO() + newPrimary, exist2 := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName) + suite.True(exist2) + suite.NotEqual(errorAddr, newPrimary) + suite.Equal(oldPrimary, newPrimary) + tc.Destroy() +} + func (suite *APIServerForwardTestSuite) addRegions() { leader := suite.cluster.GetServer(suite.cluster.WaitLeader()) rc := leader.GetServer().GetRaftCluster() diff --git a/tests/server/apiv2/handlers/keyspace_test.go b/tests/server/apiv2/handlers/keyspace_test.go index f976208f65b..4feb7c9af5c 100644 --- a/tests/server/apiv2/handlers/keyspace_test.go +++ b/tests/server/apiv2/handlers/keyspace_test.go @@ -22,7 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" @@ -68,8 +68,8 @@ func (suite *keyspaceTestSuite) TestCreateLoadKeyspace() { loaded := mustLoadKeyspaces(re, suite.server, created.Name) re.Equal(created, loaded) } - defaultKeyspace := mustLoadKeyspaces(re, suite.server, keyspace.DefaultKeyspaceName) - re.Equal(keyspace.DefaultKeyspaceName, defaultKeyspace.Name) + defaultKeyspace := mustLoadKeyspaces(re, suite.server, utils.DefaultKeyspaceName) + re.Equal(utils.DefaultKeyspaceName, defaultKeyspace.Name) re.Equal(keyspacepb.KeyspaceState_ENABLED, defaultKeyspace.State) } @@ -120,7 +120,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() { re.Equal(keyspacepb.KeyspaceState_TOMBSTONE, tombstone.State) } // Changing default keyspace's state is NOT allowed. - success, _ := sendUpdateStateRequest(re, suite.server, keyspace.DefaultKeyspaceName, &handlers.UpdateStateParam{State: "disabled"}) + success, _ := sendUpdateStateRequest(re, suite.server, utils.DefaultKeyspaceName, &handlers.UpdateStateParam{State: "disabled"}) re.False(success) } @@ -134,7 +134,7 @@ func (suite *keyspaceTestSuite) TestLoadRangeKeyspace() { for i, created := range keyspaces { re.Equal(created, loadResponse.Keyspaces[i+1].KeyspaceMeta) } - re.Equal(keyspace.DefaultKeyspaceName, loadResponse.Keyspaces[0].Name) + re.Equal(utils.DefaultKeyspaceName, loadResponse.Keyspaces[0].Name) re.Equal(keyspacepb.KeyspaceState_ENABLED, loadResponse.Keyspaces[0].State) }