Skip to content

Commit

Permalink
unify the must wait leader
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jun 24, 2022
1 parent 1be84d6 commit 95d8400
Show file tree
Hide file tree
Showing 28 changed files with 59 additions and 121 deletions.
2 changes: 1 addition & 1 deletion server/api/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestAdminTestSuite(t *testing.T) {
func (suite *adminTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestCheckerTestSuite(t *testing.T) {
func (suite *checkerTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/checker", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestClusterTestSuite(t *testing.T) {
func (suite *clusterTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (suite *configTestSuite) SetupSuite() {
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) {
cfg.Replication.EnablePlacementRules = false
})
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/hot_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestHotStatusTestSuite(t *testing.T) {
func (suite *hotStatusTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/hotspot", addr, apiPrefix)
Expand Down
4 changes: 2 additions & 2 deletions server/api/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (suite *labelsStoreTestSuite) SetupSuite() {
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) {
cfg.Replication.StrictlyMatchLabel = false
})
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down Expand Up @@ -205,7 +205,7 @@ func (suite *strictlyLabelsStoreTestSuite) SetupSuite() {
cfg.Replication.StrictlyMatchLabel = true
cfg.Replication.EnablePlacementRules = false
})
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

suite.grpcSvr = &server.GrpcServer{Server: suite.svr}
addr := suite.svr.GetAddr()
Expand Down
2 changes: 1 addition & 1 deletion server/api/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestLogTestSuite(t *testing.T) {
func (suite *logTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/admin", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/min_resolved_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (suite *minResolvedTSTestSuite) SetupSuite() {
re := suite.Require()
cluster.DefaultMinResolvedTSPersistenceInterval = time.Microsecond
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
4 changes: 2 additions & 2 deletions server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (suite *operatorTestSuite) SetupSuite() {
re := suite.Require()
suite.NoError(failpoint.Enable("github.com/tikv/pd/server/schedule/unexpectedOperator", "return(true)"))
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) { cfg.Replication.MaxReplicas = 1 })
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down Expand Up @@ -178,7 +178,7 @@ func (suite *transferRegionOperatorTestSuite) SetupSuite() {
re := suite.Require()
suite.NoError(failpoint.Enable("github.com/tikv/pd/server/schedule/unexpectedOperator", "return(true)"))
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) { cfg.Replication.MaxReplicas = 3 })
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/pprof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestProfTestSuite(t *testing.T) {
func (suite *profTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/debug", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/region_label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestRegionLabelTestSuite(t *testing.T) {
func (suite *regionLabelTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/config/region-label/", addr, apiPrefix)
Expand Down
8 changes: 4 additions & 4 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestRegionTestSuite(t *testing.T) {
func (suite *regionTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestGetRegionTestSuite(t *testing.T) {
func (suite *getRegionTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestGetRegionRangeHolesTestSuite(t *testing.T) {
func (suite *getRegionRangeHolesTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})
addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
mustBootstrapCluster(re, suite.svr)
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestRegionsReplicatedTestSuite(t *testing.T) {
func (suite *regionsReplicatedTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestRuleTestSuite(t *testing.T) {
func (suite *ruleTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/config", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestScheduleTestSuite(t *testing.T) {
func (suite *scheduleTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/schedulers", addr, apiPrefix)
Expand Down
21 changes: 2 additions & 19 deletions server/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func mustNewCluster(re *require.Assertions, num int, opts ...func(cfg *config.Co
}
close(ch)
// wait etcd and http servers
mustWaitLeader(re, svrs)
server.MustWaitLeader(re, svrs)

// clean up
clean := func() {
Expand All @@ -122,23 +122,6 @@ func mustNewCluster(re *require.Assertions, num int, opts ...func(cfg *config.Co
return cfgs, svrs, clean
}

func mustWaitLeader(re *require.Assertions, svrs []*server.Server) {
testutil.Eventually(re, func() bool {
var leader *pdpb.Member
for _, svr := range svrs {
l := svr.GetLeader()
// All servers' GetLeader should return the same leader.
if l == nil || (leader != nil && l.GetMemberId() != leader.GetMemberId()) {
return false
}
if leader == nil {
leader = l
}
}
return true
})
}

func mustBootstrapCluster(re *require.Assertions, s *server.Server) {
grpcPDClient := testutil.MustNewGrpcClient(re, s.GetAddr())
req := &pdpb.BootstrapRequest{
Expand All @@ -164,7 +147,7 @@ func TestServiceTestSuite(t *testing.T) {
func (suite *serviceTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

mustBootstrapCluster(re, suite.svr)
mustPutStore(re, suite.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, nil)
Expand Down
2 changes: 1 addition & 1 deletion server/api/service_gc_safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestServiceGCSafepointTestSuite(t *testing.T) {
func (suite *serviceGCSafepointTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
4 changes: 2 additions & 2 deletions server/api/service_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (suite *auditMiddlewareTestSuite) SetupSuite() {
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) {
cfg.Replication.EnablePlacementRules = false
})
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestRateLimitConfigTestSuite(t *testing.T) {
func (suite *rateLimitConfigTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})
mustBootstrapCluster(re, suite.svr)
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", suite.svr.GetAddr(), apiPrefix)
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestStatsTestSuite(t *testing.T) {
func (suite *statsTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (suite *storeTestSuite) SetupSuite() {
// TODO: enable placmentrules
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) { cfg.Replication.EnablePlacementRules = false })
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.grpcSvr = &server.GrpcServer{Server: suite.svr}
Expand Down
2 changes: 1 addition & 1 deletion server/api/trend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestTrend(t *testing.T) {
re := require.New(t)
svr, cleanup := mustNewServer(re)
defer cleanup()
mustWaitLeader(re, []*server.Server{svr})
server.MustWaitLeader(re, []*server.Server{svr})

mustBootstrapCluster(re, svr)
for i := 1; i <= 3; i++ {
Expand Down
2 changes: 1 addition & 1 deletion server/api/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (suite *tsoTestSuite) SetupSuite() {
cfg.EnableLocalTSO = true
cfg.Labels[config.ZoneLabel] = "dc-1"
})
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/unsafe_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestUnsafeOperationTestSuite(t *testing.T) {
func (suite *unsafeOperationTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/admin/unsafe", addr, apiPrefix)
Expand Down
16 changes: 1 addition & 15 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,6 @@ func TestLeaderServerTestSuite(t *testing.T) {
suite.Run(t, new(leaderServerTestSuite))
}

func (suite *leaderServerTestSuite) mustWaitLeader(svrs []*Server) *Server {
var leader *Server
testutil.Eventually(suite.Require(), func() bool {
for _, s := range svrs {
if !s.IsClosed() && s.member.IsLeader() {
leader = s
return true
}
}
return false
})
return leader
}

func (suite *leaderServerTestSuite) SetupSuite() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.svrs = make(map[string]*Server)
Expand Down Expand Up @@ -125,7 +111,7 @@ func (suite *leaderServerTestSuite) newTestServersWithCfgs(ctx context.Context,
suite.NotNil(svr)
svrs = append(svrs, svr)
}
suite.mustWaitLeader(svrs)
MustWaitLeader(suite.Require(), svrs)

cleanup := func() {
for _, svr := range svrs {
Expand Down
17 changes: 17 additions & 0 deletions server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"time"

"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/assertutil"
"github.com/tikv/pd/pkg/tempurl"
"github.com/tikv/pd/pkg/testutil"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/embed"
Expand Down Expand Up @@ -115,3 +117,18 @@ func NewTestMultiConfig(c *assertutil.Checker, count int) []*config.Config {

return cfgs
}

// MustWaitLeader return the leader until timeout.
func MustWaitLeader(re *require.Assertions, svrs []*Server) *Server {
var leader *Server
testutil.Eventually(re, func() bool {
for _, s := range svrs {
if !s.IsClosed() && s.member.IsLeader() {
leader = s
return true
}
}
return false
})
return leader
}
15 changes: 1 addition & 14 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func (suite *clientTestSuite) SetupSuite() {
suite.grpcPDClient = testutil.MustNewGrpcClient(re, suite.srv.GetAddr())
suite.grpcSvr = &server.GrpcServer{Server: suite.srv}

suite.mustWaitLeader(map[string]*server.Server{suite.srv.GetAddr(): suite.srv})
server.MustWaitLeader(re, []*server.Server{suite.srv})
suite.bootstrapServer(newHeader(suite.srv), suite.grpcPDClient)

suite.ctx, suite.clean = context.WithCancel(context.Background())
Expand Down Expand Up @@ -728,19 +728,6 @@ func (suite *clientTestSuite) TearDownSuite() {
suite.cleanup()
}

func (suite *clientTestSuite) mustWaitLeader(svrs map[string]*server.Server) *server.Server {
for i := 0; i < 500; i++ {
for _, s := range svrs {
if !s.IsClosed() && s.GetMember().IsLeader() {
return s
}
}
time.Sleep(100 * time.Millisecond)
}
suite.FailNow("no leader")
return nil
}

func newHeader(srv *server.Server) *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: srv.ClusterID(),
Expand Down
Loading

0 comments on commit 95d8400

Please sign in to comment.