From 5c54e1a75165a9c68ac3bafeed587d9cda3e9e79 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 18 May 2022 12:23:43 +0800 Subject: [PATCH 1/2] fix mq test --- cdc/sink/mq/dispatcher/main_test.go | 24 ++ cdc/sink/mq/dispatcher/partition/main_test.go | 24 ++ cdc/sink/mq/dispatcher/topic/main_test.go | 24 ++ cdc/sink/mq/main_test.go | 24 ++ .../{kafka/manager.go => kafka_manager.go} | 22 +- .../manager_test.go => kafka_manager_test.go} | 10 +- cdc/sink/mq/manager/main_test.go | 24 ++ .../{pulsar/manager.go => pulsar_manager.go} | 16 +- cdc/sink/mq/mq.go | 6 +- cdc/sink/mq/mq_flush_worker_test.go | 38 +++- cdc/sink/mq/mq_test.go | 211 +++++++++--------- cdc/sink/mq/producer/kafka/kafka_test.go | 5 - cdc/sink/mq/producer/kafka/main_test.go | 24 ++ pkg/kafka/cluster_admin_client_mock_impl.go | 2 + pkg/leakutil/leak_helper.go | 2 + 15 files changed, 304 insertions(+), 152 deletions(-) create mode 100644 cdc/sink/mq/dispatcher/main_test.go create mode 100644 cdc/sink/mq/dispatcher/partition/main_test.go create mode 100644 cdc/sink/mq/dispatcher/topic/main_test.go create mode 100644 cdc/sink/mq/main_test.go rename cdc/sink/mq/manager/{kafka/manager.go => kafka_manager.go} (89%) rename cdc/sink/mq/manager/{kafka/manager_test.go => kafka_manager_test.go} (92%) create mode 100644 cdc/sink/mq/manager/main_test.go rename cdc/sink/mq/manager/{pulsar/manager.go => pulsar_manager.go} (71%) create mode 100644 cdc/sink/mq/producer/kafka/main_test.go diff --git a/cdc/sink/mq/dispatcher/main_test.go b/cdc/sink/mq/dispatcher/main_test.go new file mode 100644 index 00000000000..c7773253ab1 --- /dev/null +++ b/cdc/sink/mq/dispatcher/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/cdc/sink/mq/dispatcher/partition/main_test.go b/cdc/sink/mq/dispatcher/partition/main_test.go new file mode 100644 index 00000000000..324f8e6d846 --- /dev/null +++ b/cdc/sink/mq/dispatcher/partition/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package partition + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/cdc/sink/mq/dispatcher/topic/main_test.go b/cdc/sink/mq/dispatcher/topic/main_test.go new file mode 100644 index 00000000000..6c950cda7f3 --- /dev/null +++ b/cdc/sink/mq/dispatcher/topic/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package topic + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/cdc/sink/mq/main_test.go b/cdc/sink/mq/main_test.go new file mode 100644 index 00000000000..ecdd0528eeb --- /dev/null +++ b/cdc/sink/mq/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mq + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/cdc/sink/mq/manager/kafka/manager.go b/cdc/sink/mq/manager/kafka_manager.go similarity index 89% rename from cdc/sink/mq/manager/kafka/manager.go rename to cdc/sink/mq/manager/kafka_manager.go index 566f1052d05..1d34164dc9b 100644 --- a/cdc/sink/mq/manager/kafka/manager.go +++ b/cdc/sink/mq/manager/kafka_manager.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kafka +package manager import ( "fmt" @@ -28,8 +28,8 @@ import ( "go.uber.org/zap" ) -// TopicManager is a manager for kafka topics. -type TopicManager struct { +// kafkaTopicManager is a manager for kafka topics. +type kafkaTopicManager struct { client kafka.Client admin kafka.ClusterAdminClient @@ -40,13 +40,13 @@ type TopicManager struct { lastMetadataRefresh atomic.Int64 } -// NewTopicManager creates a new topic manager. -func NewTopicManager( +// NewKafkaTopicManager creates a new topic manager. +func NewKafkaTopicManager( client kafka.Client, admin kafka.ClusterAdminClient, cfg *kafkaconfig.AutoCreateTopicConfig, -) *TopicManager { - return &TopicManager{ +) *kafkaTopicManager { + return &kafkaTopicManager{ client: client, admin: admin, cfg: cfg, @@ -55,7 +55,7 @@ func NewTopicManager( // GetPartitionNum returns the number of partitions of the topic. // It may also try to update the topics' information maintained by manager. -func (m *TopicManager) GetPartitionNum(topic string) (int32, error) { +func (m *kafkaTopicManager) GetPartitionNum(topic string) (int32, error) { err := m.tryRefreshMeta() if err != nil { return 0, errors.Trace(err) @@ -69,7 +69,7 @@ func (m *TopicManager) GetPartitionNum(topic string) (int32, error) { } // tryRefreshMeta try to refresh the topics' information maintained by manager. -func (m *TopicManager) tryRefreshMeta() error { +func (m *kafkaTopicManager) tryRefreshMeta() error { if time.Since(time.Unix(m.lastMetadataRefresh.Load(), 0)) > time.Minute { topics, err := m.client.Topics() if err != nil { @@ -90,7 +90,7 @@ func (m *TopicManager) tryRefreshMeta() error { } // tryUpdatePartitionsAndLogging try to update the partitions of the topic. -func (m *TopicManager) tryUpdatePartitionsAndLogging(topic string, partitions int32) { +func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitions int32) { oldPartitions, ok := m.topics.Load(topic) if ok { if oldPartitions.(int32) != partitions { @@ -114,7 +114,7 @@ func (m *TopicManager) tryUpdatePartitionsAndLogging(topic string, partitions in // CreateTopic creates a topic with the given name // and returns the number of partitions. -func (m *TopicManager) CreateTopic(topicName string) (int32, error) { +func (m *kafkaTopicManager) CreateTopic(topicName string) (int32, error) { start := time.Now() topics, err := m.admin.ListTopics() if err != nil { diff --git a/cdc/sink/mq/manager/kafka/manager_test.go b/cdc/sink/mq/manager/kafka_manager_test.go similarity index 92% rename from cdc/sink/mq/manager/kafka/manager_test.go rename to cdc/sink/mq/manager/kafka_manager_test.go index e89def1d076..6979550dbd6 100644 --- a/cdc/sink/mq/manager/kafka/manager_test.go +++ b/cdc/sink/mq/manager/kafka_manager_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kafka +package manager import ( "testing" @@ -36,7 +36,7 @@ func TestPartitions(t *testing.T) { ReplicationFactor: 1, } - manager := NewTopicManager(client, adminClient, cfg) + manager := NewKafkaTopicManager(client, adminClient, cfg) partitionsNum, err := manager.GetPartitionNum( kafkamock.DefaultMockTopicName) require.Nil(t, err) @@ -57,7 +57,7 @@ func TestTryRefreshMeta(t *testing.T) { ReplicationFactor: 1, } - manager := NewTopicManager(client, adminClient, cfg) + manager := NewKafkaTopicManager(client, adminClient, cfg) partitionsNum, err := manager.GetPartitionNum( kafkamock.DefaultMockTopicName) require.Nil(t, err) @@ -92,7 +92,7 @@ func TestCreateTopic(t *testing.T) { ReplicationFactor: 1, } - manager := NewTopicManager(client, adminClient, cfg) + manager := NewKafkaTopicManager(client, adminClient, cfg) partitionNum, err := manager.CreateTopic(kafkamock.DefaultMockTopicName) require.Nil(t, err) require.Equal(t, int32(3), partitionNum) @@ -106,7 +106,7 @@ func TestCreateTopic(t *testing.T) { // Try to create a topic without auto create. cfg.AutoCreate = false - manager = NewTopicManager(client, adminClient, cfg) + manager = NewKafkaTopicManager(client, adminClient, cfg) _, err = manager.CreateTopic("new-topic2") require.Regexp( t, diff --git a/cdc/sink/mq/manager/main_test.go b/cdc/sink/mq/manager/main_test.go new file mode 100644 index 00000000000..93967811109 --- /dev/null +++ b/cdc/sink/mq/manager/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/cdc/sink/mq/manager/pulsar/manager.go b/cdc/sink/mq/manager/pulsar_manager.go similarity index 71% rename from cdc/sink/mq/manager/pulsar/manager.go rename to cdc/sink/mq/manager/pulsar_manager.go index b212c814cf5..e7b343520f1 100644 --- a/cdc/sink/mq/manager/pulsar/manager.go +++ b/cdc/sink/mq/manager/pulsar_manager.go @@ -11,30 +11,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pulsar +package manager -// TopicManager is the interface +// pulsarTopicManager is the interface // that wraps the basic Pulsar topic management operations. // Right now it doesn't have any implementation, // Pulsar doesn't support multiple topics yet. // So it now just returns a fixed number of partitions for a fixed topic. -type TopicManager struct { +type pulsarTopicManager struct { partitionNum int32 } -// NewTopicManager creates a new TopicManager. -func NewTopicManager(partitionNum int32) *TopicManager { - return &TopicManager{ +// NewPulsarTopicManager creates a new TopicManager. +func NewPulsarTopicManager(partitionNum int32) *pulsarTopicManager { + return &pulsarTopicManager{ partitionNum: partitionNum, } } // GetPartitionNum returns the number of partitions of the topic. -func (m *TopicManager) GetPartitionNum(_ string) (int32, error) { +func (m *pulsarTopicManager) GetPartitionNum(_ string) (int32, error) { return m.partitionNum, nil } // CreateTopic do nothing. -func (m *TopicManager) CreateTopic(_ string) (int32, error) { +func (m *pulsarTopicManager) CreateTopic(_ string) (int32, error) { return m.partitionNum, nil } diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index 5487631a821..831e9db98be 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -28,8 +28,6 @@ import ( "github.com/pingcap/tiflow/cdc/sink/metrics" "github.com/pingcap/tiflow/cdc/sink/mq/dispatcher" "github.com/pingcap/tiflow/cdc/sink/mq/manager" - kafkamanager "github.com/pingcap/tiflow/cdc/sink/mq/manager/kafka" - pulsarmanager "github.com/pingcap/tiflow/cdc/sink/mq/manager/pulsar" "github.com/pingcap/tiflow/cdc/sink/mq/producer" "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" "github.com/pingcap/tiflow/cdc/sink/mq/producer/pulsar" @@ -431,7 +429,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - topicManager := kafkamanager.NewTopicManager( + topicManager := manager.NewKafkaTopicManager( client, adminClient, baseConfig.DeriveTopicConfig(), @@ -500,7 +498,7 @@ func NewPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, if err != nil { return nil, errors.Trace(err) } - fakeTopicManager := pulsarmanager.NewTopicManager( + fakeTopicManager := manager.NewPulsarTopicManager( producer.GetPartitionNum(), ) sink, err := newMqSink( diff --git a/cdc/sink/mq/mq_flush_worker_test.go b/cdc/sink/mq/mq_flush_worker_test.go index c8ba0adc460..0c1b8421bbc 100644 --- a/cdc/sink/mq/mq_flush_worker_test.go +++ b/cdc/sink/mq/mq_flush_worker_test.go @@ -15,6 +15,7 @@ package mq import ( "context" + "math" "sync" "testing" @@ -79,7 +80,7 @@ func NewMockProducer() *mockProducer { } } -func newTestWorker() (*flushWorker, *mockProducer) { +func newTestWorker(ctx context.Context) (*flushWorker, *mockProducer) { // 200 is about the size of a row change. encoderConfig := codec.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(200) builder, err := codec.NewEventBatchEncoderBuilder(context.Background(), encoderConfig) @@ -92,14 +93,16 @@ func newTestWorker() (*flushWorker, *mockProducer) { } producer := NewMockProducer() return newFlushWorker(encoder, producer, - metrics.NewStatistics(context.Background(), metrics.SinkTypeMQ)), producer + metrics.NewStatistics(ctx, metrics.SinkTypeMQ)), producer } //nolint:tparallel func TestBatch(t *testing.T) { t.Parallel() - worker, _ := newTestWorker() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + worker, _ := newTestWorker(ctx) key := topicPartitionKey{ topic: "test", partition: 1, @@ -160,7 +163,8 @@ func TestBatch(t *testing.T) { }, { row: &model.RowChangedEvent{ - CommitTs: 2, + // Indicates that this event is not expected to be processed + CommitTs: math.MaxUint64, Table: &model.TableName{Schema: "a", Table: "b"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, }, @@ -172,7 +176,6 @@ func TestBatch(t *testing.T) { } var wg sync.WaitGroup - ctx := context.Background() batch := make([]mqEvent, 3) for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -188,8 +191,12 @@ func TestBatch(t *testing.T) { go func() { for _, event := range test.events { - err := worker.addEvent(context.Background(), event) - require.NoError(t, err) + err := worker.addEvent(ctx, event) + if event.row == nil || event.row.CommitTs != math.MaxUint64 { + require.NoError(t, err) + } else { + require.Regexp(t, ".*context canceled.*", err) + } } }() wg.Wait() @@ -212,7 +219,9 @@ func TestGroup(t *testing.T) { topic: "test1", partition: 2, } - worker, _ := newTestWorker() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + worker, _ := newTestWorker(ctx) events := []mqEvent{ { @@ -288,7 +297,9 @@ func TestAsyncSend(t *testing.T) { partition: 3, } - worker, producer := newTestWorker() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + worker, producer := newTestWorker(ctx) events := []mqEvent{ { row: &model.RowChangedEvent{ @@ -356,7 +367,10 @@ func TestFlush(t *testing.T) { topic: "test", partition: 1, } - worker, producer := newTestWorker() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + worker, producer := newTestWorker(ctx) events := []mqEvent{ { @@ -417,8 +431,8 @@ func TestFlush(t *testing.T) { func TestAbort(t *testing.T) { t.Parallel() - worker, _ := newTestWorker() ctx, cancel := context.WithCancel(context.Background()) + worker, _ := newTestWorker(ctx) var wg sync.WaitGroup wg.Add(1) @@ -435,9 +449,9 @@ func TestAbort(t *testing.T) { func TestProducerError(t *testing.T) { t.Parallel() - worker, prod := newTestWorker() ctx, cancel := context.WithCancel(context.Background()) defer cancel() + worker, prod := newTestWorker(ctx) var wg sync.WaitGroup wg.Add(1) diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go index e01e6c53daf..473b001024b 100644 --- a/cdc/sink/mq/mq_test.go +++ b/cdc/sink/mq/mq_test.go @@ -17,9 +17,9 @@ import ( "context" "fmt" "net/url" + "testing" "github.com/Shopify/sarama" - "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/cdc/model" @@ -29,38 +29,60 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/kafka" - "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/pingcap/tiflow/pkg/retry" + "github.com/stretchr/testify/require" ) -type mqSinkSuite struct{} +func initBroker(t *testing.T, partitionNum int) (*sarama.MockBroker, string) { + topic := kafka.DefaultMockTopicName + leader := sarama.NewMockBroker(t, 1) + + metadataResponse := sarama.NewMockMetadataResponse(t) + metadataResponse.SetBroker(leader.Addr(), leader.BrokerID()) + for i := 0; i < partitionNum; i++ { + metadataResponse.SetLeader(topic, int32(i), leader.BrokerID()) + } + + prodSuccess := sarama.NewMockProduceResponse(t) -var _ = check.Suite(&mqSinkSuite{}) + handlerMap := make(map[string]sarama.MockResponse) + handlerMap["MetadataRequest"] = metadataResponse + handlerMap["ProduceRequest"] = prodSuccess + leader.SetHandlerByMap(handlerMap) + return leader, topic +} + +func waitCheckpointTs(t *testing.T, s *mqSink, tableID int64, target uint64) uint64 { + var checkpointTs uint64 + err := retry.Do(context.Background(), func() error { + if v, ok := s.tableCheckpointTsMap.Load(tableID); ok { + checkpointTs = v.(uint64) + } + if checkpointTs >= target { + return nil + } + return errors.Errorf("current checkponitTs %d is not larger than %d", checkpointTs, target) + }, retry.WithBackoffBaseDelay(10), retry.WithMaxTries(20)) + + require.Nil(t, err) + return checkpointTs +} -func (s mqSinkSuite) TestKafkaSink(c *check.C) { - defer testleak.AfterTest(c)() +func TestKafkaSink(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - topic := kafka.DefaultMockTopicName - leader := sarama.NewMockBroker(c, 1) + leader, topic := initBroker(t, kafka.DefaultMockPartitionNum) defer leader.Close() - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) - leader.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError) uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" + "&max-message-bytes=1048576&partition-num=1" + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol" uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic) sinkURI, err := url.Parse(uri) - c.Assert(err, check.IsNil) + require.Nil(t, err) replicaConfig := config.GetDefaultReplicaConfig() fr, err := filter.NewFilter(replicaConfig) - c.Assert(err, check.IsNil) + require.Nil(t, err) opts := map[string]string{} errCh := make(chan error, 1) @@ -70,16 +92,15 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { }() sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) - c.Assert(err, check.IsNil) + require.Nil(t, err) encoder := sink.encoderBuilder.Build() - c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{}) - c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1) - c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 1048576) + require.IsType(t, &codec.JSONEventBatchEncoder{}, encoder) + require.Equal(t, 1, encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize()) + require.Equal(t, 1048576, encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes()) // mock kafka broker processes 1 row changed event - leader.Returns(prodSuccess) tableID := model.TableID(1) row := &model.RowChangedEvent{ Table: &model.TableName{ @@ -92,25 +113,24 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, } err = sink.EmitRowChangedEvents(ctx, row) - c.Assert(err, check.IsNil) - checkpointTs, err := sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(120))) - c.Assert(err, check.IsNil) - c.Assert(checkpointTs, check.Equals, uint64(120)) + require.Nil(t, err) + _, err = sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(120))) + require.Nil(t, err) + checkpointTs := waitCheckpointTs(t, sink, tableID, uint64(120)) + require.Equal(t, uint64(120), checkpointTs) // flush older resolved ts checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(110))) - c.Assert(err, check.IsNil) - c.Assert(checkpointTs, check.Equals, uint64(120)) + require.Nil(t, err) + require.Equal(t, uint64(120), checkpointTs) // mock kafka broker processes 1 checkpoint ts event - leader.Returns(prodSuccess) err = sink.EmitCheckpointTs(ctx, uint64(120), []model.TableName{{ Schema: "test", Table: "t1", }}) - c.Assert(err, check.IsNil) + require.Nil(t, err) // mock kafka broker processes 1 ddl event - leader.Returns(prodSuccess) ddl := &model.DDLEvent{ StartTs: 130, CommitTs: 140, @@ -121,55 +141,45 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { Type: 1, } err = sink.EmitDDLEvent(ctx, ddl) - c.Assert(err, check.IsNil) + require.Nil(t, err) cancel() err = sink.EmitRowChangedEvents(ctx, row) if err != nil { - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) } err = sink.EmitDDLEvent(ctx, ddl) if err != nil { - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) } err = sink.EmitCheckpointTs(ctx, uint64(140), nil) if err != nil { - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) } err = sink.Close(ctx) if err != nil { - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) } } -func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) { - defer testleak.AfterTest(c)() +func TestKafkaSinkFilter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - topic := kafka.DefaultMockTopicName - leader := sarama.NewMockBroker(c, 1) + leader, topic := initBroker(t, kafka.DefaultMockPartitionNum) defer leader.Close() - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) - leader.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError) uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&auto-create-topic=false&protocol=open-protocol" uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic) sinkURI, err := url.Parse(uri) - c.Assert(err, check.IsNil) + require.Nil(t, err) replicaConfig := config.GetDefaultReplicaConfig() replicaConfig.Filter = &config.FilterConfig{ Rules: []string{"test.*"}, } fr, err := filter.NewFilter(replicaConfig) - c.Assert(err, check.IsNil) + require.Nil(t, err) opts := map[string]string{} errCh := make(chan error, 1) @@ -179,7 +189,7 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) { }() sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) - c.Assert(err, check.IsNil) + require.Nil(t, err) row := &model.RowChangedEvent{ Table: &model.TableName{ @@ -190,8 +200,8 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) { CommitTs: 120, } err = sink.EmitRowChangedEvents(ctx, row) - c.Assert(err, check.IsNil) - c.Assert(sink.statistics.TotalRowsCount(), check.Equals, uint64(0)) + require.Nil(t, err) + require.Equal(t, uint64(0), sink.statistics.TotalRowsCount()) ddl := &model.DDLEvent{ StartTs: 130, @@ -203,69 +213,58 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) { Type: 1, } err = sink.EmitDDLEvent(ctx, ddl) - c.Assert(cerror.ErrDDLEventIgnored.Equal(err), check.IsTrue) + require.True(t, cerror.ErrDDLEventIgnored.Equal(err)) err = sink.Close(ctx) if err != nil { - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) } } -func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) { - defer testleak.AfterTest(c)() +func TestPulsarSinkEncoderConfig(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/pulsar/MockPulsar", "return(true)") - c.Assert(err, check.IsNil) + err := failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/mq/producer/pulsar/MockPulsar", + "return(true)") + require.Nil(t, err) uri := "pulsar://127.0.0.1:1234/kafka-test?" + - "max-message-bytes=4194304&max-batch-size=1" + "max-message-bytes=4194304&max-batch-size=1&protocol=open-protocol" sinkURI, err := url.Parse(uri) - c.Assert(err, check.IsNil) + require.Nil(t, err) replicaConfig := config.GetDefaultReplicaConfig() fr, err := filter.NewFilter(replicaConfig) - c.Assert(err, check.IsNil) + require.Nil(t, err) opts := map[string]string{} errCh := make(chan error, 1) sink, err := NewPulsarSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) - c.Assert(err, check.IsNil) + require.Nil(t, err) encoder := sink.encoderBuilder.Build() - c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{}) - c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1) - c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304) + require.IsType(t, &codec.JSONEventBatchEncoder{}, encoder) + require.Equal(t, 1, encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize()) + require.Equal(t, 4194304, encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes()) } -func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) { - defer testleak.AfterTest(c)() +func TestFlushRowChangedEvents(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - topic := kafka.DefaultMockTopicName - leader := sarama.NewMockBroker(c, 1) + leader, topic := initBroker(t, kafka.DefaultMockPartitionNum) defer leader.Close() - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) - leader.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError) - uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" + "&max-message-bytes=1048576&partition-num=1" + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol" uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic) sinkURI, err := url.Parse(uri) - c.Assert(err, check.IsNil) + require.Nil(t, err) replicaConfig := config.GetDefaultReplicaConfig() fr, err := filter.NewFilter(replicaConfig) - c.Assert(err, check.IsNil) + require.Nil(t, err) opts := map[string]string{} errCh := make(chan error, 1) @@ -275,10 +274,9 @@ func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) { }() sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) - c.Assert(err, check.IsNil) + require.Nil(t, err) // mock kafka broker processes 1 row changed event - leader.Returns(prodSuccess) tableID1 := model.TableID(1) row1 := &model.RowChangedEvent{ Table: &model.TableName{ @@ -291,7 +289,7 @@ func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) { Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, } err = sink.EmitRowChangedEvents(ctx, row1) - c.Assert(err, check.IsNil) + require.Nil(t, err) tableID2 := model.TableID(2) row2 := &model.RowChangedEvent{ @@ -305,7 +303,7 @@ func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) { Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, } err = sink.EmitRowChangedEvents(ctx, row2) - c.Assert(err, check.IsNil) + require.Nil(t, err) tableID3 := model.TableID(3) row3 := &model.RowChangedEvent{ @@ -320,33 +318,32 @@ func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) { } err = sink.EmitRowChangedEvents(ctx, row3) - c.Assert(err, check.IsNil) + require.Nil(t, err) // mock kafka broker processes 1 row resolvedTs event - leader.Returns(prodSuccess) - checkpointTs1, err := sink.FlushRowChangedEvents(ctx, - tableID1, model.NewResolvedTs(row1.CommitTs)) - c.Assert(err, check.IsNil) - c.Assert(checkpointTs1, check.Equals, row1.CommitTs) - - checkpointTs2, err := sink.FlushRowChangedEvents(ctx, - tableID2, model.NewResolvedTs(row2.CommitTs)) - c.Assert(err, check.IsNil) - c.Assert(checkpointTs2, check.Equals, row2.CommitTs) - - checkpointTs3, err := sink.FlushRowChangedEvents(ctx, - tableID3, model.NewResolvedTs(row3.CommitTs)) - c.Assert(err, check.IsNil) - c.Assert(checkpointTs3, check.Equals, row3.CommitTs) + _, err = sink.FlushRowChangedEvents(ctx, tableID1, model.NewResolvedTs(row1.CommitTs)) + require.Nil(t, err) + checkpointTs1 := waitCheckpointTs(t, sink, tableID1, row1.CommitTs) + require.Equal(t, row1.CommitTs, checkpointTs1) + + _, err = sink.FlushRowChangedEvents(ctx, tableID2, model.NewResolvedTs(row2.CommitTs)) + require.Nil(t, err) + checkpointTs2 := waitCheckpointTs(t, sink, tableID2, row2.CommitTs) + require.Equal(t, row2.CommitTs, checkpointTs2) + + _, err = sink.FlushRowChangedEvents(ctx, tableID3, model.NewResolvedTs(row3.CommitTs)) + require.Nil(t, err) + checkpointTs3 := waitCheckpointTs(t, sink, tableID3, row3.CommitTs) + require.Equal(t, row3.CommitTs, checkpointTs3) // flush older resolved ts - checkpointTsOld, err := sink.FlushRowChangedEvents(ctx, tableID1, - model.NewResolvedTs(uint64(110))) - c.Assert(err, check.IsNil) - c.Assert(checkpointTsOld, check.Equals, row1.CommitTs) + _, err = sink.FlushRowChangedEvents(ctx, tableID1, model.NewResolvedTs(uint64(110))) + require.Nil(t, err) + checkpointTsOld := waitCheckpointTs(t, sink, tableID1, row1.CommitTs) + require.Equal(t, row1.CommitTs, checkpointTsOld) err = sink.Close(ctx) if err != nil { - c.Assert(errors.Cause(err), check.Equals, context.Canceled) + require.Equal(t, context.Canceled, errors.Cause(err)) } } diff --git a/cdc/sink/mq/producer/kafka/kafka_test.go b/cdc/sink/mq/producer/kafka/kafka_test.go index 1b35ceb3dcc..c359c785ad2 100644 --- a/cdc/sink/mq/producer/kafka/kafka_test.go +++ b/cdc/sink/mq/producer/kafka/kafka_test.go @@ -21,7 +21,6 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" @@ -31,10 +30,6 @@ import ( "github.com/stretchr/testify/require" ) -type kafkaSuite struct{} - -var _ = check.Suite(&kafkaSuite{}) - func TestClientID(t *testing.T) { testCases := []struct { role string diff --git a/cdc/sink/mq/producer/kafka/main_test.go b/cdc/sink/mq/producer/kafka/main_test.go new file mode 100644 index 00000000000..82ac18eca4d --- /dev/null +++ b/cdc/sink/mq/producer/kafka/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/pkg/kafka/cluster_admin_client_mock_impl.go b/pkg/kafka/cluster_admin_client_mock_impl.go index d5df9a224b5..b49ae31c7f9 100644 --- a/pkg/kafka/cluster_admin_client_mock_impl.go +++ b/pkg/kafka/cluster_admin_client_mock_impl.go @@ -22,6 +22,8 @@ import ( const ( // DefaultMockTopicName specifies the default mock topic name. DefaultMockTopicName = "mock_topic" + // DefaultMockPartitionNum is the default partition number of default mock topic. + DefaultMockPartitionNum = 3 // defaultMockControllerID specifies the default mock controller ID. defaultMockControllerID = 1 ) diff --git a/pkg/leakutil/leak_helper.go b/pkg/leakutil/leak_helper.go index dcba28ebbb7..ff2f1cc088f 100644 --- a/pkg/leakutil/leak_helper.go +++ b/pkg/leakutil/leak_helper.go @@ -24,6 +24,8 @@ var defaultOpts = []goleak.Option{ goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + // library used by sarama, ref: https://github.com/rcrowley/go-metrics/pull/266 + goleak.IgnoreTopFunction("github.com/rcrowley/go-metrics.(*meterArbiter).tick"), } // VerifyNone verifies that no unexpected leaks occur From 0078166d452cad0feaa4818a645418d903f0f304 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 18 May 2022 14:33:32 +0800 Subject: [PATCH 2/2] add some comments --- cdc/sink/mq/mq_flush_worker_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cdc/sink/mq/mq_flush_worker_test.go b/cdc/sink/mq/mq_flush_worker_test.go index 0c1b8421bbc..6ccbab3ab50 100644 --- a/cdc/sink/mq/mq_flush_worker_test.go +++ b/cdc/sink/mq/mq_flush_worker_test.go @@ -180,7 +180,6 @@ func TestBatch(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Can not be parallel, it tests reusing the same batch. - wg.Add(1) go func() { defer wg.Done() @@ -192,10 +191,11 @@ func TestBatch(t *testing.T) { go func() { for _, event := range test.events { err := worker.addEvent(ctx, event) - if event.row == nil || event.row.CommitTs != math.MaxUint64 { - require.NoError(t, err) - } else { + if event.row != nil && event.row.CommitTs == math.MaxUint64 { + // For unprocessed events, addEvent returns after ctx has been cancelled. require.Regexp(t, ".*context canceled.*", err) + } else { + require.NoError(t, err) } } }()