Skip to content

Commit

Permalink
test(ticdc): migrate test-infra to testify for cdc/kv pkg (#2899) (#4515
Browse files Browse the repository at this point in the history
)

close #2899
  • Loading branch information
CharlesCheung96 authored Feb 15, 2022
1 parent 28fe713 commit 9ddc713
Show file tree
Hide file tree
Showing 8 changed files with 475 additions and 560 deletions.
731 changes: 319 additions & 412 deletions cdc/kv/client_test.go

Large diffs are not rendered by default.

48 changes: 24 additions & 24 deletions cdc/kv/grpc_pool_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,53 @@ package kv

import (
"context"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

// Use clientSuite for some special reasons, the embed etcd uses zap as the only candidate
// logger and in the logger initialization it also initializes the grpclog/loggerv2, which
// is not a thread-safe operation and it must be called before any gRPC functions
// ref: https://github.com/grpc/grpc-go/blob/master/grpclog/loggerv2.go#L67-L72
func (s *clientSuite) TestConnArray(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
func TestConnArray(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer pool.Close()
addr := "127.0.0.1:20161"
conn, err := pool.GetConn(addr)
c.Assert(err, check.IsNil)
c.Assert(conn.active, check.Equals, int64(1))
require.Nil(t, err)
require.Equal(t, int64(1), conn.active)
pool.ReleaseConn(conn, addr)
c.Assert(conn.active, check.Equals, int64(0))
require.Equal(t, int64(0), conn.active)

lastConn := conn
// First grpcConnCapacity*2 connections will use initial two connections.
for i := 0; i < grpcConnCapacity*2; i++ {
conn, err := pool.GetConn(addr)
c.Assert(err, check.IsNil)
c.Assert(lastConn.ClientConn, check.Not(check.Equals), conn.ClientConn)
c.Assert(conn.active, check.Equals, int64(i)/2+1)
require.Nil(t, err)
require.NotSame(t, conn.ClientConn, lastConn.ClientConn)
require.Equal(t, int64(i)/2+1, conn.active)
lastConn = conn
}
// The following grpcConnCapacity*2 connections will trigger resize of connection array.
for i := 0; i < grpcConnCapacity*2; i++ {
conn, err := pool.GetConn(addr)
c.Assert(err, check.IsNil)
c.Assert(lastConn.ClientConn, check.Not(check.Equals), conn.ClientConn)
c.Assert(conn.active, check.Equals, int64(i)/2+1)
require.Nil(t, err)
require.NotSame(t, conn.ClientConn, lastConn.ClientConn)
require.Equal(t, int64(i)/2+1, conn.active)
lastConn = conn
}
}

func (s *clientSuite) TestConnArrayRecycle(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
func TestConnArrayRecycle(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -75,7 +75,7 @@ func (s *clientSuite) TestConnArrayRecycle(c *check.C) {
// get conn for 6000 times, and grpc pool will create 6 buckets
for i := 0; i < grpcConnCapacity*bucket; i++ {
conn, err := pool.GetConn(addr)
c.Assert(err, check.IsNil)
require.Nil(t, err)
if i%(grpcConnCapacity*resizeBucketStep) == 0 {
sharedConns[i/grpcConnCapacity] = conn
}
Expand All @@ -84,22 +84,22 @@ func (s *clientSuite) TestConnArrayRecycle(c *check.C) {
}
}
for i := 2; i < bucket; i++ {
c.Assert(sharedConns[i].active, check.Equals, int64(grpcConnCapacity))
require.Equal(t, int64(grpcConnCapacity), sharedConns[i].active)
for j := 0; j < grpcConnCapacity; j++ {
pool.ReleaseConn(sharedConns[i], addr)
}
}
empty := pool.bucketConns[addr].recycle()
c.Assert(empty, check.IsFalse)
c.Assert(pool.bucketConns[addr].conns, check.HasLen, 2)
require.False(t, empty)
require.Len(t, pool.bucketConns[addr].conns, 2)

for i := 0; i < 2; i++ {
c.Assert(sharedConns[i].active, check.Equals, int64(grpcConnCapacity))
require.Equal(t, int64(grpcConnCapacity), sharedConns[i].active)
for j := 0; j < grpcConnCapacity; j++ {
pool.ReleaseConn(sharedConns[i], addr)
}
}
empty = pool.bucketConns[addr].recycle()
c.Assert(empty, check.IsTrue)
c.Assert(pool.bucketConns[addr].conns, check.HasLen, 0)
require.True(t, empty)
require.Len(t, pool.bucketConns[addr].conns, 0)
}
30 changes: 30 additions & 0 deletions cdc/kv/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/pingcap/tiflow/pkg/workerpool.(*worker).run"),
goleak.IgnoreTopFunction("sync.runtime_Semacquire"),
}

leakutil.SetUpLeakTest(m, opts...)
}
47 changes: 22 additions & 25 deletions cdc/kv/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@
package kv

import (
"github.com/pingcap/check"
"testing"

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type matcherSuite struct{}

var _ = check.Suite(&matcherSuite{})

func (s *matcherSuite) TestMatchRow(c *check.C) {
defer testleak.AfterTest(c)()
func TestMatchRow(t *testing.T) {
t.Parallel()
matcher := newMatcher()
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Expand All @@ -48,11 +45,11 @@ func (s *matcherSuite) TestMatchRow(c *check.C) {
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1)
c.Assert(ok, check.IsFalse)
c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{
require.False(t, ok)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
})
}, commitRow1)

// test match commit
commitRow2 := &cdcpb.Event_Row{
Expand All @@ -61,18 +58,18 @@ func (s *matcherSuite) TestMatchRow(c *check.C) {
Key: []byte("k1"),
}
ok = matcher.matchRow(commitRow2)
c.Assert(ok, check.IsTrue)
c.Assert(commitRow2, check.DeepEquals, &cdcpb.Event_Row{
require.True(t, ok)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 2,
CommitTs: 3,
Key: []byte("k1"),
Value: []byte("v2"),
OldValue: []byte("v3"),
})
}, commitRow2)
}

func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) {
defer testleak.AfterTest(c)()
func TestMatchFakePrewrite(t *testing.T) {
t.Parallel()
matcher := newMatcher()
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Expand All @@ -93,20 +90,20 @@ func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) {
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1)
c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{
require.Equal(t, &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
Value: []byte("v1"),
OldValue: []byte("v3"),
})
c.Assert(ok, check.IsTrue)
}, commitRow1)
require.True(t, ok)
}

func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
defer testleak.AfterTest(c)()
func TestMatchMatchCachedRow(t *testing.T) {
t.Parallel()
matcher := newMatcher()
c.Assert(len(matcher.matchCachedRow()), check.Equals, 0)
require.Equal(t, 0, len(matcher.matchCachedRow()))
matcher.cacheCommitRow(&cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Expand All @@ -122,7 +119,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
CommitTs: 5,
Key: []byte("k3"),
})
c.Assert(len(matcher.matchCachedRow()), check.Equals, 0)
require.Equal(t, 0, len(matcher.matchCachedRow()))

matcher.cacheCommitRow(&cdcpb.Event_Row{
StartTs: 1,
Expand Down Expand Up @@ -159,7 +156,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
OldValue: []byte("ov3"),
})

c.Assert(matcher.matchCachedRow(), check.DeepEquals, []*cdcpb.Event_Row{{
require.Equal(t, []*cdcpb.Event_Row{{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
Expand All @@ -171,5 +168,5 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
Key: []byte("k2"),
Value: []byte("v2"),
OldValue: []byte("ov2"),
}})
}}, matcher.matchCachedRow())
}
51 changes: 21 additions & 30 deletions cdc/kv/region_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,26 @@ import (
"math/rand"
"runtime"
"sync"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type regionWorkerSuite struct{}

var _ = check.Suite(&regionWorkerSuite{})

func (s *regionWorkerSuite) TestRegionStateManager(c *check.C) {
defer testleak.AfterTest(c)()
func TestRegionStateManager(t *testing.T) {
rsm := newRegionStateManager(4)

regionID := uint64(1000)
_, ok := rsm.getState(regionID)
c.Assert(ok, check.IsFalse)
require.False(t, ok)

rsm.setState(regionID, &regionFeedState{requestID: 2})
state, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
c.Assert(state.requestID, check.Equals, uint64(2))
require.True(t, ok)
require.Equal(t, uint64(2), state.requestID)
}

func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
defer testleak.AfterTest(c)()
func TestRegionStateManagerThreadSafe(t *testing.T) {
rsm := newRegionStateManager(4)
regionCount := 100
regionIDs := make([]uint64, regionCount)
Expand All @@ -62,9 +56,9 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
idx := rand.Intn(regionCount)
regionID := regionIDs[idx]
s, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
require.True(t, ok)
s.lock.RLock()
c.Assert(s.requestID, check.Equals, uint64(idx+1))
require.Equal(t, uint64(idx+1), s.requestID)
s.lock.RUnlock()
}
}()
Expand All @@ -79,7 +73,7 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
}
regionID := regionIDs[rand.Intn(regionCount)]
s, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
require.True(t, ok)
s.lock.Lock()
s.lastResolvedTs += 10
s.lock.Unlock()
Expand All @@ -92,29 +86,26 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
totalResolvedTs := uint64(0)
for _, regionID := range regionIDs {
s, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
c.Assert(s.lastResolvedTs, check.Greater, uint64(1000))
require.True(t, ok)
require.Greater(t, s.lastResolvedTs, uint64(1000))
totalResolvedTs += s.lastResolvedTs
}
// 100 regions, initial resolved ts 1000;
// 2000 * resolved ts forward, increased by 10 each time, routine number is `concurrency`.
c.Assert(totalResolvedTs, check.Equals, uint64(100*1000+2000*10*concurrency))
require.Equal(t, uint64(100*1000+2000*10*concurrency), totalResolvedTs)
}

func (s *regionWorkerSuite) TestRegionStateManagerBucket(c *check.C) {
defer testleak.AfterTest(c)()
func TestRegionStateManagerBucket(t *testing.T) {
rsm := newRegionStateManager(-1)
c.Assert(rsm.bucket, check.GreaterEqual, minRegionStateBucket)
c.Assert(rsm.bucket, check.LessEqual, maxRegionStateBucket)
require.GreaterOrEqual(t, rsm.bucket, minRegionStateBucket)
require.LessOrEqual(t, rsm.bucket, maxRegionStateBucket)

bucket := rsm.bucket * 2
rsm = newRegionStateManager(bucket)
c.Assert(rsm.bucket, check.Equals, bucket)
require.Equal(t, bucket, rsm.bucket)
}

func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
defer testleak.AfterTest(c)()

func TestRegionWorkerPoolSize(t *testing.T) {
conf := config.GetDefaultServerConfig()
conf.KVClient.WorkerPoolSize = 0
config.StoreGlobalServerConfig(conf)
Expand All @@ -125,13 +116,13 @@ func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
}
return b
}
c.Assert(size, check.Equals, min(runtime.NumCPU()*2, maxWorkerPoolSize))
require.Equal(t, min(runtime.NumCPU()*2, maxWorkerPoolSize), size)

conf.KVClient.WorkerPoolSize = 5
size = getWorkerPoolSize()
c.Assert(size, check.Equals, 5)
require.Equal(t, 5, size)

conf.KVClient.WorkerPoolSize = maxWorkerPoolSize + 1
size = getWorkerPoolSize()
c.Assert(size, check.Equals, maxWorkerPoolSize)
require.Equal(t, maxWorkerPoolSize, size)
}
Loading

0 comments on commit 9ddc713

Please sign in to comment.