Skip to content

Commit

Permalink
fix mq_flush_worker_test
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed May 17, 2022
1 parent 7e6336c commit 60ac05f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
12 changes: 9 additions & 3 deletions cdc/sink/mq/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@

package mq

// func TestMain(m *testing.M) {
// leakutil.SetUpLeakTest(m)
// }
import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
40 changes: 27 additions & 13 deletions cdc/sink/mq/mq_flush_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ package mq

import (
"context"
"errors"
"math"
"sync"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
Expand Down Expand Up @@ -81,7 +82,7 @@ func NewMockProducer() *mockProducer {
}
}

func newTestWorker() (*flushWorker, *mockProducer) {
func newTestWorker(ctx context.Context) (*flushWorker, *mockProducer) {
// 200 is about the size of a row change.
encoderConfig := codec.NewConfig(config.ProtocolOpen, timeutil.SystemLocation()).
WithMaxMessageBytes(200)
Expand All @@ -95,14 +96,16 @@ func newTestWorker() (*flushWorker, *mockProducer) {
}
producer := NewMockProducer()
return newFlushWorker(encoder, producer,
metrics.NewStatistics(context.Background(), metrics.SinkTypeMQ)), producer
metrics.NewStatistics(ctx, metrics.SinkTypeMQ)), producer
}

//nolint:tparallel
func TestBatch(t *testing.T) {
t.Parallel()

worker, _ := newTestWorker()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker, _ := newTestWorker(ctx)
key := topicPartitionKey{
topic: "test",
partition: 1,
Expand Down Expand Up @@ -163,7 +166,8 @@ func TestBatch(t *testing.T) {
},
{
row: &model.RowChangedEvent{
CommitTs: 2,
// Indicates that this event is not expected to be processed
CommitTs: math.MaxUint64,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
},
Expand All @@ -175,7 +179,6 @@ func TestBatch(t *testing.T) {
}

var wg sync.WaitGroup
ctx := context.Background()
batch := make([]mqEvent, 3)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand All @@ -191,8 +194,12 @@ func TestBatch(t *testing.T) {

go func() {
for _, event := range test.events {
err := worker.addEvent(context.Background(), event)
require.NoError(t, err)
err := worker.addEvent(ctx, event)
if event.row == nil || event.row.CommitTs != math.MaxUint64 {
require.NoError(t, err)
} else {
require.Regexp(t, ".*context canceled.*", err)
}
}
}()
wg.Wait()
Expand All @@ -215,7 +222,9 @@ func TestGroup(t *testing.T) {
topic: "test1",
partition: 2,
}
worker, _ := newTestWorker()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker, _ := newTestWorker(ctx)

events := []mqEvent{
{
Expand Down Expand Up @@ -291,7 +300,9 @@ func TestAsyncSend(t *testing.T) {
partition: 3,
}

worker, producer := newTestWorker()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker, producer := newTestWorker(ctx)
events := []mqEvent{
{
row: &model.RowChangedEvent{
Expand Down Expand Up @@ -359,7 +370,10 @@ func TestFlush(t *testing.T) {
topic: "test",
partition: 1,
}
worker, producer := newTestWorker()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker, producer := newTestWorker(ctx)

events := []mqEvent{
{
Expand Down Expand Up @@ -420,8 +434,8 @@ func TestFlush(t *testing.T) {
func TestAbort(t *testing.T) {
t.Parallel()

worker, _ := newTestWorker()
ctx, cancel := context.WithCancel(context.Background())
worker, _ := newTestWorker(ctx)

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -438,9 +452,9 @@ func TestAbort(t *testing.T) {
func TestProducerError(t *testing.T) {
t.Parallel()

worker, prod := newTestWorker()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker, prod := newTestWorker(ctx)

var wg sync.WaitGroup
wg.Add(1)
Expand Down

0 comments on commit 60ac05f

Please sign in to comment.