Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(ticdc): migrate test-infra to testify for cdc/kv pkg (#2899) #4515

Merged
merged 7 commits into from
Feb 15, 2022
731 changes: 319 additions & 412 deletions cdc/kv/client_test.go

Large diffs are not rendered by default.

48 changes: 24 additions & 24 deletions cdc/kv/grpc_pool_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,53 @@ package kv

import (
"context"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

// Use clientSuite for some special reasons, the embed etcd uses zap as the only candidate
// logger and in the logger initialization it also initializes the grpclog/loggerv2, which
// is not a thread-safe operation and it must be called before any gRPC functions
// ref: https://github.com/grpc/grpc-go/blob/master/grpclog/loggerv2.go#L67-L72
func (s *clientSuite) TestConnArray(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
func TestConnArray(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer pool.Close()
addr := "127.0.0.1:20161"
conn, err := pool.GetConn(addr)
c.Assert(err, check.IsNil)
c.Assert(conn.active, check.Equals, int64(1))
require.Nil(t, err)
require.Equal(t, int64(1), conn.active)
pool.ReleaseConn(conn, addr)
c.Assert(conn.active, check.Equals, int64(0))
require.Equal(t, int64(0), conn.active)

lastConn := conn
// First grpcConnCapacity*2 connections will use initial two connections.
for i := 0; i < grpcConnCapacity*2; i++ {
conn, err := pool.GetConn(addr)
c.Assert(err, check.IsNil)
c.Assert(lastConn.ClientConn, check.Not(check.Equals), conn.ClientConn)
c.Assert(conn.active, check.Equals, int64(i)/2+1)
require.Nil(t, err)
require.NotSame(t, conn.ClientConn, lastConn.ClientConn)
require.Equal(t, int64(i)/2+1, conn.active)
lastConn = conn
}
// The following grpcConnCapacity*2 connections will trigger resize of connection array.
for i := 0; i < grpcConnCapacity*2; i++ {
conn, err := pool.GetConn(addr)
c.Assert(err, check.IsNil)
c.Assert(lastConn.ClientConn, check.Not(check.Equals), conn.ClientConn)
c.Assert(conn.active, check.Equals, int64(i)/2+1)
require.Nil(t, err)
require.NotSame(t, conn.ClientConn, lastConn.ClientConn)
require.Equal(t, int64(i)/2+1, conn.active)
lastConn = conn
}
}

func (s *clientSuite) TestConnArrayRecycle(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
func TestConnArrayRecycle(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -75,7 +75,7 @@ func (s *clientSuite) TestConnArrayRecycle(c *check.C) {
// get conn for 6000 times, and grpc pool will create 6 buckets
for i := 0; i < grpcConnCapacity*bucket; i++ {
conn, err := pool.GetConn(addr)
c.Assert(err, check.IsNil)
require.Nil(t, err)
if i%(grpcConnCapacity*resizeBucketStep) == 0 {
sharedConns[i/grpcConnCapacity] = conn
}
Expand All @@ -84,22 +84,22 @@ func (s *clientSuite) TestConnArrayRecycle(c *check.C) {
}
}
for i := 2; i < bucket; i++ {
c.Assert(sharedConns[i].active, check.Equals, int64(grpcConnCapacity))
require.Equal(t, int64(grpcConnCapacity), sharedConns[i].active)
for j := 0; j < grpcConnCapacity; j++ {
pool.ReleaseConn(sharedConns[i], addr)
}
}
empty := pool.bucketConns[addr].recycle()
c.Assert(empty, check.IsFalse)
c.Assert(pool.bucketConns[addr].conns, check.HasLen, 2)
require.False(t, empty)
require.Len(t, pool.bucketConns[addr].conns, 2)

for i := 0; i < 2; i++ {
c.Assert(sharedConns[i].active, check.Equals, int64(grpcConnCapacity))
require.Equal(t, int64(grpcConnCapacity), sharedConns[i].active)
for j := 0; j < grpcConnCapacity; j++ {
pool.ReleaseConn(sharedConns[i], addr)
}
}
empty = pool.bucketConns[addr].recycle()
c.Assert(empty, check.IsTrue)
c.Assert(pool.bucketConns[addr].conns, check.HasLen, 0)
require.True(t, empty)
require.Len(t, pool.bucketConns[addr].conns, 0)
}
30 changes: 30 additions & 0 deletions cdc/kv/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/pingcap/tiflow/pkg/workerpool.(*worker).run"),
goleak.IgnoreTopFunction("sync.runtime_Semacquire"),
}

leakutil.SetUpLeakTest(m, opts...)
}
47 changes: 22 additions & 25 deletions cdc/kv/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@
package kv

import (
"github.com/pingcap/check"
"testing"

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type matcherSuite struct{}

var _ = check.Suite(&matcherSuite{})

func (s *matcherSuite) TestMatchRow(c *check.C) {
defer testleak.AfterTest(c)()
func TestMatchRow(t *testing.T) {
t.Parallel()
matcher := newMatcher()
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Expand All @@ -48,11 +45,11 @@ func (s *matcherSuite) TestMatchRow(c *check.C) {
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1)
c.Assert(ok, check.IsFalse)
c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{
require.False(t, ok)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
})
}, commitRow1)

// test match commit
commitRow2 := &cdcpb.Event_Row{
Expand All @@ -61,18 +58,18 @@ func (s *matcherSuite) TestMatchRow(c *check.C) {
Key: []byte("k1"),
}
ok = matcher.matchRow(commitRow2)
c.Assert(ok, check.IsTrue)
c.Assert(commitRow2, check.DeepEquals, &cdcpb.Event_Row{
require.True(t, ok)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 2,
CommitTs: 3,
Key: []byte("k1"),
Value: []byte("v2"),
OldValue: []byte("v3"),
})
}, commitRow2)
}

func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) {
defer testleak.AfterTest(c)()
func TestMatchFakePrewrite(t *testing.T) {
t.Parallel()
matcher := newMatcher()
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Expand All @@ -93,20 +90,20 @@ func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) {
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1)
c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{
require.Equal(t, &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
Value: []byte("v1"),
OldValue: []byte("v3"),
})
c.Assert(ok, check.IsTrue)
}, commitRow1)
require.True(t, ok)
}

func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
defer testleak.AfterTest(c)()
func TestMatchMatchCachedRow(t *testing.T) {
t.Parallel()
matcher := newMatcher()
c.Assert(len(matcher.matchCachedRow()), check.Equals, 0)
require.Equal(t, 0, len(matcher.matchCachedRow()))
matcher.cacheCommitRow(&cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Expand All @@ -122,7 +119,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
CommitTs: 5,
Key: []byte("k3"),
})
c.Assert(len(matcher.matchCachedRow()), check.Equals, 0)
require.Equal(t, 0, len(matcher.matchCachedRow()))

matcher.cacheCommitRow(&cdcpb.Event_Row{
StartTs: 1,
Expand Down Expand Up @@ -159,7 +156,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
OldValue: []byte("ov3"),
})

c.Assert(matcher.matchCachedRow(), check.DeepEquals, []*cdcpb.Event_Row{{
require.Equal(t, []*cdcpb.Event_Row{{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
Expand All @@ -171,5 +168,5 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
Key: []byte("k2"),
Value: []byte("v2"),
OldValue: []byte("ov2"),
}})
}}, matcher.matchCachedRow())
}
51 changes: 21 additions & 30 deletions cdc/kv/region_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,26 @@ import (
"math/rand"
"runtime"
"sync"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type regionWorkerSuite struct{}

var _ = check.Suite(&regionWorkerSuite{})

func (s *regionWorkerSuite) TestRegionStateManager(c *check.C) {
defer testleak.AfterTest(c)()
func TestRegionStateManager(t *testing.T) {
rsm := newRegionStateManager(4)

regionID := uint64(1000)
_, ok := rsm.getState(regionID)
c.Assert(ok, check.IsFalse)
require.False(t, ok)

rsm.setState(regionID, &regionFeedState{requestID: 2})
state, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
c.Assert(state.requestID, check.Equals, uint64(2))
require.True(t, ok)
require.Equal(t, uint64(2), state.requestID)
}

func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
defer testleak.AfterTest(c)()
func TestRegionStateManagerThreadSafe(t *testing.T) {
rsm := newRegionStateManager(4)
regionCount := 100
regionIDs := make([]uint64, regionCount)
Expand All @@ -62,9 +56,9 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
idx := rand.Intn(regionCount)
regionID := regionIDs[idx]
s, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
require.True(t, ok)
s.lock.RLock()
c.Assert(s.requestID, check.Equals, uint64(idx+1))
require.Equal(t, uint64(idx+1), s.requestID)
s.lock.RUnlock()
}
}()
Expand All @@ -79,7 +73,7 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
}
regionID := regionIDs[rand.Intn(regionCount)]
s, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
require.True(t, ok)
s.lock.Lock()
s.lastResolvedTs += 10
s.lock.Unlock()
Expand All @@ -92,29 +86,26 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) {
totalResolvedTs := uint64(0)
for _, regionID := range regionIDs {
s, ok := rsm.getState(regionID)
c.Assert(ok, check.IsTrue)
c.Assert(s.lastResolvedTs, check.Greater, uint64(1000))
require.True(t, ok)
require.Greater(t, s.lastResolvedTs, uint64(1000))
totalResolvedTs += s.lastResolvedTs
}
// 100 regions, initial resolved ts 1000;
// 2000 * resolved ts forward, increased by 10 each time, routine number is `concurrency`.
c.Assert(totalResolvedTs, check.Equals, uint64(100*1000+2000*10*concurrency))
require.Equal(t, uint64(100*1000+2000*10*concurrency), totalResolvedTs)
}

func (s *regionWorkerSuite) TestRegionStateManagerBucket(c *check.C) {
defer testleak.AfterTest(c)()
func TestRegionStateManagerBucket(t *testing.T) {
rsm := newRegionStateManager(-1)
c.Assert(rsm.bucket, check.GreaterEqual, minRegionStateBucket)
c.Assert(rsm.bucket, check.LessEqual, maxRegionStateBucket)
require.GreaterOrEqual(t, rsm.bucket, minRegionStateBucket)
require.LessOrEqual(t, rsm.bucket, maxRegionStateBucket)

bucket := rsm.bucket * 2
rsm = newRegionStateManager(bucket)
c.Assert(rsm.bucket, check.Equals, bucket)
require.Equal(t, bucket, rsm.bucket)
}

func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
defer testleak.AfterTest(c)()

func TestRegionWorkerPoolSize(t *testing.T) {
conf := config.GetDefaultServerConfig()
conf.KVClient.WorkerPoolSize = 0
config.StoreGlobalServerConfig(conf)
Expand All @@ -125,13 +116,13 @@ func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
}
return b
}
c.Assert(size, check.Equals, min(runtime.NumCPU()*2, maxWorkerPoolSize))
require.Equal(t, min(runtime.NumCPU()*2, maxWorkerPoolSize), size)

conf.KVClient.WorkerPoolSize = 5
size = getWorkerPoolSize()
c.Assert(size, check.Equals, 5)
require.Equal(t, 5, size)

conf.KVClient.WorkerPoolSize = maxWorkerPoolSize + 1
size = getWorkerPoolSize()
c.Assert(size, check.Equals, maxWorkerPoolSize)
require.Equal(t, maxWorkerPoolSize, size)
}
Loading