diff --git a/cdc/sink/mq/main_test.go b/cdc/sink/mq/main_test.go index 099e067b96f..ecdd0528eeb 100644 --- a/cdc/sink/mq/main_test.go +++ b/cdc/sink/mq/main_test.go @@ -13,6 +13,12 @@ package mq -// func TestMain(m *testing.M) { -// leakutil.SetUpLeakTest(m) -// } +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/cdc/sink/mq/mq_flush_worker_test.go b/cdc/sink/mq/mq_flush_worker_test.go index a2a410beb76..224fc796769 100644 --- a/cdc/sink/mq/mq_flush_worker_test.go +++ b/cdc/sink/mq/mq_flush_worker_test.go @@ -15,10 +15,11 @@ package mq import ( "context" + "errors" + "math" "sync" "testing" - "github.com/pingcap/errors" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/codec" @@ -81,7 +82,7 @@ func NewMockProducer() *mockProducer { } } -func newTestWorker() (*flushWorker, *mockProducer) { +func newTestWorker(ctx context.Context) (*flushWorker, *mockProducer) { // 200 is about the size of a row change. encoderConfig := codec.NewConfig(config.ProtocolOpen, timeutil.SystemLocation()). WithMaxMessageBytes(200) @@ -95,14 +96,16 @@ func newTestWorker() (*flushWorker, *mockProducer) { } producer := NewMockProducer() return newFlushWorker(encoder, producer, - metrics.NewStatistics(context.Background(), metrics.SinkTypeMQ)), producer + metrics.NewStatistics(ctx, metrics.SinkTypeMQ)), producer } //nolint:tparallel func TestBatch(t *testing.T) { t.Parallel() - worker, _ := newTestWorker() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + worker, _ := newTestWorker(ctx) key := topicPartitionKey{ topic: "test", partition: 1, @@ -163,7 +166,8 @@ func TestBatch(t *testing.T) { }, { row: &model.RowChangedEvent{ - CommitTs: 2, + // Indicates that this event is not expected to be processed + CommitTs: math.MaxUint64, Table: &model.TableName{Schema: "a", Table: "b"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, }, @@ -175,7 +179,6 @@ func TestBatch(t *testing.T) { } var wg sync.WaitGroup - ctx := context.Background() batch := make([]mqEvent, 3) for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -191,8 +194,12 @@ func TestBatch(t *testing.T) { go func() { for _, event := range test.events { - err := worker.addEvent(context.Background(), event) - require.NoError(t, err) + err := worker.addEvent(ctx, event) + if event.row == nil || event.row.CommitTs != math.MaxUint64 { + require.NoError(t, err) + } else { + require.Regexp(t, ".*context canceled.*", err) + } } }() wg.Wait() @@ -215,7 +222,9 @@ func TestGroup(t *testing.T) { topic: "test1", partition: 2, } - worker, _ := newTestWorker() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + worker, _ := newTestWorker(ctx) events := []mqEvent{ { @@ -291,7 +300,9 @@ func TestAsyncSend(t *testing.T) { partition: 3, } - worker, producer := newTestWorker() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + worker, producer := newTestWorker(ctx) events := []mqEvent{ { row: &model.RowChangedEvent{ @@ -359,7 +370,10 @@ func TestFlush(t *testing.T) { topic: "test", partition: 1, } - worker, producer := newTestWorker() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + worker, producer := newTestWorker(ctx) events := []mqEvent{ { @@ -420,8 +434,8 @@ func TestFlush(t *testing.T) { func TestAbort(t *testing.T) { t.Parallel() - worker, _ := newTestWorker() ctx, cancel := context.WithCancel(context.Background()) + worker, _ := newTestWorker(ctx) var wg sync.WaitGroup wg.Add(1) @@ -438,9 +452,9 @@ func TestAbort(t *testing.T) { func TestProducerError(t *testing.T) { t.Parallel() - worker, prod := newTestWorker() ctx, cancel := context.WithCancel(context.Background()) defer cancel() + worker, prod := newTestWorker(ctx) var wg sync.WaitGroup wg.Add(1)