Skip to content

Commit

Permalink
*: use testutil.Eventually to replace testutil.WaitUntil (#5108)
Browse files Browse the repository at this point in the history
ref #5105

Use testutil.Eventually to replace testutil.WaitUntil.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
JmPotato and ti-chi-bot authored Jun 7, 2022
1 parent 12a9513 commit 147e9c0
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 45 deletions.
6 changes: 3 additions & 3 deletions pkg/mock/mockhbstream/mockhbstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func TestActivity(t *testing.T) {

// Active stream is stream1.
hbs.BindStream(1, stream1)
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
hbs.SendMsg(region, proto.Clone(msg).(*pdpb.RegionHeartbeatResponse))
return stream1.Recv() != nil && stream2.Recv() == nil
})
// Rebind to stream2.
hbs.BindStream(1, stream2)
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
hbs.SendMsg(region, proto.Clone(msg).(*pdpb.RegionHeartbeatResponse))
return stream1.Recv() == nil && stream2.Recv() != nil
})
Expand All @@ -66,7 +66,7 @@ func TestActivity(t *testing.T) {
re.NotNil(res.GetHeader().GetError())
// Switch back to 1 again.
hbs.BindStream(1, stream1)
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
hbs.SendMsg(region, proto.Clone(msg).(*pdpb.RegionHeartbeatResponse))
return stream1.Recv() != nil && stream2.Recv() == nil
})
Expand Down
20 changes: 8 additions & 12 deletions pkg/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package testutil
import (
"os"
"strings"
"testing"
"time"

"github.com/pingcap/check"
Expand Down Expand Up @@ -55,6 +54,7 @@ func WithSleepInterval(sleep time.Duration) WaitOption {
}

// WaitUntil repeatedly evaluates f() for a period of time, util it returns true.
// NOTICE: this function will be removed soon, please use `Eventually` instead.
func WaitUntil(c *check.C, f CheckFunc, opts ...WaitOption) {
c.Log("wait start")
option := &WaitOp{
Expand All @@ -73,24 +73,20 @@ func WaitUntil(c *check.C, f CheckFunc, opts ...WaitOption) {
c.Fatal("wait timeout")
}

// WaitUntilWithTestingT repeatedly evaluates f() for a period of time, util it returns true.
// NOTICE: this is a temporary function that we will be used to replace `WaitUntil` later.
func WaitUntilWithTestingT(t *testing.T, f CheckFunc, opts ...WaitOption) {
t.Log("wait start")
// Eventually asserts that given condition will be met in a period of time.
func Eventually(re *require.Assertions, condition func() bool, opts ...WaitOption) {
option := &WaitOp{
retryTimes: waitMaxRetry,
sleepInterval: waitRetrySleep,
}
for _, opt := range opts {
opt(option)
}
for i := 0; i < option.retryTimes; i++ {
if f() {
return
}
time.Sleep(option.sleepInterval)
}
t.Fatal("wait timeout")
re.Eventually(
condition,
option.sleepInterval*time.Duration(option.retryTimes),
option.sleepInterval,
)
}

// NewRequestHeader creates a new request header.
Expand Down
52 changes: 26 additions & 26 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestClientLeaderChange(t *testing.T) {
cli := setupCli(re, ctx, endpoints)

var ts1, ts2 uint64
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
p1, l1, err := cli.GetTS(context.TODO())
if err == nil {
ts1 = tsoutil.ComposeTS(p1, l1)
Expand All @@ -87,16 +87,16 @@ func TestClientLeaderChange(t *testing.T) {
re.True(cluster.CheckTSOUnique(ts1))

leader := cluster.GetLeader()
waitLeader(t, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls)
waitLeader(re, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls)

err = cluster.GetServer(leader).Stop()
re.NoError(err)
leader = cluster.WaitLeader()
re.NotEmpty(leader)
waitLeader(t, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls)
waitLeader(re, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls)

// Check TS won't fall back after leader changed.
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
p2, l2, err := cli.GetTS(context.TODO())
if err == nil {
ts2 = tsoutil.ComposeTS(p2, l2)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestLeaderTransfer(t *testing.T) {
cli := setupCli(re, ctx, endpoints)

var lastTS uint64
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestUpdateAfterResetTSO(t *testing.T) {
endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)

testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
Expand All @@ -200,7 +200,7 @@ func TestUpdateAfterResetTSO(t *testing.T) {
newLeaderName := cluster.WaitLeader()
re.NotEqual(oldLeaderName, newLeaderName)
// Request a new TSO.
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
Expand All @@ -209,7 +209,7 @@ func TestUpdateAfterResetTSO(t *testing.T) {
err = cluster.GetServer(newLeaderName).ResignLeader()
re.NoError(err)
// Should NOT panic here.
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
Expand All @@ -235,7 +235,7 @@ func TestTSOAllocatorLeader(t *testing.T) {

err = cluster.RunInitialServers()
re.NoError(err)
cluster.WaitAllLeadersWithTestingT(t, dcLocationConfig)
cluster.WaitAllLeadersWithTestify(re, dcLocationConfig)

var (
testServers = cluster.GetServers()
Expand All @@ -249,7 +249,7 @@ func TestTSOAllocatorLeader(t *testing.T) {
var allocatorLeaderMap = make(map[string]string)
for _, dcLocation := range dcLocationConfig {
var pdName string
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
pdName = cluster.WaitAllocatorLeader(dcLocation)
return len(pdName) > 0
})
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestGlobalAndLocalTSO(t *testing.T) {
re.NoError(err)
dcLocationConfig["pd4"] = "dc-4"
cluster.CheckClusterDCLocation()
cluster.WaitAllLeadersWithTestingT(t, dcLocationConfig)
cluster.WaitAllLeadersWithTestify(re, dcLocationConfig)

// Test a nonexistent dc-location for Local TSO
p, l, err := cli.GetLocalTS(context.TODO(), "nonexistent-dc")
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestGetTsoFromFollowerClient1(t *testing.T) {

re.Nil(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
var lastTS uint64
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -506,7 +506,7 @@ func TestGetTsoFromFollowerClient2(t *testing.T) {

re.Nil(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
var lastTS uint64
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -560,8 +560,8 @@ func setupCli(re *require.Assertions, ctx context.Context, endpoints []string, o
return cli
}

func waitLeader(t *testing.T, cli client, leader string) {
testutil.WaitUntilWithTestingT(t, func() bool {
func waitLeader(re *require.Assertions, cli client, leader string) {
testutil.Eventually(re, func() bool {
cli.ScheduleCheckLeader()
return cli.GetLeaderAddr() == leader
})
Expand Down Expand Up @@ -835,8 +835,8 @@ func (suite *clientTestSuite) TestGetRegion() {
}
err := suite.regionHeartbeat.Send(req)
suite.NoError(err)
t := suite.T()
testutil.WaitUntilWithTestingT(t, func() bool {
re := suite.Require()
testutil.Eventually(re, func() bool {
r, err := suite.client.GetRegion(context.Background(), []byte("a"))
suite.NoError(err)
if r == nil {
Expand Down Expand Up @@ -864,7 +864,7 @@ func (suite *clientTestSuite) TestGetRegion() {
},
}
suite.NoError(suite.reportBucket.Send(breq))
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets())
suite.NoError(err)
if r == nil {
Expand All @@ -874,7 +874,7 @@ func (suite *clientTestSuite) TestGetRegion() {
})
config := suite.srv.GetRaftCluster().GetStoreConfig()
config.EnableRegionBucket = false
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets())
suite.NoError(err)
if r == nil {
Expand Down Expand Up @@ -911,7 +911,7 @@ func (suite *clientTestSuite) TestGetPrevRegion() {
}
time.Sleep(500 * time.Millisecond)
for i := 0; i < 20; i++ {
testutil.WaitUntilWithTestingT(suite.T(), func() bool {
testutil.Eventually(suite.Require(), func() bool {
r, err := suite.client.GetPrevRegion(context.Background(), []byte{byte(i)})
suite.NoError(err)
if i > 0 && i < regionLen {
Expand Down Expand Up @@ -949,8 +949,7 @@ func (suite *clientTestSuite) TestScanRegions() {
}

// Wait for region heartbeats.
t := suite.T()
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(suite.Require(), func() bool {
scanRegions, err := suite.client.ScanRegions(context.Background(), []byte{0}, nil, 10)
return err == nil && len(scanRegions) == 10
})
Expand All @@ -967,6 +966,7 @@ func (suite *clientTestSuite) TestScanRegions() {
region5 := core.NewRegionInfo(regions[5], regions[5].Peers[0], core.WithPendingPeers([]*metapb.Peer{regions[5].Peers[1], regions[5].Peers[2]}))
suite.srv.GetRaftCluster().HandleRegionHeartbeat(region5)

t := suite.T()
check := func(start, end []byte, limit int, expect []*metapb.Region) {
scanRegions, err := suite.client.ScanRegions(context.Background(), start, end, limit)
suite.NoError(err)
Expand Down Expand Up @@ -1017,7 +1017,7 @@ func (suite *clientTestSuite) TestGetRegionByID() {
err := suite.regionHeartbeat.Send(req)
suite.NoError(err)

testutil.WaitUntilWithTestingT(suite.T(), func() bool {
testutil.Eventually(suite.Require(), func() bool {
r, err := suite.client.GetRegionByID(context.Background(), regionID)
suite.NoError(err)
if r == nil {
Expand Down Expand Up @@ -1300,8 +1300,8 @@ func (suite *clientTestSuite) TestScatterRegion() {
regionsID := []uint64{regionID}
suite.NoError(err)
// Test interface `ScatterRegions`.
t := suite.T()
testutil.WaitUntilWithTestingT(t, func() bool {
re := suite.Require()
testutil.Eventually(re, func() bool {
scatterResp, err := suite.client.ScatterRegions(context.Background(), regionsID, pd.WithGroup("test"), pd.WithRetry(1))
if err != nil {
return false
Expand All @@ -1320,7 +1320,7 @@ func (suite *clientTestSuite) TestScatterRegion() {

// Test interface `ScatterRegion`.
// TODO: Deprecate interface `ScatterRegion`.
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
err := suite.client.ScatterRegion(context.Background(), regionID)
if err != nil {
fmt.Println(err)
Expand Down
8 changes: 4 additions & 4 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"net/http"
"os"
"sync"
"testing"
"time"

"github.com/coreos/go-semver/semver"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/autoscaling"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -623,17 +623,17 @@ func (c *TestCluster) WaitAllLeaders(testC *check.C, dcLocations map[string]stri
wg.Wait()
}

// WaitAllLeadersWithTestingT will block and wait for the election of PD leader and all Local TSO Allocator leaders.
// WaitAllLeadersWithTestify will block and wait for the election of PD leader and all Local TSO Allocator leaders.
// NOTICE: this is a temporary function that we will be used to replace `WaitAllLeaders` later.
func (c *TestCluster) WaitAllLeadersWithTestingT(t *testing.T, dcLocations map[string]string) {
func (c *TestCluster) WaitAllLeadersWithTestify(re *require.Assertions, dcLocations map[string]string) {
c.WaitLeader()
c.CheckClusterDCLocation()
// Wait for each DC's Local TSO Allocator leader
wg := sync.WaitGroup{}
for _, dcLocation := range dcLocations {
wg.Add(1)
go func(dc string) {
testutil.WaitUntilWithTestingT(t, func() bool {
testutil.Eventually(re, func() bool {
leaderName := c.WaitAllocatorLeader(dc)
return leaderName != ""
})
Expand Down

0 comments on commit 147e9c0

Please sign in to comment.