sink(ticdc): set max-message-bytes default to 10m (#4036) (#4059)
ti-chi-bot authored Jan 28, 2022
1 parent 54fe214 commit 3429c71
Showing 7 changed files with 111 additions and 76 deletions.
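The headline change: the default for `max-message-bytes` rises from 1M to 10M, the Open Protocol encoder now requires the parameter explicitly, and the Kafka producer clamps its `MaxMessageBytes` to the limit advertised by the topic or broker instead of failing the changefeed. As a sketch (an assumption, since the file itself is not among the hunks shown below), the shared constant referenced as `config.DefaultMaxMessageBytes` would look like:

// Assumed definition in pkg/config (not shown in this diff):
// the default for `max-message-bytes`, raised from 1M to 10M.
const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10485760 bytes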
45 changes: 22 additions & 23 deletions cdc/sink/codec/json.go
@@ -26,17 +26,14 @@ import (
     "github.com/pingcap/log"
     timodel "github.com/pingcap/parser/model"
     "github.com/pingcap/parser/mysql"
+    "github.com/pingcap/tiflow/cdc/model"
     cerror "github.com/pingcap/tiflow/pkg/errors"
     "go.uber.org/zap"
-
-    "github.com/pingcap/tiflow/cdc/model"
 )
 
 const (
     // BatchVersion1 represents the version of batch format
     BatchVersion1 uint64 = 1
-    // DefaultMaxMessageBytes sets the default value for max-message-bytes
-    DefaultMaxMessageBytes int = 1 * 1024 * 1024 // 1M
     // DefaultMaxBatchSize sets the default value for max-batch-size
     DefaultMaxBatchSize int = 16
 )
@@ -317,13 +314,13 @@ type JSONEventBatchEncoder struct {
     messageBuf   []*MQMessage
     curBatchSize int
     // configs
-    maxKafkaMessageSize int
-    maxBatchSize        int
+    maxMessageBytes int
+    maxBatchSize    int
 }
 
-// GetMaxKafkaMessageSize is only for unit testing.
-func (d *JSONEventBatchEncoder) GetMaxKafkaMessageSize() int {
-    return d.maxKafkaMessageSize
+// GetMaxMessageBytes is only for unit testing.
+func (d *JSONEventBatchEncoder) GetMaxMessageBytes() int {
+    return d.maxMessageBytes
 }
 
 // GetMaxBatchSize is only for unit testing.
@@ -402,15 +399,15 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
     // a single message longer than max-message-bytes must not be sent.
     // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
     length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
-    if length > d.maxKafkaMessageSize {
+    if length > d.maxMessageBytes {
         log.Warn("Single message too large",
-            zap.Int("max-message-size", d.maxKafkaMessageSize), zap.Int("length", length), zap.Any("table", e.Table))
+            zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
         return EncoderNoOperation, cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
     }
 
     if len(d.messageBuf) == 0 ||
         d.curBatchSize >= d.maxBatchSize ||
-        d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize {
+        d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {
 
         versionHead := make([]byte, 8)
         binary.BigEndian.PutUint64(versionHead, BatchVersion1)
@@ -429,10 +426,10 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
     message.Table = &e.Table.Table
     message.IncRowsCount()
 
-    if message.Length() > d.maxKafkaMessageSize {
+    if message.Length() > d.maxMessageBytes {
         // `len(d.messageBuf) == 1` is implied
         log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
-            zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
+            zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
     }
     d.curBatchSize++
 }
@@ -550,17 +547,19 @@ func (d *JSONEventBatchEncoder) Reset() {
 // SetParams reads relevant parameters for Open Protocol
 func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
     var err error
-    if maxMessageBytes, ok := params["max-message-bytes"]; ok {
-        d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes)
-        if err != nil {
-            return cerror.ErrKafkaInvalidConfig.Wrap(err)
-        }
-    } else {
-        d.maxKafkaMessageSize = DefaultMaxMessageBytes
+
+    maxMessageBytes, ok := params["max-message-bytes"]
+    if !ok {
+        return cerror.ErrKafkaInvalidConfig.Wrap(errors.New("max-message-bytes not found"))
     }
 
-    if d.maxKafkaMessageSize <= 0 {
-        return cerror.ErrKafkaInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxKafkaMessageSize))
+    d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
+    if err != nil {
+        return cerror.ErrKafkaInvalidConfig.Wrap(err)
+    }
+
+    if d.maxMessageBytes <= 0 {
+        return cerror.ErrKafkaInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxMessageBytes))
     }
 
     if maxBatchSize, ok := params["max-batch-size"]; ok {
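The net effect of the json.go change: `SetParams` no longer falls back to a built-in default, so every caller must pass `max-message-bytes` explicitly (the producer supplies it, as the kafka.go hunks below show). A minimal usage sketch in the style of the updated tests:

// A sketch assuming the cdc/sink/codec package from this commit.
encoder := codec.NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{
    "max-message-bytes": "10485760", // 10M; omitting the key is now an error
})
if err != nil {
    return errors.Trace(err)
}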
48 changes: 38 additions & 10 deletions cdc/sink/codec/json_test.go
@@ -19,8 +19,10 @@ import (
     "testing"
 
     "github.com/pingcap/check"
+    "github.com/pingcap/errors"
     "github.com/pingcap/parser/mysql"
     "github.com/pingcap/tiflow/cdc/model"
+    "github.com/pingcap/tiflow/pkg/config"
     "github.com/pingcap/tiflow/pkg/util/testleak"
 )
 
@@ -225,10 +227,10 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco
 func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
     defer testleak.AfterTest(c)()
     encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder)
-    err := encoder.SetParams(map[string]string{})
+    err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
+    c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
 
     err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
     c.Assert(err, check.ErrorMatches, ".*invalid.*")
@@ -239,28 +241,50 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
     err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxInt32)
+    c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxInt32)
 
     err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxUint32)
+    c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxUint32)
 
     err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
     c.Assert(err, check.ErrorMatches, ".*invalid.*")
 
     err = encoder.SetParams(map[string]string{"max-batch-size": "-1"})
     c.Assert(err, check.ErrorMatches, ".*invalid.*")
 
-    err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
+    err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
+    c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
 
-    err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
+    err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
+    c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
 }
 
+func (s *batchSuite) TestSetParams(c *check.C) {
+    defer testleak.AfterTest(c)()
+
+    opts := make(map[string]string)
+    encoder := NewJSONEventBatchEncoder()
+    err := encoder.SetParams(opts)
+    c.Assert(
+        errors.Cause(err),
+        check.ErrorMatches,
+        ".*max-message-bytes not found.*",
+    )
+
+    opts["max-message-bytes"] = "1"
+    encoder = NewJSONEventBatchEncoder()
+    err = encoder.SetParams(opts)
+    c.Assert(err, check.IsNil)
+    c.Assert(encoder, check.NotNil)
+    jsonEncoder, ok := encoder.(*JSONEventBatchEncoder)
+    c.Assert(ok, check.IsTrue)
+    c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1)
+}
+
 func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
@@ -308,9 +332,13 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
 func (s *batchSuite) TestMaxBatchSize(c *check.C) {
     defer testleak.AfterTest(c)()
     encoder := NewJSONEventBatchEncoder()
-    err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
-    c.Check(err, check.IsNil)
+    err := encoder.SetParams(map[string]string{"max-message-bytes": "1048576", "max-batch-size": "64"})
+    c.Assert(encoder, check.NotNil)
+    c.Assert(err, check.IsNil)
 
+    jsonEncoder, ok := encoder.(*JSONEventBatchEncoder)
+    c.Assert(ok, check.IsTrue)
+    c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1048576)
     testEvent := &model.RowChangedEvent{
         CommitTs: 1,
         Table:    &model.TableName{Schema: "a", Table: "b"},
4 changes: 1 addition & 3 deletions cdc/sink/mq.go
@@ -413,9 +413,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
         return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
     }
 
-    var protocol codec.Protocol
-    protocol.FromString(replicaConfig.Sink.Protocol)
-    producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, protocol, config, errCh)
+    producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
     if err != nil {
         return nil, errors.Trace(err)
     }
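On the caller side, the producer no longer takes the protocol to decide whether to validate message sizes; it always resolves a safe limit and reports it back through `opts`. A sketch of the round trip, with the surrounding wiring assumed from context (`sinkURIParameters` is a hypothetical helper, not part of the commit):

// opts flows from the sink-uri into the producer; the producer overwrites
// "max-message-bytes" with the value it actually negotiated, and the
// encoder later picks that value up via SetParams.
opts := sinkURIParameters(sinkURI) // hypothetical: extract query parameters
producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
if err != nil {
    return nil, errors.Trace(err)
}
// opts["max-message-bytes"] now holds the clamped value as a decimal string.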
4 changes: 2 additions & 2 deletions cdc/sink/mq_test.go
@@ -74,7 +74,7 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
     encoder := sink.newEncoder()
     c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
     c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
-    c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxKafkaMessageSize(), check.Equals, 4194304)
+    c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304)
 
     // mock kafka broker processes 1 row changed event
     leader.Returns(prodSuccess)
@@ -230,5 +230,5 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) {
     encoder := sink.newEncoder()
     c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
     c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
-    c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxKafkaMessageSize(), check.Equals, 4194304)
+    c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304)
 }
65 changes: 32 additions & 33 deletions cdc/sink/producer/kafka/kafka.go
@@ -62,7 +62,7 @@ func NewConfig() *Config {
     return &Config{
         Version: "2.4.0",
-        // MaxMessageBytes will be used to initialize producer, we set the default value (1M) identical to kafka broker.
-        MaxMessageBytes:   1 * 1024 * 1024,
+        // MaxMessageBytes will be used to initialize the producer; it now defaults to config.DefaultMaxMessageBytes (10M).
+        MaxMessageBytes:   config.DefaultMaxMessageBytes,
         ReplicationFactor: 1,
         Compression:       "none",
         Credential:        &security.Credential{},
@@ -387,7 +387,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
     }
 }
 
-func topicPreProcess(topic string, protocol codec.Protocol, config *Config, saramaConfig *sarama.Config) error {
+func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error {
     // FIXME: find a way to remove this failpoint, which exists only to accommodate the unit tests
     failpoint.Inject("SkipTopicAutoCreate", func() {
         failpoint.Return(nil)
@@ -410,20 +410,18 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara
     info, created := topics[topic]
     // once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid.
     if created {
-        // make sure that topic's `max.message.bytes` is not less than given `max-message-bytes`
-        // else the producer will send message that too large to make topic reject, then changefeed would error.
-        // only the default `open protocol` and `craft protocol` use `max-message-bytes`, so check this for them.
-        if protocol == codec.ProtocolDefault {
-            topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
-            if err != nil {
-                return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
-            }
-            if topicMaxMessageBytes < config.MaxMessageBytes {
-                return cerror.ErrKafkaInvalidConfig.GenWithStack(
-                    "topic already exist, and topic's max.message.bytes(%d) less than max-message-bytes(%d)."+
-                        "Please make sure `max-message-bytes` not greater than topic `max.message.bytes`",
-                    topicMaxMessageBytes, config.MaxMessageBytes)
-            }
+        // make sure that the producer's `MaxMessageBytes` is not larger than the topic's `max.message.bytes`
+        topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
+        if err != nil {
+            return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+        }
+
+        if topicMaxMessageBytes < config.MaxMessageBytes {
+            log.Warn("topic's `max.message.bytes` is less than the user-set `max-message-bytes`, "+
+                "using the topic's `max.message.bytes` to initialize the Kafka producer",
+                zap.Int("max.message.bytes", topicMaxMessageBytes),
+                zap.Int("max-message-bytes", config.MaxMessageBytes))
+            saramaConfig.Producer.MaxMessageBytes = topicMaxMessageBytes
         }
 
         // no need to create the topic, but we should warn the user in case they entered a wrong topic name
@@ -443,22 +441,22 @@
         return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and topic not found")
     }
 
-    // when try to create the topic, we don't know how to set the `max.message.bytes` for the topic.
-    // Kafka would create the topic with broker's `message.max.bytes`,
-    // we have to make sure it's not greater than `max-message-bytes` for the default open protocol.
-    if protocol == codec.ProtocolDefault {
-        brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
-        if err != nil {
-            log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
-            return errors.Trace(err)
-        }
+    brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
+    if err != nil {
+        log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
+        return errors.Trace(err)
+    }
 
-        if brokerMessageMaxBytes < config.MaxMessageBytes {
-            return cerror.ErrKafkaInvalidConfig.GenWithStack(
-                "broker's message.max.bytes(%d) less than max-message-bytes(%d)"+
-                    "Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`",
-                brokerMessageMaxBytes, config.MaxMessageBytes)
-        }
-    }
+    // when creating the topic, `max.message.bytes` is decided by the broker,
+    // which uses its own `message.max.bytes` to set the topic's `max.message.bytes`.
+    // TiCDC needs to make sure that the producer's `MaxMessageBytes` is not larger
+    // than the broker's `message.max.bytes`.
+    if brokerMessageMaxBytes < config.MaxMessageBytes {
+        log.Warn("broker's `message.max.bytes` is less than the user-set `max-message-bytes`, "+
+            "using the broker's `message.max.bytes` to initialize the Kafka producer",
+            zap.Int("message.max.bytes", brokerMessageMaxBytes),
+            zap.Int("max-message-bytes", config.MaxMessageBytes))
+        saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes
+    }
 
     // topic not created yet, and user does not specify the `partition-num` in the sink uri.
Expand Down Expand Up @@ -487,16 +485,17 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara
var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, topic string, protocol codec.Protocol, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
return nil, err
}

if err := topicPreProcess(topic, protocol, config, cfg); err != nil {
if err := topicPreProcess(topic, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes)

asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
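In short, `topicPreProcess` now resolves an effective message-size cap rather than rejecting mismatched configurations. A minimal distillation of the new decision (the wrapper function is illustrative only; the real code inlines this logic using the helpers `getTopicMaxMessageBytes` and `getBrokerMessageMaxBytes`):

// effectiveMaxMessageBytes picks the producer's MaxMessageBytes: the smaller
// of the user-set max-message-bytes and the limit advertised by the topic
// (when it already exists) or by the broker (when the topic must be created).
func effectiveMaxMessageBytes(userMax, serverMax int) int {
    if serverMax < userMax {
        return serverMax // clamp and warn, rather than fail the changefeed
    }
    return userMax
}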
18 changes: 13 additions & 5 deletions cdc/sink/producer/kafka/kafka_test.go
@@ -152,9 +152,11 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
         _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
     }()
 
-    producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
+    opts := make(map[string]string)
+    producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
     c.Assert(err, check.IsNil)
     c.Assert(producer.GetPartitionNum(), check.Equals, int32(2))
+    c.Assert(opts, check.HasKey, "max-message-bytes")
     for i := 0; i < 100; i++ {
         err = producer.SendMessage(ctx, &codec.MQMessage{
             Key: []byte("test-key-1"),
@@ -275,11 +277,12 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) {
 
     cfg, err := newSaramaConfigImpl(ctx, config)
     c.Assert(err, check.IsNil)
+    c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes)
 
     config.BrokerEndpoints = []string{""}
     cfg.Metadata.Retry.Max = 1
 
-    err = topicPreProcess(topic, codec.ProtocolDefault, config, cfg)
+    err = topicPreProcess(topic, config, cfg)
     c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers)
 }

@@ -347,8 +350,9 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
     config.Version = "invalid"
     config.BrokerEndpoints = []string{"127.0.0.1:1111"}
     topic := "topic"
+    opts := make(map[string]string)
     c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
-    _, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
+    _, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
     c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")
     _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
 }
@@ -393,7 +397,9 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
     }()
 
     errCh := make(chan error, 1)
-    producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
+    opts := make(map[string]string)
+    producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
+    c.Assert(opts, check.HasKey, "max-message-bytes")
     defer func() {
         _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
         err := producer.Close()
@@ -458,7 +464,9 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
     c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
 
     errCh := make(chan error, 1)
-    producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
+    opts := make(map[string]string)
+    producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
+    c.Assert(opts, check.HasKey, "max-message-bytes")
     defer func() {
         err := producer.Close()
         c.Assert(err, check.IsNil)
(The seventh changed file did not load in this view and is not shown.)