Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: unify the must wait leader #5226

Merged
merged 3 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/api/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestAdminTestSuite(t *testing.T) {
func (suite *adminTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestCheckerTestSuite(t *testing.T) {
func (suite *checkerTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/checker", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestClusterTestSuite(t *testing.T) {
func (suite *clusterTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (suite *configTestSuite) SetupSuite() {
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) {
cfg.Replication.EnablePlacementRules = false
})
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/hot_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestHotStatusTestSuite(t *testing.T) {
func (suite *hotStatusTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/hotspot", addr, apiPrefix)
Expand Down
4 changes: 2 additions & 2 deletions server/api/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (suite *labelsStoreTestSuite) SetupSuite() {
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) {
cfg.Replication.StrictlyMatchLabel = false
})
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down Expand Up @@ -205,7 +205,7 @@ func (suite *strictlyLabelsStoreTestSuite) SetupSuite() {
cfg.Replication.StrictlyMatchLabel = true
cfg.Replication.EnablePlacementRules = false
})
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

suite.grpcSvr = &server.GrpcServer{Server: suite.svr}
addr := suite.svr.GetAddr()
Expand Down
2 changes: 1 addition & 1 deletion server/api/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestLogTestSuite(t *testing.T) {
func (suite *logTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/admin", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/min_resolved_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (suite *minResolvedTSTestSuite) SetupSuite() {
re := suite.Require()
cluster.DefaultMinResolvedTSPersistenceInterval = time.Microsecond
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
4 changes: 2 additions & 2 deletions server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (suite *operatorTestSuite) SetupSuite() {
re := suite.Require()
suite.NoError(failpoint.Enable("github.com/tikv/pd/server/schedule/unexpectedOperator", "return(true)"))
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) { cfg.Replication.MaxReplicas = 1 })
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down Expand Up @@ -178,7 +178,7 @@ func (suite *transferRegionOperatorTestSuite) SetupSuite() {
re := suite.Require()
suite.NoError(failpoint.Enable("github.com/tikv/pd/server/schedule/unexpectedOperator", "return(true)"))
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) { cfg.Replication.MaxReplicas = 3 })
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/pprof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestProfTestSuite(t *testing.T) {
func (suite *profTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/debug", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/region_label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestRegionLabelTestSuite(t *testing.T) {
func (suite *regionLabelTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/config/region-label/", addr, apiPrefix)
Expand Down
8 changes: 4 additions & 4 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestRegionTestSuite(t *testing.T) {
func (suite *regionTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestGetRegionTestSuite(t *testing.T) {
func (suite *getRegionTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestGetRegionRangeHolesTestSuite(t *testing.T) {
func (suite *getRegionRangeHolesTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})
addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
mustBootstrapCluster(re, suite.svr)
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestRegionsReplicatedTestSuite(t *testing.T) {
func (suite *regionsReplicatedTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestRuleTestSuite(t *testing.T) {
func (suite *ruleTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/config", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestScheduleTestSuite(t *testing.T) {
func (suite *scheduleTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/schedulers", addr, apiPrefix)
Expand Down
21 changes: 2 additions & 19 deletions server/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func mustNewCluster(re *require.Assertions, num int, opts ...func(cfg *config.Co
}
close(ch)
// wait etcd and http servers
mustWaitLeader(re, svrs)
server.MustWaitLeader(re, svrs)

// clean up
clean := func() {
Expand All @@ -122,23 +122,6 @@ func mustNewCluster(re *require.Assertions, num int, opts ...func(cfg *config.Co
return cfgs, svrs, clean
}

func mustWaitLeader(re *require.Assertions, svrs []*server.Server) {
testutil.Eventually(re, func() bool {
var leader *pdpb.Member
for _, svr := range svrs {
l := svr.GetLeader()
// All servers' GetLeader should return the same leader.
if l == nil || (leader != nil && l.GetMemberId() != leader.GetMemberId()) {
return false
}
if leader == nil {
leader = l
}
}
return true
})
}

func mustBootstrapCluster(re *require.Assertions, s *server.Server) {
grpcPDClient := testutil.MustNewGrpcClient(re, s.GetAddr())
req := &pdpb.BootstrapRequest{
Expand All @@ -164,7 +147,7 @@ func TestServiceTestSuite(t *testing.T) {
func (suite *serviceTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

mustBootstrapCluster(re, suite.svr)
mustPutStore(re, suite.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, nil)
Expand Down
2 changes: 1 addition & 1 deletion server/api/service_gc_safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestServiceGCSafepointTestSuite(t *testing.T) {
func (suite *serviceGCSafepointTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
4 changes: 2 additions & 2 deletions server/api/service_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (suite *auditMiddlewareTestSuite) SetupSuite() {
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) {
cfg.Replication.EnablePlacementRules = false
})
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestRateLimitConfigTestSuite(t *testing.T) {
func (suite *rateLimitConfigTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})
mustBootstrapCluster(re, suite.svr)
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", suite.svr.GetAddr(), apiPrefix)
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestStatsTestSuite(t *testing.T) {
func (suite *statsTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (suite *storeTestSuite) SetupSuite() {
// TODO: enable placmentrules
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) { cfg.Replication.EnablePlacementRules = false })
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.grpcSvr = &server.GrpcServer{Server: suite.svr}
Expand Down
2 changes: 1 addition & 1 deletion server/api/trend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestTrend(t *testing.T) {
re := require.New(t)
svr, cleanup := mustNewServer(re)
defer cleanup()
mustWaitLeader(re, []*server.Server{svr})
server.MustWaitLeader(re, []*server.Server{svr})

mustBootstrapCluster(re, svr)
for i := 1; i <= 3; i++ {
Expand Down
2 changes: 1 addition & 1 deletion server/api/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (suite *tsoTestSuite) SetupSuite() {
cfg.EnableLocalTSO = true
cfg.Labels[config.ZoneLabel] = "dc-1"
})
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
Expand Down
2 changes: 1 addition & 1 deletion server/api/unsafe_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestUnsafeOperationTestSuite(t *testing.T) {
func (suite *unsafeOperationTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
mustWaitLeader(re, []*server.Server{suite.svr})
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/admin/unsafe", addr, apiPrefix)
Expand Down
16 changes: 1 addition & 15 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,6 @@ func TestLeaderServerTestSuite(t *testing.T) {
suite.Run(t, new(leaderServerTestSuite))
}

func (suite *leaderServerTestSuite) mustWaitLeader(svrs []*Server) *Server {
var leader *Server
testutil.Eventually(suite.Require(), func() bool {
for _, s := range svrs {
if !s.IsClosed() && s.member.IsLeader() {
leader = s
return true
}
}
return false
})
return leader
}

func (suite *leaderServerTestSuite) SetupSuite() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.svrs = make(map[string]*Server)
Expand Down Expand Up @@ -125,7 +111,7 @@ func (suite *leaderServerTestSuite) newTestServersWithCfgs(ctx context.Context,
suite.NotNil(svr)
svrs = append(svrs, svr)
}
suite.mustWaitLeader(svrs)
MustWaitLeader(suite.Require(), svrs)

cleanup := func() {
for _, svr := range svrs {
Expand Down
20 changes: 20 additions & 0 deletions server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"time"

"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/assertutil"
"github.com/tikv/pd/pkg/tempurl"
"github.com/tikv/pd/pkg/testutil"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/embed"
Expand Down Expand Up @@ -115,3 +117,21 @@ func NewTestMultiConfig(c *assertutil.Checker, count int) []*config.Config {

return cfgs
}

// MustWaitLeader return the leader until timeout.
func MustWaitLeader(re *require.Assertions, svrs []*Server) *Server {
var leader *Server
testutil.Eventually(re, func() bool {
for _, svr := range svrs {
// All servers' GetLeader should return the same leader.
if svr.GetLeader() == nil || (leader != nil && svr.GetLeader().GetMemberId() != leader.GetLeader().GetMemberId()) {
return false
}
if leader == nil && !svr.IsClosed() {
leader = svr
}
}
return true
})
return leader
}
15 changes: 1 addition & 14 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func (suite *clientTestSuite) SetupSuite() {
suite.grpcPDClient = testutil.MustNewGrpcClient(re, suite.srv.GetAddr())
suite.grpcSvr = &server.GrpcServer{Server: suite.srv}

suite.mustWaitLeader(map[string]*server.Server{suite.srv.GetAddr(): suite.srv})
server.MustWaitLeader(re, []*server.Server{suite.srv})
suite.bootstrapServer(newHeader(suite.srv), suite.grpcPDClient)

suite.ctx, suite.clean = context.WithCancel(context.Background())
Expand Down Expand Up @@ -728,19 +728,6 @@ func (suite *clientTestSuite) TearDownSuite() {
suite.cleanup()
}

func (suite *clientTestSuite) mustWaitLeader(svrs map[string]*server.Server) *server.Server {
for i := 0; i < 500; i++ {
for _, s := range svrs {
if !s.IsClosed() && s.GetMember().IsLeader() {
return s
}
}
time.Sleep(100 * time.Millisecond)
}
suite.FailNow("no leader")
return nil
}

func newHeader(srv *server.Server) *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: srv.ClusterID(),
Expand Down
Loading