diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 181de57e0e9..726bc3b007b 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -25,7 +25,6 @@ import ( "time" "github.com/golang/protobuf/proto" // nolint:staticcheck - "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/cdcpb" @@ -37,19 +36,15 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/pdtime" "github.com/pingcap/tiflow/pkg/regionspan" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/txnutil" - "github.com/pingcap/tiflow/pkg/util" - "github.com/pingcap/tiflow/pkg/util/testleak" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" - "go.etcd.io/etcd/embed" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -62,31 +57,11 @@ func Test(t *testing.T) { go func() { RunWorkerPool(context.Background()) //nolint:errcheck }() - check.TestingT(t) } -type clientSuite struct { - e *embed.Etcd -} - -var _ = check.Suite(&clientSuite{}) - -func (s *clientSuite) SetUpTest(c *check.C) { - dir := c.MkDir() - var err error - _, s.e, err = etcd.SetupEmbedEtcd(dir) - c.Assert(err, check.IsNil) -} - -func (s *clientSuite) TearDownTest(c *check.C) { - s.e.Close() -} - -func (s *clientSuite) TestNewClient(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestNewClient(t *testing.T) { rpcClient, _, pdClient, err := testutils.NewMockTiKV("", nil) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer pdClient.Close() defer rpcClient.Close() @@ -95,12 +70,10 @@ func (s *clientSuite) TestNewClient(c *check.C) { regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool, regionCache, pdtime.NewClock4Test(), "") - c.Assert(cli, check.NotNil) + require.NotNil(t, cli) } -func (s *clientSuite) TestAssembleRowEvent(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestAssembleRowEvent(t *testing.T) { testCases := []struct { regionID uint64 entry *cdcpb.Event_Row @@ -209,24 +182,24 @@ func (s *clientSuite) TestAssembleRowEvent(c *check.C) { for _, tc := range testCases { event, err := assembleRowEvent(tc.regionID, tc.entry, tc.enableOldValue) - c.Assert(event, check.DeepEquals, tc.expected) + require.Equal(t, tc.expected, event) if err != nil { - c.Assert(err.Error(), check.Equals, tc.err) + require.Equal(t, tc.err, err.Error()) } } } type mockChangeDataService struct { - c *check.C + t *testing.T ch chan *cdcpb.ChangeDataEvent recvLoop func(server cdcpb.ChangeData_EventFeedServer) exitNotify sync.Map eventFeedID uint64 } -func newMockChangeDataService(c *check.C, ch chan *cdcpb.ChangeDataEvent) *mockChangeDataService { +func newMockChangeDataService(t *testing.T, ch chan *cdcpb.ChangeDataEvent) *mockChangeDataService { s := &mockChangeDataService{ - c: c, + t: t, ch: ch, } return s @@ -280,23 +253,23 @@ loop: func newMockService( ctx context.Context, - c *check.C, + t *testing.T, srv cdcpb.ChangeDataServer, wg *sync.WaitGroup, ) (grpcServer *grpc.Server, addr string) { - return newMockServiceSpecificAddr(ctx, c, srv, "127.0.0.1:0", wg) + return newMockServiceSpecificAddr(ctx, t, srv, "127.0.0.1:0", wg) 
} func newMockServiceSpecificAddr( ctx context.Context, - c *check.C, + t *testing.T, srv cdcpb.ChangeDataServer, listenAddr string, wg *sync.WaitGroup, ) (grpcServer *grpc.Server, addr string) { lc := &net.ListenConfig{} lis, err := lc.Listen(ctx, "tcp", listenAddr) - c.Assert(err, check.IsNil) + require.Nil(t, err) addr = lis.Addr().String() kaep := keepalive.EnforcementPolicy{ // force minimum ping interval @@ -318,13 +291,13 @@ func newMockServiceSpecificAddr( go func() { defer wg.Done() err := grpcServer.Serve(lis) - c.Assert(err, check.IsNil) + require.Nil(t, err) }() return } // waitRequestID waits request ID larger than the given allocated ID -func waitRequestID(c *check.C, allocatedID uint64) { +func waitRequestID(t *testing.T, allocatedID uint64) { err := retry.Do(context.Background(), func() error { if currentRequestID() > allocatedID { return nil @@ -332,18 +305,16 @@ func waitRequestID(c *check.C, allocatedID uint64) { return errors.Errorf("request id %d is not larger than %d", currentRequestID(), allocatedID) }, retry.WithBackoffBaseDelay(10), retry.WithMaxTries(20)) - c.Assert(err, check.IsNil) + require.Nil(t, err) } -func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestConnectOfflineTiKV(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() wg := &sync.WaitGroup{} ch2 := make(chan *cdcpb.ChangeDataEvent, 10) - srv := newMockChangeDataService(c, ch2) - server2, addr := newMockService(ctx, c, srv, wg) + srv := newMockChangeDataService(t, ch2) + server2, addr := newMockService(ctx, t, srv, wg) defer func() { close(ch2) server2.Stop() @@ -351,10 +322,10 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck invalidStore := "localhost:1" @@ -378,11 +349,11 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // new session, request to store 1, request to store 2 - waitRequestID(c, baseAllocatedID+2) + waitRequestID(t, baseAllocatedID+2) makeEvent := func(ts uint64) *cdcpb.ChangeDataEvent { return &cdcpb.ChangeDataEvent{ @@ -399,7 +370,7 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) { } checkEvent := func(event model.RegionFeedEvent, ts uint64) { - c.Assert(event.Resolved.ResolvedTs, check.Equals, ts) + require.Equal(t, ts, event.Resolved.ResolvedTs) } initialized := mockInitializedEvent(3 /* regionID */, currentRequestID()) @@ -409,7 +380,7 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) { ts, err := kvStorage.CurrentTimestamp(oracle.GlobalTxnScope) ver := kv.NewVersion(ts) - c.Assert(err, check.IsNil) + require.Nil(t, err) ch2 <- makeEvent(ver.Ver) var event model.RegionFeedEvent // consume the first resolved ts event, which is sent before region starts @@ -417,35 +388,33 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) { select { case event = 
<-eventCh: case <-time.After(time.Second): - c.Fatalf("reconnection not succeed in 1 second") + require.FailNow(t, "reconnection not succeed in 1 second") } checkEvent(event, 1) select { case event = <-eventCh: case <-time.After(time.Second): - c.Fatalf("reconnection not succeed in 1 second") + require.FailNow(t, "reconnection not succeed in 1 second") } checkEvent(event, ver.Ver) // check gRPC connection active counter is updated correctly bucket, ok := grpcPool.bucketConns[invalidStore] - c.Assert(ok, check.IsTrue) + require.True(t, ok) empty := bucket.recycle() - c.Assert(empty, check.IsTrue) + require.True(t, empty) cancel() } // [NOTICE]: I concern this ut may cost too much time when resource limit -func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestRecvLargeMessageSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch2 := make(chan *cdcpb.ChangeDataEvent, 10) - srv := newMockChangeDataService(c, ch2) - server2, addr := newMockService(ctx, c, srv, wg) + srv := newMockChangeDataService(t, ch2) + server2, addr := newMockService(ctx, t, srv, wg) defer func() { close(ch2) server2.Stop() @@ -453,11 +422,11 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} defer pdClient.Close() //nolint:errcheck kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck cluster.AddStore(2, addr) @@ -476,11 +445,11 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) initialized := mockInitializedEvent(3 /* regionID */, currentRequestID()) ch2 <- initialized @@ -489,9 +458,9 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) { select { case event = <-eventCh: case <-time.After(time.Second): - c.Fatalf("recving message takes too long") + require.FailNow(t, "recving message takes too long") } - c.Assert(event, check.NotNil) + require.NotNil(t, event) largeValSize := 128*1024*1024 + 1 // 128MB + 1 largeMsg := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ @@ -515,25 +484,23 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) { select { case event = <-eventCh: case <-time.After(30 * time.Second): // Send 128MB object may costs lots of time. 
- c.Fatalf("receiving message takes too long") + require.FailNow(t, "receiving message takes too long") } - c.Assert(len(event.Val.Value), check.Equals, largeValSize) + require.Equal(t, largeValSize, len(event.Val.Value)) cancel() } -func (s *clientSuite) TestHandleError(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestHandleError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) ch2 := make(chan *cdcpb.ChangeDataEvent, 10) - srv2 := newMockChangeDataService(c, ch2) - server2, addr2 := newMockService(ctx, c, srv2, wg) + srv2 := newMockChangeDataService(t, ch2) + server2, addr2 := newMockService(ctx, t, srv2, wg) defer func() { close(ch1) @@ -544,10 +511,10 @@ func (s *clientSuite) TestHandleError(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck region3 := uint64(3) @@ -574,11 +541,11 @@ func (s *clientSuite) TestHandleError(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) var event model.RegionFeedEvent notLeader := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ @@ -602,7 +569,7 @@ func (s *clientSuite) TestHandleError(c *check.C) { // wait request id allocated with: // new session, no leader request, epoch not match request - waitRequestID(c, baseAllocatedID+2) + waitRequestID(t, baseAllocatedID+2) epochNotMatch := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, @@ -616,7 +583,7 @@ func (s *clientSuite) TestHandleError(c *check.C) { }} ch2 <- epochNotMatch - waitRequestID(c, baseAllocatedID+3) + waitRequestID(t, baseAllocatedID+3) regionNotFound := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, @@ -630,7 +597,7 @@ func (s *clientSuite) TestHandleError(c *check.C) { }} ch2 <- regionNotFound - waitRequestID(c, baseAllocatedID+4) + waitRequestID(t, baseAllocatedID+4) unknownErr := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, @@ -648,8 +615,8 @@ consumePreResolvedTs: for { select { case event = <-eventCh: - c.Assert(event.Resolved, check.NotNil) - c.Assert(event.Resolved.ResolvedTs, check.Equals, uint64(100)) + require.NotNil(t, event.Resolved) + require.Equal(t, uint64(100), event.Resolved.ResolvedTs) case <-time.After(time.Second): break consumePreResolvedTs } @@ -658,7 +625,7 @@ consumePreResolvedTs: // wait request id allocated with: // new session, no leader request, epoch not match request, // region not found request, unknown error request, normal request - waitRequestID(c, baseAllocatedID+5) + waitRequestID(t, baseAllocatedID+5) initialized := mockInitializedEvent(3 /* regionID */, 
currentRequestID()) ch2 <- initialized @@ -682,10 +649,10 @@ consumePreResolvedTs: select { case event = <-eventCh: case <-time.After(3 * time.Second): - c.Fatalf("reconnection not succeed in 3 seconds") + require.FailNow(t, "reconnection not succeed in 3 seconds") } - c.Assert(event.Resolved, check.NotNil) - c.Assert(event.Resolved.ResolvedTs, check.Equals, uint64(120)) + require.NotNil(t, event.Resolved) + require.Equal(t, uint64(120), event.Resolved.ResolvedTs) cancel() } @@ -693,15 +660,13 @@ consumePreResolvedTs: // TestCompatibilityWithSameConn tests kv client returns an error when TiKV returns // the Compatibility error. This error only happens when the same connection to // TiKV have different versions. -func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestCompatibilityWithSameConn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) server1.Stop() @@ -709,10 +674,10 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck cluster.AddStore(1, addr1) @@ -732,11 +697,11 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) { go func() { defer wg2.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockResolver, isPullInit, eventCh) - c.Assert(cerror.ErrVersionIncompatible.Equal(err), check.IsTrue) + require.True(t, cerror.ErrVersionIncompatible.Equal(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) incompatibility := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, @@ -757,16 +722,13 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) { // TestClusterIDMismatch tests kv client returns an error when TiKV returns // the cluster ID mismatch error. 
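For reference, the check-to-testify conversion applied throughout these hunks follows one mechanical mapping. The snippet below is an illustrative, self-contained sketch of that mapping (the test name and values are hypothetical, not part of the patch); note that require.Equal takes the expected value first, which is why the argument order flips relative to c.Assert(got, check.DeepEquals, want).

package kv

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestRequireStyleSketch(t *testing.T) {
	var err error
	got, want := uint64(100), uint64(100)

	require.Nil(t, err)          // was: c.Assert(err, check.IsNil)
	require.NotNil(t, &got)      // was: c.Assert(v, check.NotNil)
	require.Equal(t, want, got)  // was: c.Assert(got, check.DeepEquals, want) — expected first
	require.True(t, got == want) // was: c.Assert(ok, check.IsTrue)

	if got != want {
		// was: c.Fatalf(...) and c.Errorf(...) respectively
		require.FailNow(t, "values differ")
		require.Fail(t, fmt.Sprintf("expected %d, got %d", want, got))
	}
}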
-func (s *clientSuite) TestClusterIDMismatch(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) - +func TestClusterIDMismatch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} changeDataCh := make(chan *cdcpb.ChangeDataEvent, 10) - changeDataService := newMockChangeDataService(c, changeDataCh) - mockService, addr := newMockService(ctx, c, changeDataService, wg) + changeDataService := newMockChangeDataService(t, changeDataCh) + mockService, addr := newMockService(ctx, t, changeDataService, wg) defer func() { close(changeDataCh) mockService.Stop() @@ -774,11 +736,11 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck cluster.AddStore(1, addr) @@ -800,11 +762,11 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) { go func() { defer wg2.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockResolver, isPullInit, eventCh) - c.Assert(cerror.ErrClusterIDMismatch.Equal(err), check.IsTrue) + require.True(t, cerror.ErrClusterIDMismatch.Equal(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) clusterIDMismatchEvent := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, @@ -826,14 +788,13 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) { cancel() } -func (s *clientSuite) testHandleFeedEvent(c *check.C) { - defer s.TearDownTest(c) +func testHandleFeedEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -842,10 +803,10 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck cluster.AddStore(1, addr1) @@ -864,11 +825,11 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) eventsBeforeInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ // before initialized, prewrite and commit could be in any sequence, @@ -1222,9 +1183,9 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) { for _, expectedEv := range expected { select { case event := 
<-eventCh: - c.Assert(event, check.DeepEquals, expectedEv) + require.Equal(t, expectedEv, event) case <-time.After(time.Second): - c.Errorf("expected event %v not received", expectedEv) + require.Fail(t, fmt.Sprintf("expected event %v not received", expectedEv)) } } @@ -1232,22 +1193,20 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) { for i := 0; i < multiSize; i++ { select { case event := <-eventCh: - c.Assert(event, check.DeepEquals, multipleExpected) + require.Equal(t, multipleExpected, event) case <-time.After(time.Second): - c.Errorf("expected event %v not received", multipleExpected) + require.Fail(t, fmt.Sprintf("expected event %v not received", multipleExpected)) } } cancel() } -func (s *clientSuite) TestHandleFeedEvent(c *check.C) { - defer testleak.AfterTest(c)() - s.testHandleFeedEvent(c) +func TestHandleFeedEvent(t *testing.T) { + testHandleFeedEvent(t) } -func (s *clientSuite) TestHandleFeedEventWithWorkerPool(c *check.C) { - defer testleak.AfterTest(c)() +func TestHandleFeedEventWithWorkerPool(t *testing.T) { hwm := regionWorkerHighWatermark lwm := regionWorkerLowWatermark regionWorkerHighWatermark = 8 @@ -1256,15 +1215,13 @@ func (s *clientSuite) TestHandleFeedEventWithWorkerPool(c *check.C) { regionWorkerHighWatermark = hwm regionWorkerLowWatermark = lwm }() - s.testHandleFeedEvent(c) + testHandleFeedEvent(t) } // TestStreamSendWithError mainly tests the scenario that the `Send` call of a gPRC // stream of kv client meets error, and kv client can clean up the broken stream, // establish a new one and recover the normal event feed processing. -func (s *clientSuite) TestStreamSendWithError(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestStreamSendWithError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} defer wg.Wait() @@ -1273,8 +1230,8 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) { var server1StopFlag int32 server1Stopped := make(chan struct{}) ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { defer func() { // TiCDC may reestalish stream again, so we need to add failpoint-inject @@ -1296,10 +1253,10 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) { } rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID3 := uint64(3) @@ -1320,13 +1277,13 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockerResolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() var requestIds sync.Map <-server1Stopped ch2 := make(chan *cdcpb.ChangeDataEvent, 10) - srv2 := newMockChangeDataService(c, ch2) + srv2 := newMockChangeDataService(t, ch2) srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { for { req, err := server.Recv() @@ -1338,7 +1295,7 @@ 
func (s *clientSuite) TestStreamSendWithError(c *check.C) { } } // Reuse the same listen address as server 1 - server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg) + server2, _ := newMockServiceSpecificAddr(ctx, t, srv2, addr1, wg) defer func() { close(ch2) server2.Stop() @@ -1356,7 +1313,7 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) { return errors.New("waiting for kv client requests received by server") }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) - c.Assert(err, check.IsNil) + require.Nil(t, err) reqID1, _ := requestIds.Load(regionID3) reqID2, _ := requestIds.Load(regionID4) initialized1 := mockInitializedEvent(regionID3, reqID1.(uint64)) @@ -1369,31 +1326,30 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) { for i := 0; i < 2; i++ { select { case event := <-eventCh: - c.Assert(event.Resolved, check.NotNil) + require.NotNil(t, event.Resolved) initRegions[event.RegionID] = struct{}{} case <-time.After(time.Second): - c.Errorf("expected events are not receive, received: %v", initRegions) + require.Fail(t, fmt.Sprintf("expected events are not receive, received: %v", initRegions)) } } expectedInitRegions := map[uint64]struct{}{regionID3: {}, regionID4: {}} - c.Assert(initRegions, check.DeepEquals, expectedInitRegions) + require.Equal(t, expectedInitRegions, initRegions) // a hack way to check the goroutine count of region worker is 1 buf := make([]byte, 1<<20) stackLen := runtime.Stack(buf, true) stack := string(buf[:stackLen]) - c.Assert(strings.Count(stack, "resolveLock"), check.Equals, 1) - c.Assert(strings.Count(stack, "collectWorkpoolError"), check.Equals, 1) + require.Equal(t, 1, strings.Count(stack, "resolveLock")) + require.Equal(t, 1, strings.Count(stack, "collectWorkpoolError")) } -func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) { - defer s.TearDownTest(c) +func testStreamRecvWithError(t *testing.T, failpointStr string) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -1402,10 +1358,10 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID := uint64(3) @@ -1413,7 +1369,7 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) { cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError", failpointStr) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError") }() @@ -1430,11 +1386,11 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - 
c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) initialized1 := mockInitializedEvent(regionID, currentRequestID()) ch1 <- initialized1 err = retry.Do(context.Background(), func() error { @@ -1444,7 +1400,7 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) { return errors.New("message is not sent") }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) - c.Assert(err, check.IsNil) + require.Nil(t, err) // another stream will be established, so we notify and wait the first // EventFeed loop exits. @@ -1452,11 +1408,11 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) { select { case <-callback: case <-time.After(time.Second * 3): - c.Error("event feed loop can't exit") + require.Fail(t, "event feed loop can't exit") } // wait request id allocated with: new session, new request*2 - waitRequestID(c, baseAllocatedID+2) + waitRequestID(t, baseAllocatedID+2) initialized2 := mockInitializedEvent(regionID, currentRequestID()) ch1 <- initialized2 @@ -1499,24 +1455,22 @@ eventLoop: break eventLoop } } - c.Assert(events, check.DeepEquals, expected) + require.Equal(t, expected, events) cancel() } // TestStreamRecvWithErrorAndResolvedGoBack mainly tests the scenario that the `Recv` call of a gPRC // stream in kv client meets error, and kv client reconnection with tikv with the current tso -func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) - if !util.FailpointBuild { - c.Skip("skip when this is not a failpoint build") - } +func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { + // if !util.FailpointBuild { + // c.Skip("skip when this is not a failpoint build") + // } ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} var requestID uint64 ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) + srv1 := newMockChangeDataService(t, ch1) srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { for { req, err := server.Recv() @@ -1527,7 +1481,7 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { atomic.StoreUint64(&requestID, req.RequestId) } } - server1, addr1 := newMockService(ctx, c, srv1, wg) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -1536,10 +1490,10 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID := uint64(3) @@ -1560,11 +1514,11 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { defer wg.Done() defer close(eventCh) err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new 
session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) err = retry.Do(context.Background(), func() error { if atomic.LoadUint64(&requestID) == currentRequestID() { return nil @@ -1573,7 +1527,7 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { atomic.LoadUint64(&requestID), currentRequestID()) }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10)) - c.Assert(err, check.IsNil) + require.Nil(t, err) initialized1 := mockInitializedEvent(regionID, currentRequestID()) ch1 <- initialized1 err = retry.Do(context.Background(), func() error { @@ -1583,7 +1537,7 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { return errors.New("message is not sent") }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) - c.Assert(err, check.IsNil) + require.Nil(t, err) resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { @@ -1600,9 +1554,9 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { return errors.New("message is not sent") }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) - c.Assert(err, check.IsNil) + require.Nil(t, err) err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError", "1*return(\"\")") - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError") }() @@ -1614,11 +1568,11 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { select { case <-callback: case <-time.After(time.Second * 3): - c.Error("event feed loop can't exit") + require.Fail(t, "event feed loop can't exit") } // wait request id allocated with: new session, new request*2 - waitRequestID(c, baseAllocatedID+2) + waitRequestID(t, baseAllocatedID+2) err = retry.Do(context.Background(), func() error { if atomic.LoadUint64(&requestID) == currentRequestID() { return nil @@ -1627,7 +1581,7 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { atomic.LoadUint64(&requestID), currentRequestID()) }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10)) - c.Assert(err, check.IsNil) + require.Nil(t, err) initialized2 := mockInitializedEvent(regionID, currentRequestID()) ch1 <- initialized2 err = retry.Do(context.Background(), func() error { @@ -1637,7 +1591,7 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { return errors.New("message is not sent") }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) - c.Assert(err, check.IsNil) + require.Nil(t, err) resolved = &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { @@ -1662,13 +1616,13 @@ ReceiveLoop: break ReceiveLoop } case <-time.After(time.Second): - c.Errorf("event received timeout") + require.Fail(t, "event received timeout") } } var lastResolvedTs uint64 for _, e := range received { if lastResolvedTs > e.Resolved.ResolvedTs { - c.Errorf("the resolvedTs is back off %#v", resolved) + require.Fail(t, fmt.Sprintf("the resolvedTs is back off %#v", resolved)) } } } @@ -1676,19 +1630,16 @@ ReceiveLoop: // TestStreamRecvWithErrorNormal mainly tests the scenario that the `Recv` call // of a gPRC stream in kv client meets a **logical related** error, and kv client // logs the error and re-establish new request. 
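The two tests below drive the injected Recv error through a failpoint expression such as 1*return("EOF"). A minimal sketch of the enable/disable pattern they rely on, assuming a failpoint-enabled build (the test body is illustrative only; the failpoint path is the one used in the hunks above):

package kv

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestRecvErrorInjectionSketch(t *testing.T) {
	// "1*return(...)" makes the failpoint fire exactly once, returning the
	// given string where the instrumented Recv call evaluates it.
	err := failpoint.Enable(
		"github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError",
		`1*return("injected stream recv error")`)
	require.Nil(t, err)
	defer func() {
		// Always disable so later tests are not affected by the injection.
		_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError")
	}()

	// ... start the event feed and assert it re-establishes the request,
	// as testStreamRecvWithError does above.
}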
-func (s *clientSuite) TestStreamRecvWithErrorNormal(c *check.C) { - defer testleak.AfterTest(c)() - s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")") +func TestStreamRecvWithErrorNormal(t *testing.T) { + testStreamRecvWithError(t, "1*return(\"injected stream recv error\")") } // TestStreamRecvWithErrorIOEOF mainly tests the scenario that the `Recv` call // of a gPRC stream in kv client meets error io.EOF, and kv client logs the error // and re-establish new request -func (s *clientSuite) TestStreamRecvWithErrorIOEOF(c *check.C) { - defer testleak.AfterTest(c)() - - s.testStreamRecvWithError(c, "1*return(\"EOF\")") - s.testStreamRecvWithError(c, "1*return(\"EOF\")") +func TestStreamRecvWithErrorIOEOF(t *testing.T) { + testStreamRecvWithError(t, "1*return(\"EOF\")") + testStreamRecvWithError(t, "1*return(\"EOF\")") } // TestIncompatibleTiKV tests TiCDC new request to TiKV meets `ErrVersionIncompatible` @@ -1696,9 +1647,7 @@ func (s *clientSuite) TestStreamRecvWithErrorIOEOF(c *check.C) { // TiCDC will wait 20s and then retry. This is a common scenario when rolling // upgrade a cluster and the new version is not compatible with the old version // (upgrade TiCDC before TiKV, since upgrade TiKV often takes much longer). -func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestIncompatibleTiKV(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} @@ -1721,7 +1670,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { var requestIds sync.Map ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) + srv1 := newMockChangeDataService(t, ch1) srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { for { req, err := server.Recv() @@ -1732,7 +1681,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { requestIds.Store(req.RegionId, req.RequestId) } } - server1, addr1 := newMockService(ctx, c, srv1, wg) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -1741,10 +1690,10 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: gen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID := uint64(3) @@ -1752,7 +1701,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientDelayWhenIncompatible", "return(true)") - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientDelayWhenIncompatible") }() @@ -1769,7 +1718,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() err = retry.Do(context.Background(), func() error { @@ -1779,7 +1728,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { return errors.Errorf("version generator is not updated in time, call time %d", 
atomic.LoadInt32(&call)) }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20)) - c.Assert(err, check.IsNil) + require.Nil(t, err) err = retry.Do(context.Background(), func() error { _, ok := requestIds.Load(regionID) if ok { @@ -1788,16 +1737,16 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { return errors.New("waiting for kv client requests received by server") }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) - c.Assert(err, check.IsNil) + require.Nil(t, err) reqID, _ := requestIds.Load(regionID) initialized := mockInitializedEvent(regionID, reqID.(uint64)) ch1 <- initialized select { case event := <-eventCh: - c.Assert(event.Resolved, check.NotNil) - c.Assert(event.RegionID, check.Equals, regionID) + require.NotNil(t, event.Resolved) + require.Equal(t, regionID, event.RegionID) case <-time.After(time.Second): - c.Errorf("expected events are not receive") + require.Fail(t, "expected events are not receive") } cancel() @@ -1806,15 +1755,13 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { // TestPendingRegionError tests kv client should return an error when receiving // a new subscription (the first event of specific region) but the corresponding // region is not found in pending regions. -func (s *clientSuite) TestNoPendingRegionError(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestNoPendingRegionError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) server1.Stop() @@ -1822,10 +1769,10 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck cluster.AddStore(1, addr1) @@ -1845,11 +1792,11 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) noPendingRegionEvent := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, @@ -1862,8 +1809,8 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) { initialized := mockInitializedEvent(3, currentRequestID()) ch1 <- initialized ev := <-eventCh - c.Assert(ev.Resolved, check.NotNil) - c.Assert(ev.Resolved.ResolvedTs, check.Equals, uint64(100)) + require.NotNil(t, ev.Resolved) + require.Equal(t, uint64(100), ev.Resolved.ResolvedTs) resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { @@ -1874,22 +1821,20 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) { }} ch1 <- resolved ev = <-eventCh - c.Assert(ev.Resolved, check.NotNil) - 
c.Assert(ev.Resolved.ResolvedTs, check.Equals, uint64(200)) + require.NotNil(t, ev.Resolved) + require.Equal(t, uint64(200), ev.Resolved.ResolvedTs) cancel() } // TestDropStaleRequest tests kv client should drop an event if its request id is outdated. -func (s *clientSuite) TestDropStaleRequest(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestDropStaleRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -1898,10 +1843,10 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID := uint64(3) @@ -1921,11 +1866,11 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) initialized := mockInitializedEvent(regionID, currentRequestID()) eventsAfterInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ @@ -1976,24 +1921,22 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) { for _, expectedEv := range expected { select { case event := <-eventCh: - c.Assert(event, check.DeepEquals, expectedEv) + require.Equal(t, expectedEv, event) case <-time.After(time.Second): - c.Errorf("expected event %v not received", expectedEv) + require.Fail(t, fmt.Sprintf("expected event %v not received", expectedEv)) } } cancel() } // TestResolveLock tests the resolve lock logic in kv client -func (s *clientSuite) TestResolveLock(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestResolveLock(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -2002,10 +1945,10 @@ func (s *clientSuite) TestResolveLock(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID := uint64(3) @@ -2013,7 +1956,7 @@ func (s *clientSuite) TestResolveLock(c *check.C) { cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) err = 
failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientResolveLockInterval", "return(3)") - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientResolveLockInterval") }() @@ -2030,15 +1973,15 @@ func (s *clientSuite) TestResolveLock(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) initialized := mockInitializedEvent(regionID, currentRequestID()) ch1 <- initialized physical, logical, err := pdClient.GetTS(ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) tso := oracle.ComposeTS(physical, logical) resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { @@ -2067,9 +2010,9 @@ func (s *clientSuite) TestResolveLock(c *check.C) { for _, expectedEv := range expected { select { case event := <-eventCh: - c.Assert(event, check.DeepEquals, expectedEv) + require.Equal(t, expectedEv, event) case <-time.After(time.Second): - c.Errorf("expected event %v not received", expectedEv) + require.Fail(t, fmt.Sprintf("expected event %v not received", expectedEv)) } } @@ -2080,14 +2023,13 @@ func (s *clientSuite) TestResolveLock(c *check.C) { cancel() } -func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.ChangeDataEvent) { - defer s.TearDownTest(c) +func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -2096,10 +2038,10 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID := uint64(3) @@ -2113,7 +2055,7 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan // This inject will make regionWorker exit directly and trigger execution line cancel when meet error err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientErrUnreachable", "return(true)") - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientErrUnreachable") }() @@ -2131,11 +2073,11 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan go func() { defer clientWg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(err, check.Equals, errUnreachable) + require.Equal(t, errUnreachable, err) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, 
baseAllocatedID+1) for _, event := range events { for _, ev := range event.Events { ev.RequestId = currentRequestID() @@ -2147,8 +2089,7 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan } // TestCommittedFallback tests kv client should panic when receiving a fallback committed event -func (s *clientSuite) TestCommittedFallback(c *check.C) { - defer testleak.AfterTest(c)() +func TestCommittedFallback(t *testing.T) { events := []*cdcpb.ChangeDataEvent{ {Events: []*cdcpb.Event{ { @@ -2169,12 +2110,11 @@ func (s *clientSuite) TestCommittedFallback(c *check.C) { }, }}, } - s.testEventCommitTsFallback(c, events) + testEventCommitTsFallback(t, events) } // TestCommitFallback tests kv client should panic when receiving a fallback commit event -func (s *clientSuite) TestCommitFallback(c *check.C) { - defer testleak.AfterTest(c)() +func TestCommitFallback(t *testing.T) { events := []*cdcpb.ChangeDataEvent{ mockInitializedEvent(3, currentRequestID()), {Events: []*cdcpb.Event{ @@ -2195,12 +2135,11 @@ func (s *clientSuite) TestCommitFallback(c *check.C) { }, }}, } - s.testEventCommitTsFallback(c, events) + testEventCommitTsFallback(t, events) } // TestDeuplicateRequest tests kv client should panic when meeting a duplicate error -func (s *clientSuite) TestDuplicateRequest(c *check.C) { - defer testleak.AfterTest(c)() +func TestDuplicateRequest(t *testing.T) { events := []*cdcpb.ChangeDataEvent{ {Events: []*cdcpb.Event{ { @@ -2214,23 +2153,21 @@ func (s *clientSuite) TestDuplicateRequest(c *check.C) { }, }}, } - s.testEventCommitTsFallback(c, events) + testEventCommitTsFallback(t, events) } // testEventAfterFeedStop tests kv client can drop events sent after region feed is stopped // TODO: testEventAfterFeedStop is not stable, re-enable it after it is stable // nolint:unused -func (s *clientSuite) testEventAfterFeedStop(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func testEventAfterFeedStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} var server1StopFlag int32 server1Stopped := make(chan struct{}) ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { defer func() { // TiCDC may reestalish stream again, so we need to add failpoint-inject @@ -2253,10 +2190,10 @@ func (s *clientSuite) testEventAfterFeedStop(c *check.C) { } rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID := uint64(3) @@ -2267,7 +2204,7 @@ func (s *clientSuite) testEventAfterFeedStop(c *check.C) { // before event feed processor is reconstruct, some duplicated events are // sent to event feed processor. 
err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientSingleFeedProcessDelay", "1*sleep(2000)") - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientSingleFeedProcessDelay") }() @@ -2284,11 +2221,11 @@ func (s *clientSuite) testEventAfterFeedStop(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) // an error event will mark the corresponding region feed as stopped epochNotMatch := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { @@ -2342,10 +2279,10 @@ func (s *clientSuite) testEventAfterFeedStop(c *check.C) { var requestID uint64 ch2 := make(chan *cdcpb.ChangeDataEvent, 10) - srv2 := newMockChangeDataService(c, ch2) + srv2 := newMockChangeDataService(t, ch2) // Reuse the same listen addresss as server 1 to simulate TiKV handles the // gRPC stream terminate and reconnect. - server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg) + server2, _ := newMockServiceSpecificAddr(ctx, t, srv2, addr1, wg) srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { for { req, err := server.Recv() @@ -2370,7 +2307,7 @@ func (s *clientSuite) testEventAfterFeedStop(c *check.C) { }, retry.WithMaxTries(10), retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000)) log.Info("retry check request id", zap.Error(err)) - c.Assert(err, check.IsNil) + require.Nil(t, err) // wait request id allocated with: new session, 2 * new request committedClone.Events[0].RequestId = currentRequestID() @@ -2416,23 +2353,21 @@ func (s *clientSuite) testEventAfterFeedStop(c *check.C) { for _, expectedEv := range expected { select { case event := <-eventCh: - c.Assert(event, check.DeepEquals, expectedEv) + require.Equal(t, expectedEv, event) case <-time.After(time.Second): - c.Errorf("expected event %v not received", expectedEv) + require.Fail(t, fmt.Sprintf("expected event %v not received", expectedEv)) } } cancel() } -func (s *clientSuite) TestOutOfRegionRangeEvent(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestOutOfRegionRangeEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -2441,10 +2376,10 @@ func (s *clientSuite) TestOutOfRegionRangeEvent(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck cluster.AddStore(1, addr1) @@ -2463,11 +2398,11 @@ func (s *clientSuite) TestOutOfRegionRangeEvent(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, 
lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) eventsBeforeInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ // will be filtered out @@ -2624,9 +2559,9 @@ func (s *clientSuite) TestOutOfRegionRangeEvent(c *check.C) { for _, expectedEv := range expected { select { case event := <-eventCh: - c.Assert(event, check.DeepEquals, expectedEv) + require.Equal(t, expectedEv, event) case <-time.After(time.Second): - c.Errorf("expected event %v not received", expectedEv) + require.Fail(t, fmt.Sprintf("expected event %v not received", expectedEv)) } } @@ -2635,15 +2570,13 @@ func (s *clientSuite) TestOutOfRegionRangeEvent(c *check.C) { // TestResolveLockNoCandidate tests the resolved ts manager can work normally // when no region exceeds resolve lock interval, that is what candidate means. -func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestResolveLockNoCandidate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -2652,10 +2585,10 @@ func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID := uint64(3) @@ -2677,11 +2610,11 @@ func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) initialized := mockInitializedEvent(regionID, currentRequestID()) ch1 <- initialized @@ -2691,7 +2624,7 @@ func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) { defer wg2.Done() for i := 0; i < 6; i++ { physical, logical, err := pdClient.GetTS(ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) tso := oracle.ComposeTS(physical, logical) resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { @@ -2703,9 +2636,9 @@ func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) { ch1 <- resolved select { case event := <-eventCh: - c.Assert(event.Resolved, check.NotNil) + require.NotNil(t, event.Resolved) case <-time.After(time.Second): - c.Error("resolved event not received") + require.Fail(t, "resolved event not received") } // will sleep 6s totally, to ensure resolve lock fired once time.Sleep(time.Second) @@ -2723,15 +2656,13 @@ func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) { // 2. 
We delay the kv client to re-create a new region request by 500ms via failpoint. // 3. Before new region request is fired, simulate kv client `stream.Recv` returns an error, the stream // handler will signal region worker to exit, which will evict all active region states then. -func (s *clientSuite) TestFailRegionReentrant(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestFailRegionReentrant(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -2740,10 +2671,10 @@ func (s *clientSuite) TestFailRegionReentrant(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID := uint64(3) @@ -2751,9 +2682,9 @@ func (s *clientSuite) TestFailRegionReentrant(c *check.C) { cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientRegionReentrantError", "1*return(\"ok\")->1*return(\"error\")") - c.Assert(err, check.IsNil) + require.Nil(t, err) err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientRegionReentrantErrorDelay", "sleep(500)") - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientRegionReentrantError") _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientRegionReentrantErrorDelay") @@ -2771,11 +2702,11 @@ func (s *clientSuite) TestFailRegionReentrant(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) unknownErr := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, @@ -2808,22 +2739,19 @@ func (s *clientSuite) TestFailRegionReentrant(c *check.C) { // has been deleted in step-3, so it will create new stream but fails because // of unstable TiKV store, at this point, the kv client should handle with the // pending region correctly. 
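Both the reentrant scenario above and TestClientV1UnlockRangeReentrant, whose doc comment ends here, drive the kv client purely through failpoint terms, so the grammar is worth spelling out once. The sketch below is illustrative only (the helper name is hypothetical); the paths and terms are the ones TestClientV1UnlockRangeReentrant enables, and `N*term` fires a term N times while `->` chains terms in order:

func enableUnlockRangeFailpoints(t *testing.T) (cleanup func()) {
	// Fail exactly one stream.Recv with an injected error, then behave normally.
	err := failpoint.Enable(
		"github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError",
		`1*return("injected stream recv error")`)
	require.Nil(t, err)
	// Schedule the first pending region immediately and delay the second by 2s.
	err = failpoint.Enable(
		"github.com/pingcap/tiflow/cdc/kv/kvClientPendingRegionDelay",
		"1*sleep(0)->1*sleep(2000)")
	require.Nil(t, err)
	return func() {
		_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError")
		_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientPendingRegionDelay")
	}
}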
-func (s *clientSuite) TestClientV1UnlockRangeReentrant(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) - +func TestClientV1UnlockRangeReentrant(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID3 := uint64(3) @@ -2833,9 +2761,9 @@ func (s *clientSuite) TestClientV1UnlockRangeReentrant(c *check.C) { cluster.SplitRaw(regionID3, regionID4, []byte("b"), []uint64{5}, 5) err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError", "1*return(\"injected stream recv error\")") - c.Assert(err, check.IsNil) + require.Nil(t, err) err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientPendingRegionDelay", "1*sleep(0)->1*sleep(2000)") - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError") _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientPendingRegionDelay") @@ -2852,7 +2780,7 @@ func (s *clientSuite) TestClientV1UnlockRangeReentrant(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait the second region is scheduled @@ -2867,26 +2795,23 @@ func (s *clientSuite) TestClientV1UnlockRangeReentrant(c *check.C) { // TestClientErrNoPendingRegion has the similar procedure with TestClientV1UnlockRangeReentrant // The difference is the delay injected point for region 2 -func (s *clientSuite) TestClientErrNoPendingRegion(c *check.C) { - defer testleak.AfterTest(c)() - s.testClientErrNoPendingRegion(c) +func TestClientErrNoPendingRegion(t *testing.T) { + testClientErrNoPendingRegion(t) } -func (s *clientSuite) testClientErrNoPendingRegion(c *check.C) { - defer s.TearDownTest(c) - +func testClientErrNoPendingRegion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID3 := uint64(3) @@ -2896,11 +2821,11 @@ func (s *clientSuite) testClientErrNoPendingRegion(c *check.C) { cluster.SplitRaw(regionID3, regionID4, []byte("b"), []uint64{5}, 5) err = 
failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError", "1*return(\"injected error\")") - c.Assert(err, check.IsNil) + require.Nil(t, err) err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientPendingRegionDelay", "1*sleep(0)->2*sleep(1000)") - c.Assert(err, check.IsNil) + require.Nil(t, err) err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamCloseDelay", "sleep(2000)") - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError") _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientPendingRegionDelay") @@ -2918,16 +2843,16 @@ func (s *clientSuite) testClientErrNoPendingRegion(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() baseAllocatedID := currentRequestID() // wait the second region is scheduled time.Sleep(time.Millisecond * 500) - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) initialized := mockInitializedEvent(regionID3, currentRequestID()) ch1 <- initialized - waitRequestID(c, baseAllocatedID+2) + waitRequestID(t, baseAllocatedID+2) initialized = mockInitializedEvent(regionID4, currentRequestID()) ch1 <- initialized // wait the kvClientPendingRegionDelay ends, and the second region is processed @@ -2939,15 +2864,15 @@ func (s *clientSuite) testClientErrNoPendingRegion(c *check.C) { } // TestKVClientForceReconnect force reconnect gRPC stream can work -func (s *clientSuite) testKVClientForceReconnect(c *check.C) { +func testKVClientForceReconnect(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} var server1StopFlag int32 server1Stopped := make(chan struct{}) ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { defer func() { // There may be a gap between server.Recv error and ticdc stream reconnect, so we need to add failpoint-inject @@ -2970,10 +2895,10 @@ func (s *clientSuite) testKVClientForceReconnect(c *check.C) { } rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID3 := uint64(3) @@ -2992,11 +2917,11 @@ func (s *clientSuite) testKVClientForceReconnect(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() baseAllocatedID := currentRequestID() - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) initialized := mockInitializedEvent(regionID3, currentRequestID()) ch1 <- initialized @@ -3005,7 +2930,7 @@ func (s *clientSuite) testKVClientForceReconnect(c 
*check.C) { var requestIds sync.Map ch2 := make(chan *cdcpb.ChangeDataEvent, 10) - srv2 := newMockChangeDataService(c, ch2) + srv2 := newMockChangeDataService(t, ch2) srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { for { req, err := server.Recv() @@ -3018,7 +2943,7 @@ func (s *clientSuite) testKVClientForceReconnect(c *check.C) { } // Reuse the same listen addresss as server 1 to simulate TiKV handles the // gRPC stream terminate and reconnect. - server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg) + server2, _ := newMockServiceSpecificAddr(ctx, t, srv2, addr1, wg) defer func() { close(ch2) server2.Stop() @@ -3036,7 +2961,7 @@ func (s *clientSuite) testKVClientForceReconnect(c *check.C) { return errors.New("waiting for kv client requests received by server") }, retry.WithBackoffBaseDelay(300), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) - c.Assert(err, check.IsNil) + require.Nil(t, err) requestID, _ := requestIds.Load(regionID3) initialized = mockInitializedEvent(regionID3, requestID.(uint64)) @@ -3066,35 +2991,30 @@ eventLoop: if ev.Resolved != nil && ev.Resolved.ResolvedTs == uint64(100) { continue } - c.Assert(ev, check.DeepEquals, expected) + require.Equal(t, expected, ev) break eventLoop case <-time.After(time.Second): - c.Errorf("expected event %v not received", expected) + require.Fail(t, fmt.Sprintf("expected event %v not received", expected)) } } cancel() } -func (s *clientSuite) TestKVClientForceReconnect(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) - - s.testKVClientForceReconnect(c) +func TestKVClientForceReconnect(t *testing.T) { + testKVClientForceReconnect(t) } // TestConcurrentProcessRangeRequest when region range request channel is full, // the kv client can process it correctly without deadlock. This is more likely // to happen when region split and merge frequently and large stale requests exist. 
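TestConcurrentProcessRangeRequest below, like the reconnect tests above it, cannot assert on the mock server's state immediately: requests arrive asynchronously, so the tests poll with the retry helper instead. A condensed sketch of that idiom (requestIDs and regionNum come from the test body; the options mirror the calls in the hunks) is:

// Wait until the mock server has recorded one request per expected region.
err := retry.Do(context.Background(), func() error {
	count := 0
	requestIDs.Range(func(_, _ interface{}) bool {
		count++
		return true
	})
	if count == regionNum {
		return nil
	}
	return errors.Errorf("region number %d is not as expected %d", count, regionNum)
}, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))
require.Nil(t, err)

testify's require.Eventually could express the same wait, but keeping retry.Do preserves the backoff tuning these tests already rely on, which is presumably why the patch leaves it in place.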
-func (s *clientSuite) TestConcurrentProcessRangeRequest(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestConcurrentProcessRangeRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} requestIDs := new(sync.Map) ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) + srv1 := newMockChangeDataService(t, ch1) srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { for { req, err := server.Recv() @@ -3104,7 +3024,7 @@ func (s *clientSuite) TestConcurrentProcessRangeRequest(c *check.C) { requestIDs.Store(req.RegionId, req.RequestId) } } - server1, addr1 := newMockService(ctx, c, srv1, wg) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -3113,10 +3033,10 @@ func (s *clientSuite) TestConcurrentProcessRangeRequest(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck storeID := uint64(1) @@ -3126,7 +3046,7 @@ func (s *clientSuite) TestConcurrentProcessRangeRequest(c *check.C) { cluster.Bootstrap(regionID, []uint64{storeID}, []uint64{peerID}, peerID) err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientMockRangeLock", "1*return(20)") - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientMockRangeLock") }() @@ -3142,7 +3062,7 @@ func (s *clientSuite) TestConcurrentProcessRangeRequest(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // the kv client is blocked by failpoint injection, and after region has split @@ -3169,7 +3089,7 @@ func (s *clientSuite) TestConcurrentProcessRangeRequest(c *check.C) { return errors.Errorf("region number %d is not as expected %d", count, regionNum) }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20)) - c.Assert(err, check.IsNil) + require.Nil(t, err) // send initialized event and a resolved ts event to each region requestIDs.Range(func(key, value interface{}) bool { @@ -3199,7 +3119,7 @@ checkEvent: break checkEvent } case <-time.After(time.Second): - c.Errorf("no more events received") + require.Fail(t, "no more events received") } } @@ -3209,15 +3129,13 @@ checkEvent: // TestEvTimeUpdate creates a new event feed, send N committed events every 100ms, // use failpoint to set reconnect interval to 1s, the last event time of region // should be updated correctly and no reconnect is triggered -func (s *clientSuite) TestEvTimeUpdate(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestEvTimeUpdate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { close(ch1) @@ -3226,10 
+3144,10 @@ func (s *clientSuite) TestEvTimeUpdate(c *check.C) { }() rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck cluster.AddStore(1, addr1) @@ -3238,7 +3156,7 @@ func (s *clientSuite) TestEvTimeUpdate(c *check.C) { originalReconnectInterval := reconnectInterval reconnectInterval = 1500 * time.Millisecond err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientCheckUnInitRegionInterval", "return(2)") - c.Assert(err, check.IsNil) + require.Nil(t, err) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientCheckUnInitRegionInterval") reconnectInterval = originalReconnectInterval @@ -3257,11 +3175,11 @@ func (s *clientSuite) TestEvTimeUpdate(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) eventCount := 20 for i := 0; i < eventCount; i++ { @@ -3312,12 +3230,12 @@ func (s *clientSuite) TestEvTimeUpdate(c *check.C) { select { case event := <-eventCh: if i == 0 { - c.Assert(event, check.DeepEquals, expected[0]) + require.Equal(t, expected[0], event) } else { - c.Assert(event, check.DeepEquals, expected[1]) + require.Equal(t, expected[1], event) } case <-time.After(time.Second): - c.Errorf("expected event not received, %d received", i) + require.Fail(t, fmt.Sprintf("expected event not received, %d received", i)) } } @@ -3327,18 +3245,15 @@ func (s *clientSuite) TestEvTimeUpdate(c *check.C) { // TestRegionWorkerExitWhenIsIdle tests region worker can exit, and cancel gRPC // stream automatically when it is idle. 
// Idle means having no any effective region state -func (s *clientSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) - +func TestRegionWorkerExitWhenIsIdle(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} server1Stopped := make(chan struct{}) ch1 := make(chan *cdcpb.ChangeDataEvent, 10) defer close(ch1) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) defer server1.Stop() srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { defer func() { @@ -3355,10 +3270,10 @@ func (s *clientSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) { } rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck regionID := uint64(3) @@ -3378,11 +3293,11 @@ func (s *clientSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) { go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // wait request id allocated with: new session, new request - waitRequestID(c, baseAllocatedID+1) + waitRequestID(t, baseAllocatedID+1) // an error event will mark the corresponding region feed as stopped epochNotMatch := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { @@ -3400,7 +3315,7 @@ func (s *clientSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) { select { case <-server1Stopped: case <-time.After(time.Second): - c.Error("stream is not terminated by cdc kv client") + require.Fail(t, "stream is not terminated by cdc kv client") } cancel() } @@ -3410,10 +3325,7 @@ func (s *clientSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) { // TiCDC catches this error and resets the gRPC stream. TiCDC must not send a // new request before closing gRPC stream since currently there is no mechanism // to release an existing region connection. 
-func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) - +func TestPrewriteNotMatchError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} @@ -3421,8 +3333,8 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { var server1Stopped int32 = 0 server1StoppedCh := make(chan struct{}) ch1 := make(chan *cdcpb.ChangeDataEvent, 10) - srv1 := newMockChangeDataService(c, ch1) - server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1 := newMockChangeDataService(t, ch1) + server1, addr1 := newMockService(ctx, t, srv1, wg) srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { if atomic.LoadInt32(&server1Stopped) == int32(1) { return @@ -3444,10 +3356,10 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { } rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer kvStorage.Close() //nolint:errcheck // create two regions to avoid the stream is canceled by no region remained @@ -3471,7 +3383,7 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { go func() { defer wg.Done() err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockResolver, isPullInit, eventCh) - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) }() // The expected request ids are agnostic because the kv client could retry @@ -3486,7 +3398,7 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { return errors.New("waiting for kv client requests received by server") }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) - c.Assert(err, check.IsNil) + require.Nil(t, err) reqID1, _ := requestIds.Load(regionID3) reqID2, _ := requestIds.Load(regionID4) initialized1 := mockInitializedEvent(regionID3, reqID1.(uint64)) @@ -3516,7 +3428,7 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { <-server1StoppedCh ch2 := make(chan *cdcpb.ChangeDataEvent, 10) - srv2 := newMockChangeDataService(c, ch2) + srv2 := newMockChangeDataService(t, ch2) srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { for { req, err := server.Recv() @@ -3528,7 +3440,7 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { } } // Reuse the same listen address as server 1 - server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg) + server2, _ := newMockServiceSpecificAddr(ctx, t, srv2, addr1, wg) defer func() { close(ch2) server2.Stop() @@ -3537,7 +3449,7 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { // After the gRPC stream is canceled, two more reqeusts will be sent, so the // allocated id is increased by 2 from baseAllocatedID+2. 
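That request-id accounting is checked through waitRequestID just below. The helper's own conversion is not part of this hunk; under the assumption that it keeps its old retry-based shape with require swapped in, it presumably looks roughly like this (a sketch, not the actual definition):

func waitRequestID(t *testing.T, allocatedID uint64) {
	err := retry.Do(context.Background(), func() error {
		if currentRequestID() == allocatedID {
			return nil
		}
		return errors.Errorf("request id %d is not the expected %d", currentRequestID(), allocatedID)
	}, retry.WithBackoffBaseDelay(10), retry.WithMaxTries(20))
	require.Nil(t, err)
}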
- waitRequestID(c, baseAllocatedID+4) + waitRequestID(t, baseAllocatedID+4) cancel() } @@ -3552,10 +3464,7 @@ func createFakeEventFeedSession(ctx context.Context) *eventFeedSession { nil /*eventCh*/) } -func (s *clientSuite) TestCheckRateLimit(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) - +func TestCheckRateLimit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -3571,18 +3480,15 @@ func (s *clientSuite) TestCheckRateLimit(c *check.C) { } } if trigger == maxTrigger { - c.Error("get rate limiter too slow") + require.Fail(t, "get rate limiter too slow") } - c.Assert(trigger, check.GreaterEqual, burst) + require.GreaterOrEqual(t, trigger, burst) time.Sleep(100 * time.Millisecond) allowed := session.checkRateLimit(1) - c.Assert(allowed, check.IsTrue) + require.True(t, allowed) } -func (s *clientSuite) TestHandleRateLimit(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) - +func TestHandleRateLimit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -3590,18 +3496,18 @@ func (s *clientSuite) TestHandleRateLimit(c *check.C) { // empty rate limit item, do nothing session.handleRateLimit(ctx) - c.Assert(session.rateLimitQueue, check.HasLen, 0) - c.Assert(cap(session.rateLimitQueue), check.Equals, defaultRegionRateLimitQueueSize) + require.Len(t, session.rateLimitQueue, 0) + require.Equal(t, defaultRegionRateLimitQueueSize, cap(session.rateLimitQueue)) for i := 0; i < defaultRegionRateLimitQueueSize+1; i++ { session.rateLimitQueue = append(session.rateLimitQueue, regionErrorInfo{}) } session.handleRateLimit(ctx) - c.Assert(session.rateLimitQueue, check.HasLen, 1) - c.Assert(cap(session.rateLimitQueue), check.Equals, 1) + require.Len(t, session.rateLimitQueue, 1) + require.Equal(t, 1, cap(session.rateLimitQueue)) session.handleRateLimit(ctx) - c.Assert(session.rateLimitQueue, check.HasLen, 0) - c.Assert(cap(session.rateLimitQueue), check.Equals, 128) + require.Len(t, session.rateLimitQueue, 0) + require.Equal(t, 128, cap(session.rateLimitQueue)) } func TestRegionErrorInfoLogRateLimitedHint(t *testing.T) { diff --git a/cdc/kv/grpc_pool_impl_test.go b/cdc/kv/grpc_pool_impl_test.go index 70e8b7413bb..4b3e34cb232 100644 --- a/cdc/kv/grpc_pool_impl_test.go +++ b/cdc/kv/grpc_pool_impl_test.go @@ -15,19 +15,19 @@ package kv import ( "context" + "testing" - "github.com/pingcap/check" "github.com/pingcap/tiflow/pkg/security" - "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/stretchr/testify/require" ) // Use clientSuite for some special reasons, the embed etcd uses zap as the only candidate // logger and in the logger initialization it also initializes the grpclog/loggerv2, which // is not a thread-safe operation and it must be called before any gRPC functions // ref: https://github.com/grpc/grpc-go/blob/master/grpclog/loggerv2.go#L67-L72 -func (s *clientSuite) TestConnArray(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestConnArray(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -35,33 +35,33 @@ func (s *clientSuite) TestConnArray(c *check.C) { defer pool.Close() addr := "127.0.0.1:20161" conn, err := pool.GetConn(addr) - c.Assert(err, check.IsNil) - c.Assert(conn.active, check.Equals, int64(1)) + require.Nil(t, err) + require.Equal(t, int64(1), conn.active) pool.ReleaseConn(conn, addr) - c.Assert(conn.active, check.Equals, int64(0)) + require.Equal(t, int64(0), conn.active) 
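	// Editorial note, not part of the patch: the assertions above pin down the
	// pool's reference counting (GetConn bumps conn.active, ReleaseConn drops it),
	// while the two loops below check that up to grpcConnCapacity callers share one
	// physical connection and that the pool only resizes its connection array once
	// the existing connections are saturated.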
lastConn := conn // First grpcConnCapacity*2 connections will use initial two connections. for i := 0; i < grpcConnCapacity*2; i++ { conn, err := pool.GetConn(addr) - c.Assert(err, check.IsNil) - c.Assert(lastConn.ClientConn, check.Not(check.Equals), conn.ClientConn) - c.Assert(conn.active, check.Equals, int64(i)/2+1) + require.Nil(t, err) + require.NotSame(t, conn.ClientConn, lastConn.ClientConn) + require.Equal(t, int64(i)/2+1, conn.active) lastConn = conn } // The following grpcConnCapacity*2 connections will trigger resize of connection array. for i := 0; i < grpcConnCapacity*2; i++ { conn, err := pool.GetConn(addr) - c.Assert(err, check.IsNil) - c.Assert(lastConn.ClientConn, check.Not(check.Equals), conn.ClientConn) - c.Assert(conn.active, check.Equals, int64(i)/2+1) + require.Nil(t, err) + require.NotSame(t, conn.ClientConn, lastConn.ClientConn) + require.Equal(t, int64(i)/2+1, conn.active) lastConn = conn } } -func (s *clientSuite) TestConnArrayRecycle(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestConnArrayRecycle(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -75,7 +75,7 @@ func (s *clientSuite) TestConnArrayRecycle(c *check.C) { // get conn for 6000 times, and grpc pool will create 6 buckets for i := 0; i < grpcConnCapacity*bucket; i++ { conn, err := pool.GetConn(addr) - c.Assert(err, check.IsNil) + require.Nil(t, err) if i%(grpcConnCapacity*resizeBucketStep) == 0 { sharedConns[i/grpcConnCapacity] = conn } @@ -84,22 +84,22 @@ func (s *clientSuite) TestConnArrayRecycle(c *check.C) { } } for i := 2; i < bucket; i++ { - c.Assert(sharedConns[i].active, check.Equals, int64(grpcConnCapacity)) + require.Equal(t, int64(grpcConnCapacity), sharedConns[i].active) for j := 0; j < grpcConnCapacity; j++ { pool.ReleaseConn(sharedConns[i], addr) } } empty := pool.bucketConns[addr].recycle() - c.Assert(empty, check.IsFalse) - c.Assert(pool.bucketConns[addr].conns, check.HasLen, 2) + require.False(t, empty) + require.Len(t, pool.bucketConns[addr].conns, 2) for i := 0; i < 2; i++ { - c.Assert(sharedConns[i].active, check.Equals, int64(grpcConnCapacity)) + require.Equal(t, int64(grpcConnCapacity), sharedConns[i].active) for j := 0; j < grpcConnCapacity; j++ { pool.ReleaseConn(sharedConns[i], addr) } } empty = pool.bucketConns[addr].recycle() - c.Assert(empty, check.IsTrue) - c.Assert(pool.bucketConns[addr].conns, check.HasLen, 0) + require.True(t, empty) + require.Len(t, pool.bucketConns[addr].conns, 0) } diff --git a/cdc/kv/main_test.go b/cdc/kv/main_test.go new file mode 100644 index 00000000000..9fb556a0982 --- /dev/null +++ b/cdc/kv/main_test.go @@ -0,0 +1,30 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package kv + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + opts := []goleak.Option{ + goleak.IgnoreTopFunction("github.com/pingcap/tiflow/pkg/workerpool.(*worker).run"), + goleak.IgnoreTopFunction("sync.runtime_Semacquire"), + } + + leakutil.SetUpLeakTest(m, opts...) +} diff --git a/cdc/kv/matcher_test.go b/cdc/kv/matcher_test.go index fd9f745e7aa..8270f314261 100644 --- a/cdc/kv/matcher_test.go +++ b/cdc/kv/matcher_test.go @@ -14,17 +14,14 @@ package kv import ( - "github.com/pingcap/check" + "testing" + "github.com/pingcap/kvproto/pkg/cdcpb" - "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/stretchr/testify/require" ) -type matcherSuite struct{} - -var _ = check.Suite(&matcherSuite{}) - -func (s *matcherSuite) TestMatchRow(c *check.C) { - defer testleak.AfterTest(c)() +func TestMatchRow(t *testing.T) { + t.Parallel() matcher := newMatcher() matcher.putPrewriteRow(&cdcpb.Event_Row{ StartTs: 1, @@ -48,11 +45,11 @@ func (s *matcherSuite) TestMatchRow(c *check.C) { Key: []byte("k1"), } ok := matcher.matchRow(commitRow1) - c.Assert(ok, check.IsFalse) - c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{ + require.False(t, ok) + require.Equal(t, &cdcpb.Event_Row{ StartTs: 1, Key: []byte("k1"), - }) + }, commitRow1) // test match commit commitRow2 := &cdcpb.Event_Row{ @@ -61,18 +58,18 @@ func (s *matcherSuite) TestMatchRow(c *check.C) { Key: []byte("k1"), } ok = matcher.matchRow(commitRow2) - c.Assert(ok, check.IsTrue) - c.Assert(commitRow2, check.DeepEquals, &cdcpb.Event_Row{ + require.True(t, ok) + require.Equal(t, &cdcpb.Event_Row{ StartTs: 2, CommitTs: 3, Key: []byte("k1"), Value: []byte("v2"), OldValue: []byte("v3"), - }) + }, commitRow2) } -func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) { - defer testleak.AfterTest(c)() +func TestMatchFakePrewrite(t *testing.T) { + t.Parallel() matcher := newMatcher() matcher.putPrewriteRow(&cdcpb.Event_Row{ StartTs: 1, @@ -93,20 +90,20 @@ func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) { Key: []byte("k1"), } ok := matcher.matchRow(commitRow1) - c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{ + require.Equal(t, &cdcpb.Event_Row{ StartTs: 1, CommitTs: 2, Key: []byte("k1"), Value: []byte("v1"), OldValue: []byte("v3"), - }) - c.Assert(ok, check.IsTrue) + }, commitRow1) + require.True(t, ok) } -func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) { - defer testleak.AfterTest(c)() +func TestMatchMatchCachedRow(t *testing.T) { + t.Parallel() matcher := newMatcher() - c.Assert(len(matcher.matchCachedRow()), check.Equals, 0) + require.Equal(t, 0, len(matcher.matchCachedRow())) matcher.cacheCommitRow(&cdcpb.Event_Row{ StartTs: 1, CommitTs: 2, @@ -122,7 +119,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) { CommitTs: 5, Key: []byte("k3"), }) - c.Assert(len(matcher.matchCachedRow()), check.Equals, 0) + require.Equal(t, 0, len(matcher.matchCachedRow())) matcher.cacheCommitRow(&cdcpb.Event_Row{ StartTs: 1, @@ -159,7 +156,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) { OldValue: []byte("ov3"), }) - c.Assert(matcher.matchCachedRow(), check.DeepEquals, []*cdcpb.Event_Row{{ + require.Equal(t, []*cdcpb.Event_Row{{ StartTs: 1, CommitTs: 2, Key: []byte("k1"), @@ -171,5 +168,5 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) { Key: []byte("k2"), Value: []byte("v2"), OldValue: []byte("ov2"), - }}) + }}, matcher.matchCachedRow()) } diff --git a/cdc/kv/region_worker_test.go 
b/cdc/kv/region_worker_test.go index 13eac0d4ca7..0ec3d27c290 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -17,32 +17,26 @@ import ( "math/rand" "runtime" "sync" + "testing" - "github.com/pingcap/check" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/stretchr/testify/require" ) -type regionWorkerSuite struct{} - -var _ = check.Suite(&regionWorkerSuite{}) - -func (s *regionWorkerSuite) TestRegionStateManager(c *check.C) { - defer testleak.AfterTest(c)() +func TestRegionStateManager(t *testing.T) { rsm := newRegionStateManager(4) regionID := uint64(1000) _, ok := rsm.getState(regionID) - c.Assert(ok, check.IsFalse) + require.False(t, ok) rsm.setState(regionID, &regionFeedState{requestID: 2}) state, ok := rsm.getState(regionID) - c.Assert(ok, check.IsTrue) - c.Assert(state.requestID, check.Equals, uint64(2)) + require.True(t, ok) + require.Equal(t, uint64(2), state.requestID) } -func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) { - defer testleak.AfterTest(c)() +func TestRegionStateManagerThreadSafe(t *testing.T) { rsm := newRegionStateManager(4) regionCount := 100 regionIDs := make([]uint64, regionCount) @@ -62,9 +56,9 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) { idx := rand.Intn(regionCount) regionID := regionIDs[idx] s, ok := rsm.getState(regionID) - c.Assert(ok, check.IsTrue) + require.True(t, ok) s.lock.RLock() - c.Assert(s.requestID, check.Equals, uint64(idx+1)) + require.Equal(t, uint64(idx+1), s.requestID) s.lock.RUnlock() } }() @@ -79,7 +73,7 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) { } regionID := regionIDs[rand.Intn(regionCount)] s, ok := rsm.getState(regionID) - c.Assert(ok, check.IsTrue) + require.True(t, ok) s.lock.Lock() s.lastResolvedTs += 10 s.lock.Unlock() @@ -92,29 +86,26 @@ func (s *regionWorkerSuite) TestRegionStateManagerThreadSafe(c *check.C) { totalResolvedTs := uint64(0) for _, regionID := range regionIDs { s, ok := rsm.getState(regionID) - c.Assert(ok, check.IsTrue) - c.Assert(s.lastResolvedTs, check.Greater, uint64(1000)) + require.True(t, ok) + require.Greater(t, s.lastResolvedTs, uint64(1000)) totalResolvedTs += s.lastResolvedTs } // 100 regions, initial resolved ts 1000; // 2000 * resolved ts forward, increased by 10 each time, routine number is `concurrency`.
- c.Assert(totalResolvedTs, check.Equals, uint64(100*1000+2000*10*concurrency)) + require.Equal(t, uint64(100*1000+2000*10*concurrency), totalResolvedTs) } -func (s *regionWorkerSuite) TestRegionStateManagerBucket(c *check.C) { - defer testleak.AfterTest(c)() +func TestRegionStateManagerBucket(t *testing.T) { rsm := newRegionStateManager(-1) - c.Assert(rsm.bucket, check.GreaterEqual, minRegionStateBucket) - c.Assert(rsm.bucket, check.LessEqual, maxRegionStateBucket) + require.GreaterOrEqual(t, rsm.bucket, minRegionStateBucket) + require.LessOrEqual(t, rsm.bucket, maxRegionStateBucket) bucket := rsm.bucket * 2 rsm = newRegionStateManager(bucket) - c.Assert(rsm.bucket, check.Equals, bucket) + require.Equal(t, bucket, rsm.bucket) } -func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) { - defer testleak.AfterTest(c)() - +func TestRegionWorkerPoolSize(t *testing.T) { conf := config.GetDefaultServerConfig() conf.KVClient.WorkerPoolSize = 0 config.StoreGlobalServerConfig(conf) @@ -125,13 +116,13 @@ func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) { } return b } - c.Assert(size, check.Equals, min(runtime.NumCPU()*2, maxWorkerPoolSize)) + require.Equal(t, min(runtime.NumCPU()*2, maxWorkerPoolSize), size) conf.KVClient.WorkerPoolSize = 5 size = getWorkerPoolSize() - c.Assert(size, check.Equals, 5) + require.Equal(t, 5, size) conf.KVClient.WorkerPoolSize = maxWorkerPoolSize + 1 size = getWorkerPoolSize() - c.Assert(size, check.Equals, maxWorkerPoolSize) + require.Equal(t, maxWorkerPoolSize, size) } diff --git a/cdc/kv/resolvedts_heap_test.go b/cdc/kv/resolvedts_heap_test.go index 4fe92e9b5f1..e687a4f7a2b 100644 --- a/cdc/kv/resolvedts_heap_test.go +++ b/cdc/kv/resolvedts_heap_test.go @@ -14,25 +14,22 @@ package kv import ( + "testing" "time" - "github.com/pingcap/check" - "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/stretchr/testify/require" ) -type rtsHeapSuite struct{} - -var _ = check.Suite(&rtsHeapSuite{}) - -func checkRegionTsInfoWithoutEvTime(c *check.C, obtained, expected *regionTsInfo) { - c.Assert(obtained.regionID, check.Equals, expected.regionID) - c.Assert(obtained.index, check.Equals, expected.index) - c.Assert(obtained.ts.resolvedTs, check.Equals, expected.ts.resolvedTs) - c.Assert(obtained.ts.sortByEvTime, check.IsFalse) +func checkRegionTsInfoWithoutEvTime(t *testing.T, obtained, expected *regionTsInfo) { + t.Parallel() + require.Equal(t, expected.regionID, obtained.regionID) + require.Equal(t, expected.index, obtained.index) + require.Equal(t, expected.ts.resolvedTs, obtained.ts.resolvedTs) + require.False(t, obtained.ts.sortByEvTime) } -func (s *rtsHeapSuite) TestRegionTsManagerResolvedTs(c *check.C) { - defer testleak.AfterTest(c)() +func TestRegionTsManagerResolvedTs(t *testing.T) { + t.Parallel() mgr := newRegionTsManager() initRegions := []*regionTsInfo{ {regionID: 102, ts: newResolvedTsItem(1040)}, @@ -42,14 +39,14 @@ func (s *rtsHeapSuite) TestRegionTsManagerResolvedTs(c *check.C) { for _, rts := range initRegions { mgr.Upsert(rts) } - c.Assert(mgr.Len(), check.Equals, 3) + require.Equal(t, 3, mgr.Len()) rts := mgr.Pop() - checkRegionTsInfoWithoutEvTime(c, rts, &regionTsInfo{regionID: 100, ts: newResolvedTsItem(1000), index: -1}) + checkRegionTsInfoWithoutEvTime(t, rts, &regionTsInfo{regionID: 100, ts: newResolvedTsItem(1000), index: -1}) // resolved ts is not updated mgr.Upsert(rts) rts = mgr.Pop() - checkRegionTsInfoWithoutEvTime(c, rts, &regionTsInfo{regionID: 100, ts: newResolvedTsItem(1000), index: -1}) +
checkRegionTsInfoWithoutEvTime(t, rts, &regionTsInfo{regionID: 100, ts: newResolvedTsItem(1000), index: -1}) // resolved ts updated rts.ts.resolvedTs = 1001 @@ -57,17 +54,17 @@ func (s *rtsHeapSuite) TestRegionTsManagerResolvedTs(c *check.C) { mgr.Upsert(&regionTsInfo{regionID: 100, ts: newResolvedTsItem(1100)}) rts = mgr.Pop() - checkRegionTsInfoWithoutEvTime(c, rts, &regionTsInfo{regionID: 101, ts: newResolvedTsItem(1020), index: -1}) + checkRegionTsInfoWithoutEvTime(t, rts, &regionTsInfo{regionID: 101, ts: newResolvedTsItem(1020), index: -1}) rts = mgr.Pop() - checkRegionTsInfoWithoutEvTime(c, rts, &regionTsInfo{regionID: 102, ts: newResolvedTsItem(1040), index: -1}) + checkRegionTsInfoWithoutEvTime(t, rts, &regionTsInfo{regionID: 102, ts: newResolvedTsItem(1040), index: -1}) rts = mgr.Pop() - checkRegionTsInfoWithoutEvTime(c, rts, &regionTsInfo{regionID: 100, ts: newResolvedTsItem(1100), index: -1}) + checkRegionTsInfoWithoutEvTime(t, rts, &regionTsInfo{regionID: 100, ts: newResolvedTsItem(1100), index: -1}) rts = mgr.Pop() - c.Assert(rts, check.IsNil) + require.Nil(t, rts) } -func (s *rtsHeapSuite) TestRegionTsManagerPenalty(c *check.C) { - defer testleak.AfterTest(c)() +func TestRegionTsManagerPenalty(t *testing.T) { + t.Parallel() mgr := newRegionTsManager() initRegions := []*regionTsInfo{ {regionID: 100, ts: newResolvedTsItem(1000)}, @@ -75,7 +72,7 @@ func (s *rtsHeapSuite) TestRegionTsManagerPenalty(c *check.C) { for _, rts := range initRegions { mgr.Upsert(rts) } - c.Assert(mgr.Len(), check.Equals, 1) + require.Equal(t, 1, mgr.Len()) // test penalty increases if resolved ts keeps unchanged for i := 0; i < 6; i++ { @@ -83,20 +80,20 @@ func (s *rtsHeapSuite) TestRegionTsManagerPenalty(c *check.C) { mgr.Upsert(rts) } rts := mgr.Pop() - c.Assert(rts.ts.resolvedTs, check.Equals, uint64(1000)) - c.Assert(rts.ts.penalty, check.Equals, 6) + require.Equal(t, uint64(1000), rts.ts.resolvedTs) + require.Equal(t, 6, rts.ts.penalty) // test penalty is cleared to zero if resolved ts is advanced mgr.Upsert(rts) rtsNew := &regionTsInfo{regionID: 100, ts: newResolvedTsItem(2000)} mgr.Upsert(rtsNew) rts = mgr.Pop() - c.Assert(rts.ts.penalty, check.DeepEquals, 0) - c.Assert(rts.ts.resolvedTs, check.DeepEquals, uint64(2000)) + require.Equal(t, 0, rts.ts.penalty) + require.Equal(t, uint64(2000), rts.ts.resolvedTs) } -func (s *rtsHeapSuite) TestRegionTsManagerPenaltyForFallBackEvent(c *check.C) { - defer testleak.AfterTest(c)() +func TestRegionTsManagerPenaltyForFallBackEvent(t *testing.T) { + t.Parallel() mgr := newRegionTsManager() initRegions := []*regionTsInfo{ {regionID: 100, ts: newResolvedTsItem(1000)}, @@ -104,7 +101,7 @@ func (s *rtsHeapSuite) TestRegionTsManagerPenaltyForFallBackEvent(c *check.C) { for _, rts := range initRegions { mgr.Upsert(rts) } - c.Assert(mgr.Len(), check.Equals, 1) + require.Equal(t, 1, mgr.Len()) // test penalty increases if we meet a fallback event for i := 0; i < 6; i++ { @@ -113,20 +110,20 @@ func (s *rtsHeapSuite) TestRegionTsManagerPenaltyForFallBackEvent(c *check.C) { } rts := mgr.Pop() // original resolvedTs will remain unchanged - c.Assert(rts.ts.resolvedTs, check.Equals, uint64(1000)) - c.Assert(rts.ts.penalty, check.Equals, 6) + require.Equal(t, uint64(1000), rts.ts.resolvedTs) + require.Equal(t, 6, rts.ts.penalty) // test penalty is cleared to zero if resolved ts is advanced mgr.Upsert(rts) rtsNew := &regionTsInfo{regionID: 100, ts: newResolvedTsItem(2000)} mgr.Upsert(rtsNew) rts = mgr.Pop() - c.Assert(rts.ts.penalty, check.DeepEquals, 0) - c.Assert(rts.ts.resolvedTs, check.DeepEquals,
uint64(2000)) + require.Equal(t, 0, rts.ts.penalty) + require.Equal(t, uint64(2000), rts.ts.resolvedTs) } -func (s *rtsHeapSuite) TestRegionTsManagerEvTime(c *check.C) { - defer testleak.AfterTest(c)() +func TestRegionTsManagerEvTime(t *testing.T) { + t.Parallel() mgr := newRegionTsManager() initRegions := []*regionTsInfo{ {regionID: 100, ts: newEventTimeItem()}, @@ -136,14 +133,14 @@ func (s *rtsHeapSuite) TestRegionTsManagerEvTime(c *check.C) { mgr.Upsert(item) } info := mgr.Remove(101) - c.Assert(info.regionID, check.Equals, uint64(101)) + require.Equal(t, uint64(101), info.regionID) ts := time.Now() mgr.Upsert(&regionTsInfo{regionID: 100, ts: newEventTimeItem()}) info = mgr.Pop() - c.Assert(info.regionID, check.Equals, uint64(100)) - c.Assert(ts.Before(info.ts.eventTime), check.IsTrue) - c.Assert(time.Now().After(info.ts.eventTime), check.IsTrue) + require.Equal(t, uint64(100), info.regionID) + require.True(t, ts.Before(info.ts.eventTime)) + require.True(t, time.Now().After(info.ts.eventTime)) info = mgr.Pop() - c.Assert(info, check.IsNil) + require.Nil(t, info) } diff --git a/cdc/kv/store_op.go b/cdc/kv/store_op.go index cce8c6c40dd..22771feff10 100644 --- a/cdc/kv/store_op.go +++ b/cdc/kv/store_op.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/errors" tidbconfig "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/kv" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/store" @@ -36,7 +35,7 @@ func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) (*meta.Meta, error) { } // CreateTiStore creates a new tikv storage client -func CreateTiStore(urls string, credential *security.Credential) (kv.Storage, error) { +func CreateTiStore(urls string, credential *security.Credential) (tidbkv.Storage, error) { urlv, err := flags.NewURLsValue(urls) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/kv/token_region_test.go b/cdc/kv/token_region_test.go index a5ad7d5279e..4e99c985727 100644 --- a/cdc/kv/token_region_test.go +++ b/cdc/kv/token_region_test.go @@ -17,21 +17,17 @@ import ( "context" "fmt" "sync/atomic" + "testing" "time" - "github.com/pingcap/check" "github.com/pingcap/errors" - "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" "golang.org/x/sync/errgroup" ) -type tokenRegionSuite struct{} - -var _ = check.Suite(&tokenRegionSuite{}) - -func (s *tokenRegionSuite) TestRouter(c *check.C) { - defer testleak.AfterTest(c)() +func TestRouter(t *testing.T) { + t.Parallel() store := "store-1" limit := 10 r := NewSizedRegionRouter(context.Background(), limit) @@ -43,31 +39,31 @@ func (s *tokenRegionSuite) TestRouter(c *check.C) { for i := 0; i < limit; i++ { select { case sri := <-r.Chan(): - c.Assert(sri.ts, check.Equals, uint64(i)) + require.Equal(t, uint64(i), sri.ts) r.Acquire(store) regions = append(regions, sri) default: - c.Error("expect region info from router") + t.Error("expect region info from router") } } - c.Assert(r.tokens[store], check.Equals, limit) + require.Equal(t, limit, r.tokens[store]) for range regions { r.Release(store) } - c.Assert(r.tokens[store], check.Equals, 0) + require.Equal(t, 0, r.tokens[store]) } -func (s *tokenRegionSuite) TestRouterWithFastConsumer(c *check.C) { - defer testleak.AfterTest(c)() - s.testRouterWithConsumer(c, func() {}) +func TestRouterWithFastConsumer(t *testing.T) { + t.Parallel() + testRouterWithConsumer(t, func() {}) } -func (s *tokenRegionSuite) TestRouterWithSlowConsumer(c *check.C) { - defer
testleak.AfterTest(c)() - s.testRouterWithConsumer(c, func() { time.Sleep(time.Millisecond * 15) }) +func TestRouterWithSlowConsumer(t *testing.T) { + t.Parallel() + testRouterWithConsumer(t, func() { time.Sleep(time.Millisecond * 15) }) } -func (s *tokenRegionSuite) testRouterWithConsumer(c *check.C, funcDoSth func()) { +func testRouterWithConsumer(t *testing.T, funcDoSth func()) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -118,13 +114,12 @@ func (s *tokenRegionSuite) testRouterWithConsumer(c *check.C, funcDoSth func()) } err := wg.Wait() - c.Assert(errors.Cause(err), check.Equals, context.Canceled) - c.Assert(r.tokens[store], check.Equals, 0) + require.Equal(t, context.Canceled, errors.Cause(err)) + require.Equal(t, 0, r.tokens[store]) } -func (s *tokenRegionSuite) TestRouterWithMultiStores(c *check.C) { - defer testleak.AfterTest(c)() - +func TestRouterWithMultiStores(t *testing.T) { + t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -173,8 +168,8 @@ func (s *tokenRegionSuite) TestRouterWithMultiStores(c *check.C) { } err := wg.Wait() - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) for _, store := range stores { - c.Assert(r.tokens[store], check.Equals, 0) + require.Equal(t, 0, r.tokens[store]) } }
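Taken as a whole, the conversions in this patch reduce to a few mechanical mappings. The summary below is editorial (placeholder values only), but every mapping occurs somewhere in the hunks above:

// gocheck                                        testify/require
// c.Assert(err, check.IsNil)                 ->  require.Nil(t, err)
// c.Assert(v, check.NotNil)                  ->  require.NotNil(t, v)
// c.Assert(got, check.Equals, want)          ->  require.Equal(t, want, got)   // expected argument moves first
// c.Assert(got, check.DeepEquals, want)      ->  require.Equal(t, want, got)
// c.Assert(ok, check.IsTrue)                 ->  require.True(t, ok)
// c.Assert(ok, check.IsFalse)                ->  require.False(t, ok)
// c.Assert(s, check.HasLen, 2)               ->  require.Len(t, s, 2)
// c.Assert(n, check.GreaterEqual, m)         ->  require.GreaterOrEqual(t, n, m)
// c.Assert(p1, check.Not(check.Equals), p2)  ->  require.NotSame(t, p2, p1)    // pointer identity
// c.Errorf("msg %v", x)                      ->  require.Fail(t, fmt.Sprintf("msg %v", x))

// For example, the old `c.Assert(event, check.DeepEquals, expectedEv)` becomes:
require.Equal(t, expectedEv, event)

One behavioural nuance worth keeping in mind: require.* aborts the running test via t.FailNow, whereas gocheck's c.Errorf recorded the failure and kept going; the converted tests accept the earlier abort. Suite fixtures (SetUpTest/TearDownTest and the per-test testleak.AfterTest) are dropped in favour of plain test functions plus the package-level leak check in cdc/kv/main_test.go.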