diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 0b39db09c60..5684767beb7 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -393,7 +393,13 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool { return r == '/' }) - producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh) + if topic == "" { + return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri") + } + + var protocol codec.Protocol + protocol.FromString(replicaConfig.Sink.Protocol) + producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, protocol, config, errCh) if err != nil { return nil, errors.Trace(err) } @@ -423,8 +429,8 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, if s != "" { opts["max-batch-size"] = s } - // For now, it's a place holder. Avro format have to make connection to Schema Registery, - // and it may needs credential. + // For now, it's a placeholder. Avro format have to make connection to Schema Registry, + // and it may need credential. credential := &security.Credential{} sink, err := newMqSink(ctx, credential, producer, filter, replicaConfig, opts, errCh) if err != nil { diff --git a/cdc/sink/mq_test.go b/cdc/sink/mq_test.go index ee5b0146f60..029700c26c4 100644 --- a/cdc/sink/mq_test.go +++ b/cdc/sink/mq_test.go @@ -62,6 +62,12 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { c.Assert(err, check.IsNil) opts := map[string]string{} errCh := make(chan error, 1) + + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate") + }() + sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) c.Assert(err, check.IsNil) @@ -161,6 +167,12 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) { c.Assert(err, check.IsNil) opts := map[string]string{} errCh := make(chan error, 1) + + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate") + }() + sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) c.Assert(err, check.IsNil) diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index 8f3a6a6585d..5cfd5b69ef8 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -37,11 +37,14 @@ import ( "go.uber.org/zap" ) -const defaultPartitionNum = 4 +const defaultPartitionNum = 3 -// Config stores the Kafka configuration +// Config stores user specified Kafka producer configuration type Config struct { - PartitionNum int32 + BrokerEndpoints []string + PartitionNum int32 + + // User should make sure that `replication-factor` not greater than the number of kafka brokers. ReplicationFactor int16 Version string @@ -50,8 +53,8 @@ type Config struct { ClientID string Credential *security.Credential SaslScram *security.SaslScram - // control whether to create topic and verify partition number - TopicPreProcess bool + // control whether to create topic + AutoCreate bool } // NewConfig returns a default Kafka configuration @@ -64,12 +67,13 @@ func NewConfig() *Config { Compression: "none", Credential: &security.Credential{}, SaslScram: &security.SaslScram{}, - TopicPreProcess: true, + AutoCreate: true, } } // Initialize the kafka configuration func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error { + c.BrokerEndpoints = strings.Split(sinkURI.Host, ",") params := sinkURI.Query() s := params.Get("partition-num") if s != "" { @@ -155,7 +159,7 @@ func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfi if err != nil { return err } - c.TopicPreProcess = autoCreate + c.AutoCreate = autoCreate } s = params.Get("protocol") @@ -395,80 +399,122 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { } } -// kafkaTopicPreProcess gets partition number from existing topic, if topic doesn't -// exit, creates it automatically. -func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Config) (int32, error) { - admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg) +func topicPreProcess(topic string, protocol codec.Protocol, config *Config, saramaConfig *sarama.Config) error { + // FIXME: find a way to remove this failpoint for workload the unit test + failpoint.Inject("SkipTopicAutoCreate", func() { + failpoint.Return(nil) + }) + admin, err := sarama.NewClusterAdmin(config.BrokerEndpoints, saramaConfig) if err != nil { - return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } defer func() { - err := admin.Close() - if err != nil { - log.Warn("close admin client failed", zap.Error(err)) + if err := admin.Close(); err != nil { + log.Warn("close kafka cluster admin failed", zap.Error(err)) } }() + topics, err := admin.ListTopics() if err != nil { - return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) - } - partitionNum := config.PartitionNum - topicDetail, exist := topics[topic] - if exist { - log.Info("get partition number of topic", zap.String("topic", topic), zap.Int32("partition_num", topicDetail.NumPartitions)) - if partitionNum == 0 { - partitionNum = topicDetail.NumPartitions - } else if partitionNum < topicDetail.NumPartitions { - log.Warn("partition number assigned in sink-uri is less than that of topic", zap.Int32("topic partition num", topicDetail.NumPartitions)) - } else if partitionNum > topicDetail.NumPartitions { - return 0, cerror.ErrKafkaInvalidPartitionNum.GenWithStack( - "partition number(%d) assigned in sink-uri is more than that of topic(%d)", partitionNum, topicDetail.NumPartitions) + return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + info, created := topics[topic] + // once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid. + if created { + // make sure that topic's `max.message.bytes` is not less than given `max-message-bytes` + // else the producer will send message that too large to make topic reject, then changefeed would error. + // only the default `open protocol` and `craft protocol` use `max-message-bytes`, so check this for them. + if protocol == codec.ProtocolDefault || protocol == codec.ProtocolCraft { + topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info) + if err != nil { + return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + if topicMaxMessageBytes < config.MaxMessageBytes { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "topic already exist, and topic's max.message.bytes(%d) less than max-message-bytes(%d)."+ + "Please make sure `max-message-bytes` not greater than topic `max.message.bytes`", + topicMaxMessageBytes, config.MaxMessageBytes) + } } - } else { - if partitionNum == 0 { - partitionNum = defaultPartitionNum - log.Warn("topic not found and partition number is not specified, using default partition number", zap.String("topic", topic), zap.Int32("partition_num", partitionNum)) + + // no need to create the topic, but we would have to log user if they found enter wrong topic name later + if config.AutoCreate { + log.Warn("topic already exist, TiCDC will not create the topic", + zap.String("topic", topic), zap.Any("detail", info)) } - log.Info("create a topic", zap.String("topic", topic), - zap.Int32("partition_num", partitionNum), - zap.Int16("replication_factor", config.ReplicationFactor)) - err := admin.CreateTopic(topic, &sarama.TopicDetail{ - NumPartitions: partitionNum, - ReplicationFactor: config.ReplicationFactor, - }, false) - // TODO idenfity the cause of "Topic with this name already exists" - if err != nil && !strings.Contains(err.Error(), "already exists") { - return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + + if err := config.adjustPartitionNum(info.NumPartitions); err != nil { + return errors.Trace(err) } + + return nil + } + + if !config.AutoCreate { + return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and topic not found") + } + + // when try to create the topic, we don't know how to set the `max.message.bytes` for the topic. + // Kafka would create the topic with broker's `message.max.bytes`, + // we have to make sure it's not greater than `max-message-bytes` + brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin) + if err != nil { + log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration") + return errors.Trace(err) + } + + if brokerMessageMaxBytes < config.MaxMessageBytes { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "broker's message.max.bytes(%d) less than max-message-bytes(%d)"+ + "Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`", + brokerMessageMaxBytes, config.MaxMessageBytes) + } + + // topic not created yet, and user does not specify the `partition-num` in the sink uri. + if config.PartitionNum == 0 { + config.PartitionNum = defaultPartitionNum + log.Warn("partition-num is not set, use the default partition count", + zap.String("topic", topic), zap.Int32("partitions", config.PartitionNum)) } - return partitionNum, nil + err = admin.CreateTopic(topic, &sarama.TopicDetail{ + NumPartitions: config.PartitionNum, + ReplicationFactor: config.ReplicationFactor, + }, false) + // TODO identify the cause of "Topic with this name already exists" + if err != nil && !strings.Contains(err.Error(), "already exists") { + return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + log.Info("TiCDC create the topic", + zap.Int32("partition-num", config.PartitionNum), + zap.Int16("replication-factor", config.ReplicationFactor)) + + return nil } var newSaramaConfigImpl = newSaramaConfig // NewKafkaSaramaProducer creates a kafka sarama producer -func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) { +func NewKafkaSaramaProducer(ctx context.Context, topic string, protocol codec.Protocol, config *Config, errCh chan error) (*kafkaSaramaProducer, error) { log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) cfg, err := newSaramaConfigImpl(ctx, config) if err != nil { return nil, err } - asyncClient, err := sarama.NewAsyncProducer(strings.Split(address, ","), cfg) - if err != nil { + + if err := topicPreProcess(topic, protocol, config, cfg); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - syncClient, err := sarama.NewSyncProducer(strings.Split(address, ","), cfg) + + asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg) if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - - partitionNum := config.PartitionNum - if config.TopicPreProcess { - partitionNum, err = kafkaTopicPreProcess(topic, address, config, cfg) - if err != nil { - return nil, err - } + syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } notifier := new(notify.Notifier) @@ -480,11 +526,11 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c asyncClient: asyncClient, syncClient: syncClient, topic: topic, - partitionNum: partitionNum, + partitionNum: config.PartitionNum, partitionOffset: make([]struct { flushed uint64 sent uint64 - }, partitionNum), + }, config.PartitionNum), flushedNotifier: notifier, flushedReceiver: flushedReceiver, closeCh: make(chan struct{}), @@ -550,7 +596,10 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { } config.Version = version // See: https://kafka.apache.org/documentation/#replication - // When one of the brokers in a Kafka cluster is down, the partition leaders in this broker is broken, Kafka will election a new partition leader and replication logs, this process will last from a few seconds to a few minutes. Kafka cluster will not provide a writing service in this process. + // When one of the brokers in a Kafka cluster is down, the partition leaders + // in this broker is broken, Kafka will election a new partition leader and + // replication logs, this process will last from a few seconds to a few minutes. + // Kafka cluster will not provide a writing service in this process. // Time out in one minute(120 * 500ms). config.Metadata.Retry.Max = 120 config.Metadata.Retry.Backoff = 500 * time.Millisecond @@ -561,6 +610,10 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { config.Producer.Return.Errors = true config.Producer.RequiredAcks = sarama.WaitForAll + // Time out in five minutes(600 * 500ms). + config.Producer.Retry.Max = 600 + config.Producer.Retry.Backoff = 500 * time.Millisecond + switch strings.ToLower(strings.TrimSpace(c.Compression)) { case "none": config.Producer.Compression = sarama.CompressionNone @@ -577,10 +630,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { config.Producer.Compression = sarama.CompressionNone } - // Time out in five minutes(600 * 500ms). - config.Producer.Retry.Max = 600 - config.Producer.Retry.Backoff = 500 * time.Millisecond - // Time out in one minute(120 * 500ms). config.Admin.Retry.Max = 120 config.Admin.Retry.Backoff = 500 * time.Millisecond @@ -610,3 +659,72 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { return config, err } + +func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) { + target := "message.max.bytes" + _, controllerID, err := admin.DescribeCluster() + if err != nil { + return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + configEntries, err := admin.DescribeConfig(sarama.ConfigResource{ + Type: sarama.BrokerResource, + Name: strconv.Itoa(int(controllerID)), + ConfigNames: []string{target}, + }) + if err != nil { + return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + if len(configEntries) == 0 || configEntries[0].Name != target { + return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack( + "since cannot find the `message.max.bytes` from the broker's configuration, " + + "ticdc decline to create the topic and changefeed to prevent potential error") + } + + result, err := strconv.Atoi(configEntries[0].Value) + if err != nil { + return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + return result, nil +} + +func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) (int, error) { + if a, ok := info.ConfigEntries["max.message.bytes"]; ok { + result, err := strconv.Atoi(*a) + if err != nil { + return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + return result, nil + } + + return getBrokerMessageMaxBytes(admin) +} + +// adjust the partition-num by the topic's partition count +func (c *Config) adjustPartitionNum(realPartitionCount int32) error { + // user does not specify the `partition-num` in the sink-uri + if c.PartitionNum == 0 { + c.PartitionNum = realPartitionCount + return nil + } + + if c.PartitionNum < realPartitionCount { + log.Warn("number of partition specified in sink-uri is less than that of the actual topic. "+ + "Some partitions will not have messages dispatched to", + zap.Int32("sink-uri partitions", c.PartitionNum), + zap.Int32("topic partitions", realPartitionCount)) + return nil + } + + // Make sure that the user-specified `partition-num` is not greater than + // the real partition count, since messages would be dispatched to different + // partitions, this could prevent potential correctness problems. + if c.PartitionNum > realPartitionCount { + return cerror.ErrKafkaInvalidPartitionNum.GenWithStack( + "the number of partition (%d) specified in sink-uri is more than that of actual topic (%d)", + c.PartitionNum, realPartitionCount) + } + return nil +} diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index f44fa8ae8ee..12403bda857 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "net/url" + "strings" "sync" "testing" "time" @@ -24,6 +25,7 @@ import ( "github.com/Shopify/sarama" "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/ticdc/cdc/sink/codec" "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" @@ -96,6 +98,12 @@ func (s *kafkaSuite) TestInitializeConfig(c *check.C) { for k, v := range opts { c.Assert(v, check.Equals, expectedOpts[k]) } + + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + err = cfg.Initialize(sinkURI, replicaConfig, opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid partition num.*") } func (s *kafkaSuite) TestSaramaProducer(c *check.C) { @@ -116,7 +124,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError) prodSuccess.AddTopicPartition(topic, 1, sarama.ErrNoError) // 200 async messages and 2 sync message, Kafka flush could be in batch, - // we can set flush.maxmessages to 1 to control message count exactly. + // we can set flush.max.messages to 1 to control message count exactly. for i := 0; i < 202; i++ { leader.Returns(prodSuccess) } @@ -128,7 +136,8 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447 config.Version = "0.9.0.0" config.PartitionNum = int32(2) - config.TopicPreProcess = false + config.AutoCreate = false + config.BrokerEndpoints = strings.Split(leader.Addr(), ",") newSaramaConfigImplBak := newSaramaConfigImpl newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { @@ -137,11 +146,13 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { cfg.Producer.Flush.MaxMessages = 1 return cfg, err } + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) defer func() { newSaramaConfigImpl = newSaramaConfigImplBak + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate") }() - producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh) + producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh) c.Assert(err, check.IsNil) c.Assert(producer.GetPartitionNum(), check.Equals, int32(2)) for i := 0; i < 100; i++ { @@ -223,6 +234,23 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { } } +func (s *kafkaSuite) TestAdjustPartitionNum(c *check.C) { + defer testleak.AfterTest(c)() + config := NewConfig() + err := config.adjustPartitionNum(2) + c.Assert(err, check.IsNil) + c.Assert(config.PartitionNum, check.Equals, int32(2)) + + config.PartitionNum = 1 + err = config.adjustPartitionNum(2) + c.Assert(err, check.IsNil) + c.Assert(config.PartitionNum, check.Equals, int32(1)) + + config.PartitionNum = 3 + err = config.adjustPartitionNum(2) + c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) +} + func (s *kafkaSuite) TestTopicPreProcess(c *check.C) { defer testleak.AfterTest(c) topic := "unit_test_2" @@ -240,47 +268,19 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) { "MetadataRequest": metaResponse, "DescribeConfigsRequest": sarama.NewMockDescribeConfigsResponse(c), }) - config := NewConfig() config.PartitionNum = int32(0) + config.BrokerEndpoints = strings.Split(broker.Addr(), ",") + config.AutoCreate = false + cfg, err := newSaramaConfigImpl(ctx, config) c.Assert(err, check.IsNil) - num, err := kafkaTopicPreProcess(topic, broker.Addr(), config, cfg) - c.Assert(err, check.IsNil) - c.Assert(num, check.Equals, int32(2)) + config.BrokerEndpoints = []string{""} cfg.Metadata.Retry.Max = 1 - _, err = kafkaTopicPreProcess(topic, "", config, cfg) - c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers) - - config.PartitionNum = int32(4) - _, err = kafkaTopicPreProcess(topic, broker.Addr(), config, cfg) - c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) -} -func (s *kafkaSuite) TestTopicPreProcessCreate(c *check.C) { - defer testleak.AfterTest(c)() - topic := "unit_test_3" - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - broker := sarama.NewMockBroker(c, 1) - broker.SetHandlerByMap(map[string]sarama.MockResponse{ - "MetadataRequest": sarama.NewMockMetadataResponse(c). - SetBroker(broker.Addr(), broker.BrokerID()). - SetController(broker.BrokerID()), - "DescribeConfigsRequest": sarama.NewMockDescribeConfigsResponse(c), - "CreateTopicsRequest": sarama.NewMockCreateTopicsResponse(c), - }) - defer broker.Close() - - config := NewConfig() - config.PartitionNum = int32(0) - cfg, err := newSaramaConfigImpl(ctx, config) - c.Assert(err, check.IsNil) - num, err := kafkaTopicPreProcess(topic, broker.Addr(), config, cfg) - c.Assert(err, check.IsNil) - c.Assert(num, check.Equals, int32(4)) + err = topicPreProcess(topic, codec.ProtocolDefault, config, cfg) + c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers) } func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { @@ -345,8 +345,12 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { errCh := make(chan error, 1) config := NewConfig() config.Version = "invalid" - _, err := NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh) + config.BrokerEndpoints = []string{"127.0.0.1:1111"} + topic := "topic" + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) + _, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate") } func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { @@ -370,7 +374,10 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447 config.Version = "0.9.0.0" config.PartitionNum = int32(2) - config.TopicPreProcess = false + config.AutoCreate = false + config.BrokerEndpoints = strings.Split(leader.Addr(), ",") + + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) newSaramaConfigImplBak := newSaramaConfigImpl newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { @@ -386,8 +393,9 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { }() errCh := make(chan error, 1) - producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh) + producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh) defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate") err := producer.Close() c.Assert(err, check.IsNil) }() @@ -444,13 +452,17 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447 config.Version = "0.9.0.0" config.PartitionNum = int32(2) - config.TopicPreProcess = false + config.AutoCreate = false + config.BrokerEndpoints = strings.Split(leader.Addr(), ",") + + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) errCh := make(chan error, 1) - producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh) + producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh) defer func() { err := producer.Close() c.Assert(err, check.IsNil) + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate") }() c.Assert(err, check.IsNil)