Skip to content

Commit

Permalink
test(ticdc): migrate test-infra to testify for cdc/kv pkg (pingcap#2899)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Feb 8, 2022
1 parent 95e751f commit 06b2f20
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 119 deletions.
46 changes: 22 additions & 24 deletions cdc/kv/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@
package kv

import (
"github.com/pingcap/check"
"testing"

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type matcherSuite struct{}

var _ = check.Suite(&matcherSuite{})

func (s *matcherSuite) TestMatchRow(c *check.C) {
defer testleak.AfterTest(c)()
func TestMatchRow(t *testing.T) {
defer testleak.AfterTestT(t)
matcher := newMatcher()
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Expand All @@ -48,11 +46,11 @@ func (s *matcherSuite) TestMatchRow(c *check.C) {
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1)
c.Assert(ok, check.IsFalse)
c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{
require.False(t, ok)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
})
}, commitRow1)

// test match commit
commitRow2 := &cdcpb.Event_Row{
Expand All @@ -61,18 +59,18 @@ func (s *matcherSuite) TestMatchRow(c *check.C) {
Key: []byte("k1"),
}
ok = matcher.matchRow(commitRow2)
c.Assert(ok, check.IsTrue)
c.Assert(commitRow2, check.DeepEquals, &cdcpb.Event_Row{
require.True(t, ok)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 2,
CommitTs: 3,
Key: []byte("k1"),
Value: []byte("v2"),
OldValue: []byte("v3"),
})
}, commitRow2)
}

func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) {
defer testleak.AfterTest(c)()
func TestMatchFakePrewrite(t *testing.T) {
defer testleak.AfterTestT(t)
matcher := newMatcher()
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Expand All @@ -93,20 +91,20 @@ func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) {
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1)
c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{
require.Equal(t, &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
Value: []byte("v1"),
OldValue: []byte("v3"),
})
c.Assert(ok, check.IsTrue)
}, commitRow1)
require.True(t, ok)
}

func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
defer testleak.AfterTest(c)()
func TestMatchMatchCachedRow(t *testing.T) {
defer testleak.AfterTestT(t)
matcher := newMatcher()
c.Assert(len(matcher.matchCachedRow()), check.Equals, 0)
require.Equal(t, 0, len(matcher.matchCachedRow()))
matcher.cacheCommitRow(&cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Expand All @@ -122,7 +120,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
CommitTs: 5,
Key: []byte("k3"),
})
c.Assert(len(matcher.matchCachedRow()), check.Equals, 0)
require.Equal(t, 0, len(matcher.matchCachedRow()))

matcher.cacheCommitRow(&cdcpb.Event_Row{
StartTs: 1,
Expand Down Expand Up @@ -159,7 +157,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
OldValue: []byte("ov3"),
})

c.Assert(matcher.matchCachedRow(), check.DeepEquals, []*cdcpb.Event_Row{{
require.Equal(t, []*cdcpb.Event_Row{{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
Expand All @@ -171,5 +169,5 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
Key: []byte("k2"),
Value: []byte("v2"),
OldValue: []byte("ov2"),
}})
}}, matcher.matchCachedRow())
}
54 changes: 25 additions & 29 deletions cdc/kv/region_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,29 @@ import (
"math/rand"
"runtime"
"sync"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type regionWorkerSuite struct{}

var _ = check.Suite(&regionWorkerSuite{})

func (s *regionWorkerSuite) TestRegionStateManager(c *check.C) {
defer testleak.AfterTest(c)()
func TestRegionStateManager(t *testing.T) {
defer testleak.AfterTestT(t)
rsm := newRegionStateManager(4)

regionID := uint64(1000)
_, ok := rsm.getState(regionID)
c.Assert(ok, check.IsFalse)
require.False(t, ok)

rsm.setState(regionID, &regionFeedState{requestID: 2})
state, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
c.Assert(state.requestID, check.Equals, uint64(2))
require.True(t, ok)
require.Equal(t, uint64(2), state.requestID)
}

func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
defer testleak.AfterTest(c)()
func TestRegionStateManagerThreadSafe(t *testing.T) {
defer testleak.AfterTestT(t)
rsm := newRegionStateManager(4)
regionCount := 100
regionIDs := make([]uint64, regionCount)
Expand All @@ -62,9 +59,9 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
idx := rand.Intn(regionCount)
regionID := regionIDs[idx]
s, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
require.True(t, ok)
s.lock.RLock()
c.Assert(s.requestID, check.Equals, uint64(idx+1))
require.Equal(t, uint64(idx+1), s.requestID)
s.lock.RUnlock()
}
}()
Expand All @@ -79,7 +76,7 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
}
regionID := regionIDs[rand.Intn(regionCount)]
s, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
require.True(t, ok)
s.lock.Lock()
s.lastResolvedTs += 10
s.lock.Unlock()
Expand All @@ -92,29 +89,28 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
totalResolvedTs := uint64(0)
for _, regionID := range regionIDs {
s, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
c.Assert(s.lastResolvedTs, check.Greater, uint64(1000))
require.True(t, ok)
require.Greater(t, s.lastResolvedTs, uint64(1000))
totalResolvedTs += s.lastResolvedTs
}
// 100 regions, initial resolved ts 1000;
// 2000 * resolved ts forward, increased by 10 each time, routine number is `concurrency`.
c.Assert(totalResolvedTs, check.Equals, uint64(100*1000+2000*10*concurrency))
require.Equal(t, uint64(100*1000+2000*10*concurrency), totalResolvedTs)
}

func (s *regionWorkerSuite) TestRegionStateManagerBucket(c *check.C) {
defer testleak.AfterTest(c)()
func TestRegionStateManagerBucket(t *testing.T) {
defer testleak.AfterTestT(t)
rsm := newRegionStateManager(-1)
c.Assert(rsm.bucket, check.GreaterEqual, minRegionStateBucket)
c.Assert(rsm.bucket, check.LessEqual, maxRegionStateBucket)
require.GreaterOrEqual(t, rsm.bucket, minRegionStateBucket)
require.LessOrEqual(t, rsm.bucket, maxRegionStateBucket)

bucket := rsm.bucket * 2
rsm = newRegionStateManager(bucket)
c.Assert(rsm.bucket, check.Equals, bucket)
require.Equal(t, bucket, rsm.bucket)
}

func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
defer testleak.AfterTest(c)()

func TestRegionWorkerPoolSize(t *testing.T) {
defer testleak.AfterTestT(t)
conf := config.GetDefaultServerConfig()
conf.KVClient.WorkerPoolSize = 0
config.StoreGlobalServerConfig(conf)
Expand All @@ -125,13 +121,13 @@ func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
}
return b
}
c.Assert(size, check.Equals, min(runtime.NumCPU()*2, maxWorkerPoolSize))
require.Equal(t, min(runtime.NumCPU()*2, maxWorkerPoolSize), size)

conf.KVClient.WorkerPoolSize = 5
size = getWorkerPoolSize()
c.Assert(size, check.Equals, 5)
require.Equal(t, 5, size)

conf.KVClient.WorkerPoolSize = maxWorkerPoolSize + 1
size = getWorkerPoolSize()
c.Assert(size, check.Equals, maxWorkerPoolSize)
require.Equal(t, maxWorkerPoolSize, size)
}
Loading

0 comments on commit 06b2f20

Please sign in to comment.