Skip to content

Commit

Permalink
ticdc/mq: accurately demonstrate txn_batch_size metric for MQ sink (#…
Browse files Browse the repository at this point in the history
…3609) (#3817)

* fix the txn_batch_size metric inaccuracy bug when the sink target is MQ

* address comments

* add comments for exported functions

* fix the compiling problem

* fix conflicts.

* fix comment.

Co-authored-by: zhaoxinyu <[email protected]>
Co-authored-by: 3AceShowHand <[email protected]>
Co-authored-by: Ling Jin <[email protected]>
  • Loading branch information
4 people authored Jan 17, 2022
1 parent 433fdb9 commit 87115bc
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 17 deletions.
5 changes: 4 additions & 1 deletion cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ func (e *CraftEventBatchEncoder) flush() {
ts := headers.GetTs(0)
schema := headers.GetSchema(0)
table := headers.GetTable(0)
e.messageBuf = append(e.messageBuf, NewMQMessage(ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table))
rowsCnt := e.rowChangedBuffer.RowsCount()
mqMessage := NewMQMessage(ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table)
mqMessage.SetRowsCount(rowsCnt)
e.messageBuf = append(e.messageBuf, mqMessage)
}

// AppendRowChangedEvent implements the EventBatchEncoder interface
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/codec/craft/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ func (b *RowChangedEventBuffer) Size() int {
return b.estimatedSize
}

// RowsCount returns the number of rows batched in this buffer, as tracked by
// the buffer's eventsCount field.
func (b *RowChangedEventBuffer) RowsCount() int {
return b.eventsCount
}

// GetHeaders returns headers of buffer
func (b *RowChangedEventBuffer) GetHeaders() *Headers {
return b.headers
Expand Down
45 changes: 31 additions & 14 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ type EventBatchEncoder interface {

// MQMessage represents an MQ message to the mqSink
type MQMessage struct {
Key []byte
Value []byte
Ts uint64 // reserved for possible output sorting
Schema *string // schema
Table *string // table
Type model.MqMessageType // type
Protocol Protocol // protocol
Key []byte
Value []byte
Ts uint64 // reserved for possible output sorting
Schema *string // schema
Table *string // table
Type model.MqMessageType // type
Protocol Protocol // protocol
rowsCount int // rows in one MQ Message
}

// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client.
Expand All @@ -81,6 +82,21 @@ func (m *MQMessage) PhysicalTime() time.Time {
return oracle.GetTimeFromTS(m.Ts)
}

// GetRowsCount returns the number of rows batched in this MQMessage.
// The value is whatever was last recorded through SetRowsCount and
// IncRowsCount.
func (m *MQMessage) GetRowsCount() int {
return m.rowsCount
}

// SetRowsCount sets the number of rows batched in this MQMessage to cnt,
// replacing any previously recorded value.
func (m *MQMessage) SetRowsCount(cnt int) {
m.rowsCount = cnt
}

// IncRowsCount increments the number of rows batched in this MQMessage by one.
func (m *MQMessage) IncRowsCount() {
m.rowsCount++
}

// newDDLMQMessage builds an MQMessage of type model.MqMessageTypeDDL from the
// given protocol, key and value. The message timestamp is the event's
// CommitTs, and the schema/table pointers refer to the Schema and Table
// fields of event.TableInfo.
func newDDLMQMessage(proto Protocol, key, value []byte, event *model.DDLEvent) *MQMessage {
	commitTs := event.CommitTs
	// Take the addresses of the original fields (not of copies) so the
	// resulting message aliases the same strings as before.
	schemaName := &event.TableInfo.Schema
	tableName := &event.TableInfo.Table
	return NewMQMessage(proto, key, value, commitTs, model.MqMessageTypeDDL, schemaName, tableName)
}
Expand All @@ -93,13 +109,14 @@ func newResolvedMQMessage(proto Protocol, key, value []byte, ts uint64) *MQMessa
// It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
func NewMQMessage(proto Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType, schema, table *string) *MQMessage {
ret := &MQMessage{
Key: nil,
Value: nil,
Ts: ts,
Schema: schema,
Table: table,
Type: ty,
Protocol: proto,
Key: nil,
Value: nil,
Ts: ts,
Schema: schema,
Table: table,
Type: ty,
Protocol: proto,
rowsCount: 0,
}

if key != nil {
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
message.Ts = e.CommitTs
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table
message.IncRowsCount()

if message.Length() > d.maxKafkaMessageSize {
// `len(d.messageBuf) == 1` is implied
Expand Down
5 changes: 3 additions & 2 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
flushToProducer := func(op codec.EncoderResult) error {
return k.statistics.RecordBatchExecution(func() (int, error) {
messages := encoder.Build()
thisBatchSize := len(messages)
if thisBatchSize == 0 {
thisBatchSize := 0
if len(messages) == 0 {
return 0, nil
}

Expand All @@ -309,6 +309,7 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
if err != nil {
return 0, err
}
thisBatchSize += msg.GetRowsCount()
}

if op == codec.EncoderNeedSyncWrite {
Expand Down

0 comments on commit 87115bc

Please sign in to comment.