Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4036
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Dec 24, 2021
1 parent da58b8c commit 807c040
Show file tree
Hide file tree
Showing 9 changed files with 678 additions and 9 deletions.
18 changes: 14 additions & 4 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type CraftEventBatchEncoder struct {
messageBuf []*MQMessage

// configs
maxMessageSize int
maxBatchSize int
maxMessageBytes int
maxBatchSize int

allocator *craft.SliceAllocator
}
Expand All @@ -51,7 +51,7 @@ func (e *CraftEventBatchEncoder) flush() {
// AppendRowChangedEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent) (EncoderResult, error) {
rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev)
if size > e.maxMessageSize || rows >= e.maxBatchSize {
if size > e.maxMessageBytes || rows >= e.maxBatchSize {
e.flush()
}
return EncoderNoOperation, nil
Expand Down Expand Up @@ -96,17 +96,27 @@ func (e *CraftEventBatchEncoder) Reset() {
// SetParams reads relevant parameters for craft protocol
func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error {
var err error
<<<<<<< HEAD
=======

e.maxMessageBytes = config.DefaultMaxMessageBytes
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
e.maxMessageSize, err = strconv.Atoi(maxMessageBytes)
e.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
} else {
e.maxMessageSize = DefaultMaxMessageBytes
}
<<<<<<< HEAD

if e.maxMessageSize <= 0 || e.maxMessageSize > math.MaxInt32 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", e.maxMessageSize))
=======
if e.maxMessageBytes <= 0 || e.maxMessageBytes > math.MaxInt32 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", e.maxMessageBytes))
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))
}

if maxBatchSize, ok := params["max-batch-size"]; ok {
Expand Down
7 changes: 4 additions & 3 deletions cdc/sink/codec/craft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
)

Expand Down Expand Up @@ -141,7 +142,7 @@ func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) {
err := encoder.SetParams(map[string]string{})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")
Expand All @@ -152,7 +153,7 @@ func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageSize, check.Equals, math.MaxInt32)
c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxInt32)

err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.NotNil)
Expand All @@ -166,7 +167,7 @@ func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint16)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, int(math.MaxUint16))
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.NotNil)
Expand Down
41 changes: 39 additions & 2 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
const (
// BatchVersion1 represents the version of batch format
BatchVersion1 uint64 = 1
// DefaultMaxMessageBytes sets the default value for max-message-bytes
DefaultMaxMessageBytes int = 1 * 1024 * 1024 // 1M
// DefaultMaxBatchSize sets the default value for max-batch-size
DefaultMaxBatchSize int = 16
)
Expand Down Expand Up @@ -343,13 +341,23 @@ type JSONEventBatchEncoder struct {
messageBuf []*MQMessage
curBatchSize int
// configs
<<<<<<< HEAD
maxKafkaMessageSize int
maxBatchSize int
}

// GetMaxKafkaMessageSize is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxKafkaMessageSize() int {
return d.maxKafkaMessageSize
=======
maxMessageBytes int
maxBatchSize int
}

// GetMaxMessageSize is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxMessageSize() int {
return d.maxMessageBytes
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))
}

// GetMaxBatchSize is only for unit testing.
Expand Down Expand Up @@ -428,15 +436,25 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
// for single message that longer than max-message-size, do not send it.
// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
<<<<<<< HEAD
if length > d.maxKafkaMessageSize {
log.Warn("Single message too large",
zap.Int("max-message-size", d.maxKafkaMessageSize), zap.Int("length", length), zap.Any("table", e.Table))
=======
if length > d.maxMessageBytes {
log.Warn("Single message too large",
zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))
return EncoderNoOperation, cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
}

if len(d.messageBuf) == 0 ||
d.curBatchSize >= d.maxBatchSize ||
<<<<<<< HEAD
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize {
=======
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))

versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)
Expand All @@ -454,10 +472,17 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table

<<<<<<< HEAD
if message.Length() > d.maxKafkaMessageSize {
// `len(d.messageBuf) == 1` is implied
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
=======
if message.Length() > d.maxMessageBytes {
// `len(d.messageBuf) == 1` is implied
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))
}
d.curBatchSize++
}
Expand Down Expand Up @@ -575,17 +600,29 @@ func (d *JSONEventBatchEncoder) Reset() {
// SetParams reads relevant parameters for Open Protocol
func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
var err error
<<<<<<< HEAD
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes)
=======

d.maxMessageBytes = config.DefaultMaxMessageBytes
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
} else {
d.maxKafkaMessageSize = DefaultMaxMessageBytes
}
<<<<<<< HEAD

if d.maxKafkaMessageSize <= 0 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxKafkaMessageSize))
=======
if d.maxMessageBytes <= 0 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxMessageBytes))
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))
}

if maxBatchSize, ok := params["max-batch-size"]; ok {
Expand Down
21 changes: 21 additions & 0 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
)

Expand Down Expand Up @@ -203,7 +204,11 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
err := encoder.SetParams(map[string]string{})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
<<<<<<< HEAD
c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
=======
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))

err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")
Expand All @@ -214,12 +219,20 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
<<<<<<< HEAD
c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxInt32)
=======
c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxInt32)
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))

err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
<<<<<<< HEAD
c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxUint32)
=======
c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxUint32)
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))

err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")
Expand All @@ -230,12 +243,20 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
<<<<<<< HEAD
c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
=======
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
<<<<<<< HEAD
c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
=======
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
>>>>>>> 166fff003 (sink(ticdc): set max-message-bytes default to 10m (#4036))
}

func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
Expand Down
Loading

0 comments on commit 807c040

Please sign in to comment.