Skip to content

Commit

Permalink
use mcs/utils and use failpoint test
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed May 4, 2023
1 parent 3787d04 commit 787509b
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 27 deletions.
15 changes: 6 additions & 9 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand All @@ -38,10 +39,6 @@ const (
AllocStep = uint64(100)
// AllocLabel is used to label keyspace idAllocator's metrics.
AllocLabel = "keyspace-idAlloc"
// DefaultKeyspaceName is the name reserved for default keyspace.
DefaultKeyspaceName = "DEFAULT"
// DefaultKeyspaceID is the id of default keyspace.
DefaultKeyspaceID = uint32(0)
// regionLabelIDPrefix is used to prefix the keyspace region label.
regionLabelIDPrefix = "keyspaces/"
// regionLabelKey is the key for keyspace id in keyspace region label.
Expand Down Expand Up @@ -106,13 +103,13 @@ func NewKeyspaceManager(store endpoint.KeyspaceStorage,
// Bootstrap saves default keyspace info.
func (manager *Manager) Bootstrap() error {
// Split Keyspace Region for default keyspace.
if err := manager.splitKeyspaceRegion(DefaultKeyspaceID); err != nil {
if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID); err != nil {
return err
}
now := time.Now().Unix()
defaultKeyspaceMeta := &keyspacepb.KeyspaceMeta{
Id: DefaultKeyspaceID,
Name: DefaultKeyspaceName,
Id: utils.DefaultKeyspaceID,
Name: utils.DefaultKeyspaceName,
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
Expand Down Expand Up @@ -425,7 +422,7 @@ func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation)
// It returns error if saving failed, operation not allowed, or if keyspace not exists.
func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) {
// Changing the state of default keyspace is not allowed.
if name == DefaultKeyspaceName {
if name == utils.DefaultKeyspaceName {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
)
Expand Down Expand Up @@ -477,7 +474,7 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key
// It returns error if saving failed, operation not allowed, or if keyspace not exists.
func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) {
// Changing the state of default keyspace is not allowed.
if id == DefaultKeyspaceID {
if id == utils.DefaultKeyspaceID {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
)
Expand Down
5 changes: 3 additions & 2 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
Expand Down Expand Up @@ -163,7 +164,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfig() {
re.Error(err)
}
// Changing config of DEFAULT keyspace is allowed.
updated, err := manager.UpdateKeyspaceConfig(DefaultKeyspaceName, mutations)
updated, err := manager.UpdateKeyspaceConfig(utils.DefaultKeyspaceName, mutations)
re.NoError(err)
// remove auto filled fields
delete(updated.Config, TSOKeyspaceGroupIDKey)
Expand Down Expand Up @@ -203,7 +204,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() {
_, err = manager.UpdateKeyspaceState(createRequest.Name, keyspacepb.KeyspaceState_ENABLED, newTime)
re.Error(err)
// Changing state of DEFAULT keyspace is not allowed.
_, err = manager.UpdateKeyspaceState(DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime)
_, err = manager.UpdateKeyspaceState(utils.DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime)
re.Error(err)
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/storage/endpoint"
)
Expand Down Expand Up @@ -77,7 +78,7 @@ func validateID(id uint32) error {
if id > spaceIDMax {
return errors.Errorf("illegal keyspace id %d, larger than spaceID Max %d", id, spaceIDMax)
}
if id == DefaultKeyspaceID {
if id == utils.DefaultKeyspaceID {
return errors.Errorf("illegal keyspace id %d, collides with default keyspace id", id)
}
return nil
Expand All @@ -94,7 +95,7 @@ func validateName(name string) error {
if !isValid {
return errors.Errorf("illegal keyspace name %s, should contain only alphanumerical and underline", name)
}
if name == DefaultKeyspaceName {
if name == utils.DefaultKeyspaceName {
return errors.Errorf("illegal keyspace name %s, collides with default keyspace name", name)
}
return nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/keyspace/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/mcs/utils"
)

func TestValidateID(t *testing.T) {
Expand All @@ -30,7 +31,7 @@ func TestValidateID(t *testing.T) {
id uint32
hasErr bool
}{
{DefaultKeyspaceID, true}, // Reserved id should result in error.
{utils.DefaultKeyspaceID, true}, // Reserved id should result in error.
{100, false},
{spaceIDMax - 1, false},
{spaceIDMax, false},
Expand All @@ -48,7 +49,7 @@ func TestValidateName(t *testing.T) {
name string
hasErr bool
}{
{DefaultKeyspaceName, true}, // Reserved name should result in error.
{utils.DefaultKeyspaceName, true}, // Reserved name should result in error.
{"keyspaceName1", false},
{"keyspace_name_1", false},
{"10", false},
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
// LeaderTickInterval is the interval to check leader
LeaderTickInterval = 50 * time.Millisecond

// DefaultKeyspaceName is the name reserved for default keyspace.
DefaultKeyspaceName = "DEFAULT"

// DefaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
Expand Down
19 changes: 15 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
const (
heartbeatSendTimeout = 5 * time.Second
maxRetryTimesGetGlobalTSOFromTSOServer = 3
retryIntervalGetGlobalTSOFromTSOServer = 500 * time.Millisecond
)

// gRPC errors
Expand Down Expand Up @@ -1774,10 +1775,6 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
}

func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timestamp, error) {
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
request := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: s.clusterID,
Expand All @@ -1787,18 +1784,32 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest
Count: 1,
}
var (
forwardedHost string
forwardStream tsopb.TSO_TsoClient
ts *tsopb.TsoResponse
err error
)
for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ {
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
forwardStream, err = s.getTSOForwardStream(forwardedHost)
if err != nil {
return pdpb.Timestamp{}, err
}
forwardStream.Send(request)
ts, err = forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) {
select {
case s.updateServicePrimaryAddrCh <- struct{}{}:
log.Info("update service primary address when meet not leader error")
default:
}
time.Sleep(retryIntervalGetGlobalTSOFromTSOServer)
continue
}
if strings.Contains(err.Error(), codes.Unavailable.String()) {
s.tsoClientPool.Lock()
delete(s.tsoClientPool.clients, forwardedHost)
Expand Down
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,12 @@ func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int6
return revision, nil
}

// SetServicePrimaryAddr sets the primary address directly.
// Note: This function is only used for test.
func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
s.servicePrimaryMap.Store(serviceName, addr)
}

func (s *Server) servicePrimaryKey(serviceName string) string {
return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary")
}
Expand Down
113 changes: 110 additions & 3 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,11 @@ func TestAPIServerForwardTestSuite(t *testing.T) {
suite.Run(t, new(APIServerForwardTestSuite))
}

func (suite *APIServerForwardTestSuite) SetupSuite() {
func (suite *APIServerForwardTestSuite) SetupTest() {
var err error
re := suite.Require()
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1)
suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 3)
re.NoError(err)

err = suite.cluster.RunInitialServers()
Expand All @@ -246,7 +246,7 @@ func (suite *APIServerForwardTestSuite) SetupSuite() {
suite.NoError(err)
}

func (suite *APIServerForwardTestSuite) TearDownSuite() {
func (suite *APIServerForwardTestSuite) TearDownTest() {
suite.pdClient.Close()

etcdClient := suite.pdLeader.GetEtcdClient()
Expand Down Expand Up @@ -308,6 +308,113 @@ func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() {
suite.checkAvailableTSO()
}

func (suite *APIServerForwardTestSuite) TestResignTSOPrimaryForward() {
// TODO: test random kill primary with 3 nodes
re := suite.Require()

tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForDefaultPrimaryServing(re)

for j := 0; j < 10; j++ {
tc.ResignPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
tc.WaitForDefaultPrimaryServing(re)
var err error
for i := 0; i < 3; i++ { // try 3 times
_, _, err = suite.pdClient.GetTS(suite.ctx)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
suite.NoError(err)
suite.checkAvailableTSO()
}
}

func (suite *APIServerForwardTestSuite) TestResignAPIPrimaryForward() {
re := suite.Require()

tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForDefaultPrimaryServing(re)

for j := 0; j < 10; j++ {
suite.pdLeader.ResignLeader()
suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader())
suite.backendEndpoints = suite.pdLeader.GetAddr()
_, _, err = suite.pdClient.GetTS(suite.ctx)
suite.NoError(err)
}
}

func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower1() {
suite.checkForwardTSOUnexpectedToFollower(func() {
// unary call will retry internally
// try to update gc safe point
min, err := suite.pdClient.UpdateServiceGCSafePoint(context.Background(), "a", 1000, 1)
suite.NoError(err)
suite.Equal(uint64(0), min)

})
}

func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower2() {
suite.checkForwardTSOUnexpectedToFollower(func() {
// unary call will retry internally
// try to set external ts
ts, err := suite.pdClient.GetExternalTimestamp(suite.ctx)
suite.NoError(err)
err = suite.pdClient.SetExternalTimestamp(suite.ctx, ts+1)
suite.NoError(err)
})
}

func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower3() {
suite.checkForwardTSOUnexpectedToFollower(func() {
_, _, err := suite.pdClient.GetTS(suite.ctx)
suite.Error(err)
})
}

func (suite *APIServerForwardTestSuite) checkForwardTSOUnexpectedToFollower(checkTSO func()) {
re := suite.Require()
tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints)
re.NoError(err)
tc.WaitForDefaultPrimaryServing(re)

// get follower's address
servers := tc.GetServers()
oldPrimary := tc.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID).GetAddr()
var follower string
for addr := range servers {
if addr != oldPrimary {
follower = addr
break
}
}
re.NotEmpty(follower)

// write follower's address to cache to simulate cache is not updated.
suite.pdLeader.GetServer().SetServicePrimaryAddr(utils.TSOServiceName, follower)
errorAddr, ok := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(ok)
suite.Equal(follower, errorAddr)

// test tso request
checkTSO()

// test tso request will success after cache is updated
suite.checkAvailableTSO()
newPrimary, exist2 := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(exist2)
suite.NotEqual(errorAddr, newPrimary)
suite.Equal(oldPrimary, newPrimary)
tc.Destroy()
}

func (suite *APIServerForwardTestSuite) addRegions() {
leader := suite.cluster.GetServer(suite.cluster.WaitLeader())
rc := leader.GetServer().GetRaftCluster()
Expand Down
10 changes: 5 additions & 5 deletions tests/server/apiv2/handlers/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/tests"
Expand Down Expand Up @@ -68,8 +68,8 @@ func (suite *keyspaceTestSuite) TestCreateLoadKeyspace() {
loaded := mustLoadKeyspaces(re, suite.server, created.Name)
re.Equal(created, loaded)
}
defaultKeyspace := mustLoadKeyspaces(re, suite.server, keyspace.DefaultKeyspaceName)
re.Equal(keyspace.DefaultKeyspaceName, defaultKeyspace.Name)
defaultKeyspace := mustLoadKeyspaces(re, suite.server, utils.DefaultKeyspaceName)
re.Equal(utils.DefaultKeyspaceName, defaultKeyspace.Name)
re.Equal(keyspacepb.KeyspaceState_ENABLED, defaultKeyspace.State)
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() {
re.Equal(keyspacepb.KeyspaceState_TOMBSTONE, tombstone.State)
}
// Changing default keyspace's state is NOT allowed.
success, _ := sendUpdateStateRequest(re, suite.server, keyspace.DefaultKeyspaceName, &handlers.UpdateStateParam{State: "disabled"})
success, _ := sendUpdateStateRequest(re, suite.server, utils.DefaultKeyspaceName, &handlers.UpdateStateParam{State: "disabled"})
re.False(success)
}

Expand All @@ -134,7 +134,7 @@ func (suite *keyspaceTestSuite) TestLoadRangeKeyspace() {
for i, created := range keyspaces {
re.Equal(created, loadResponse.Keyspaces[i+1].KeyspaceMeta)
}
re.Equal(keyspace.DefaultKeyspaceName, loadResponse.Keyspaces[0].Name)
re.Equal(utils.DefaultKeyspaceName, loadResponse.Keyspaces[0].Name)
re.Equal(keyspacepb.KeyspaceState_ENABLED, loadResponse.Keyspaces[0].State)
}

Expand Down

0 comments on commit 787509b

Please sign in to comment.