Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): set max-message-bytes default to 10m #4036

Merged
merged 19 commits into from
Dec 24, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type CraftEventBatchEncoder struct {
messageBuf []*MQMessage

// configs
maxMessageSize int
maxBatchSize int
maxMessageBytes int
maxBatchSize int

allocator *craft.SliceAllocator
}
Expand All @@ -56,7 +56,7 @@ func (e *CraftEventBatchEncoder) flush() {
// AppendRowChangedEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent) (EncoderResult, error) {
rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev)
if size > e.maxMessageSize || rows >= e.maxBatchSize {
if size > e.maxMessageBytes || rows >= e.maxBatchSize {
e.flush()
}
return EncoderNoOperation, nil
Expand Down Expand Up @@ -102,15 +102,15 @@ func (e *CraftEventBatchEncoder) Reset() {
func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error {
var err error

e.maxMessageSize = DefaultMaxMessageBytes
e.maxMessageBytes = config.DefaultMaxMessageBytes
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
e.maxMessageSize, err = strconv.Atoi(maxMessageBytes)
e.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
}
if e.maxMessageSize <= 0 || e.maxMessageSize > math.MaxInt32 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", e.maxMessageSize))
if e.maxMessageBytes <= 0 || e.maxMessageBytes > math.MaxInt32 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", e.maxMessageBytes))
}

e.maxBatchSize = DefaultMaxBatchSize
Expand Down
7 changes: 4 additions & 3 deletions cdc/sink/codec/craft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
)

Expand Down Expand Up @@ -141,7 +142,7 @@ func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) {
err := encoder.SetParams(map[string]string{})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")
Expand All @@ -152,7 +153,7 @@ func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageSize, check.Equals, math.MaxInt32)
c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxInt32)

err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.NotNil)
Expand All @@ -166,7 +167,7 @@ func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint16)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, int(math.MaxUint16))
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.NotNil)
Expand Down
26 changes: 12 additions & 14 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ import (
const (
// BatchVersion1 represents the version of batch format
BatchVersion1 uint64 = 1
// DefaultMaxMessageBytes sets the default value for max-message-bytes
DefaultMaxMessageBytes int = 1 * 1024 * 1024 // 1M
// DefaultMaxBatchSize sets the default value for max-batch-size
DefaultMaxBatchSize int = 16
)
Expand Down Expand Up @@ -383,13 +381,13 @@ type JSONEventBatchEncoder struct {
messageBuf []*MQMessage
curBatchSize int
// configs
maxMessageSize int
maxBatchSize int
maxMessageBytes int
maxBatchSize int
}

// GetMaxMessageSize is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxMessageSize() int {
return d.maxMessageSize
return d.maxMessageBytes
}

// GetMaxBatchSize is only for unit testing.
Expand Down Expand Up @@ -468,15 +466,15 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
// for single message that longer than max-message-size, do not send it.
// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
if length > d.maxMessageSize {
if length > d.maxMessageBytes {
log.Warn("Single message too large",
zap.Int("max-message-size", d.maxMessageSize), zap.Int("length", length), zap.Any("table", e.Table))
zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
return EncoderNoOperation, cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
}

if len(d.messageBuf) == 0 ||
d.curBatchSize >= d.maxBatchSize ||
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageSize {
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {

versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)
Expand All @@ -495,10 +493,10 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
message.Table = &e.Table.Table
message.IncRowsCount()

if message.Length() > d.maxMessageSize {
if message.Length() > d.maxMessageBytes {
// `len(d.messageBuf) == 1` is implied
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxMessageSize))
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
}
d.curBatchSize++
}
Expand Down Expand Up @@ -617,15 +615,15 @@ func (d *JSONEventBatchEncoder) Reset() {
func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
var err error

d.maxMessageSize = DefaultMaxMessageBytes
d.maxMessageBytes = config.DefaultMaxMessageBytes
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
d.maxMessageSize, err = strconv.Atoi(maxMessageBytes)
d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
}
if d.maxMessageSize <= 0 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxMessageSize))
if d.maxMessageBytes <= 0 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxMessageBytes))
}

d.maxBatchSize = DefaultMaxBatchSize
Expand Down
11 changes: 6 additions & 5 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
)

Expand Down Expand Up @@ -203,7 +204,7 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
err := encoder.SetParams(map[string]string{})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")
Expand All @@ -214,12 +215,12 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageSize, check.Equals, math.MaxInt32)
c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxInt32)

err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageSize, check.Equals, math.MaxUint32)
c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxUint32)

err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")
Expand All @@ -230,12 +231,12 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
}

func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type Config struct {
func NewConfig() *Config {
return &Config{
Version: "2.4.0",
// MaxMessageBytes will be used to initialize producer, we set the default value (1M) identical to kafka broker.
MaxMessageBytes: 1 * 1024 * 1024,
// MaxMessageBytes will be used to initialize producer
MaxMessageBytes: config.DefaultMaxMessageBytes,
ReplicationFactor: 1,
Compression: "none",
Credential: &security.Credential{},
Expand Down
33 changes: 18 additions & 15 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, e
}
}()

if err := validateMaxMessageBytesAndCreateTopic(admin, topic, config); err != nil {
if err := validateMaxMessageBytesAndCreateTopic(admin, topic, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

Expand Down Expand Up @@ -345,7 +345,7 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (
return
}

func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config) error {
func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config) error {
Copy link
Member

@Rustin170506 Rustin170506 Dec 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This func seems strange at the moment. Can we split it up?

  1. getMaxMessageBytes is responsible for getting the right value
  2. createTopic is used to create the topic

This way we won't have to pass two configurations and modify them at the same time. Now this function has become very complicated. Originally we didn't modify Sarama's configuration in this method. Now it is not only responsible for creating the topic, but also for setting the Sarama configuration correctly, yet that configuration is not directly related to creating the topic (it affects syncing, but it is more of a pre-condition, so I think it can be separated). The function does too many things. I prefer to keep them separate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a better way to get it not to set it up at the same time that would work too. I can only think of separating it at the moment.
The Sarama configuration has already been patched once above via Kafka's configuration, but we're modifying it again inside this function at the same time.

It is currently confusing from either the caller's or the test code's point of view.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is worth doing in another PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be better to do it this time. A simple split into two methods should work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think it would be better to do this after the release, to prevent some potential new problems.

topics, err := admin.ListTopics()
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
Expand All @@ -354,17 +354,18 @@ func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic
info, exists := topics[topic]
// once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid.
if exists {
// make sure that topic's `max.message.bytes` is not less than given `max-message-bytes`
// else the producer will send message that too large to make topic reject, then changefeed would error.
// make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes`
topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

if topicMaxMessageBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"topic already exist, and topic's max.message.bytes(%d) less than max-message-bytes(%d). "+
"Please make sure `max-message-bytes` not greater than topic's `max.message.bytes`",
topicMaxMessageBytes, config.MaxMessageBytes)
log.Warn("topic's `max.message.bytes` less than the user set `max-message-bytes`,"+
"use topic's `max.message.bytes` to initialize the Kafka producer",
zap.Int("max.message.bytes", topicMaxMessageBytes),
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
zap.Int("max-message-bytes", config.MaxMessageBytes))
saramaConfig.Producer.MaxMessageBytes = topicMaxMessageBytes
}

// no need to create the topic, but we would have to log user if they found enter wrong topic name later
Expand All @@ -384,20 +385,22 @@ func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic
return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and topic not found")
}

// when try to create the topic, we don't know how to set the `max.message.bytes` for the topic.
// Kafka would create the topic with broker's `message.max.bytes`,
// we have to make sure it's not greater than `max-message-bytes`.
brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
if err != nil {
log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
return errors.Trace(err)
}

// when create the topic, `max.message.bytes` is decided by the broker,
// it would use broker's `message.max.bytes` to set topic's `max.message.bytes`.
// TiCDC need to make sure that the producer's `MaxMessageBytes` won't larger than
// broker's `message.max.bytes`.
if brokerMessageMaxBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"broker's message.max.bytes(%d) less than max-message-bytes(%d). "+
"Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`",
brokerMessageMaxBytes, config.MaxMessageBytes)
log.Warn("broker's `message.max.bytes` less than the user set `max-message-bytes`,"+
"use broker's `message.max.bytes` to initialize the Kafka producer",
zap.Int("message.max.bytes", brokerMessageMaxBytes),
zap.Int("max-message-bytes", config.MaxMessageBytes))
saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes
}

// topic not exists yet, and user does not specify the `partition-num` in the sink uri.
Expand Down
71 changes: 43 additions & 28 deletions cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,22 +198,33 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) {

// When topic exists and max message bytes is set correctly.
config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes()
err := validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config)
cfg, err := newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg)
c.Assert(err, check.IsNil)

// When topic exists and max message bytes is not set correctly.
// It is larger than the value of topic.
config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes() + 1024
err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
".*Please make sure `max-message-bytes` not greater than topic's `max.message.bytes`.*",
)
// use the smaller one.
defaultMaxMessageBytes := adminClient.GetDefaultMaxMessageBytes()
config.MaxMessageBytes = defaultMaxMessageBytes + 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes)

config.MaxMessageBytes = defaultMaxMessageBytes - 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes)

// When topic does not exist and auto-create is not enabled.
config.AutoCreate = false
err = validateMaxMessageBytesAndCreateTopic(adminClient, "non-exist", config)
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
err = validateMaxMessageBytesAndCreateTopic(adminClient, "non-exist", config, cfg)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
Expand All @@ -222,46 +233,50 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) {

// When the topic does not exist, use the broker's configuration to create the topic.
// It is less than the value of broker.
config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes() - 1024
config.AutoCreate = true
err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-success", config)
config.MaxMessageBytes = defaultMaxMessageBytes - 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-success", config, cfg)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes)

// When the topic does not exist, use the broker's configuration to create the topic.
// It is larger than the value of broker.
config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes() + 1024
config.MaxMessageBytes = defaultMaxMessageBytes + 1
config.AutoCreate = true
err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-fail", config)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
".*Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`.*",
)
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-fail", config, cfg)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes)

// When the topic exists, but the topic does not store max message bytes info,
// the check of parameter succeeds.
// It is less than the value of broker.
config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes() - 1024
config.MaxMessageBytes = defaultMaxMessageBytes - 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
detail := &sarama.TopicDetail{
NumPartitions: 3,
// Does not contain max message bytes information.
ConfigEntries: make(map[string]*string),
}
err = adminClient.CreateTopic("test-topic", detail, false)
c.Assert(err, check.IsNil)
err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config)
err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg)
c.Assert(err, check.IsNil)
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes)

// When the topic exists, but the topic does not store max message bytes info,
// the check of parameter fails.
// It is larger than the value of broker.
config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes() + 1024
err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
".*Please make sure `max-message-bytes` not greater than topic's `max.message.bytes`.*",
)
config.MaxMessageBytes = defaultMaxMessageBytes + 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes)
}

func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
Expand Down
Loading