From eaf4c5cd020d3bc7caf1e97239f594f619e77303 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 17 Jan 2022 17:07:45 +0800
Subject: [PATCH] ticdc/mq: accurately demonstrate txn_batch_size metric for MQ sink (#3609) (#3818)

---
 cdc/sink/codec/interface.go | 45 +++++++++++++++++++++++++++++++++++++--------------
 cdc/sink/codec/json.go      |  1 +
 cdc/sink/mq.go              |  5 +++--
 3 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go
index f58a91d39f7..3d7fe225980 100644
--- a/cdc/sink/codec/interface.go
+++ b/cdc/sink/codec/interface.go
@@ -55,13 +55,14 @@ type EventBatchEncoder interface {
 
 // MQMessage represents an MQ message to the mqSink
 type MQMessage struct {
-    Key      []byte
-    Value    []byte
-    Ts       uint64              // reserved for possible output sorting
-    Schema   *string             // schema
-    Table    *string             // table
-    Type     model.MqMessageType // type
-    Protocol Protocol            // protocol
+    Key       []byte
+    Value     []byte
+    Ts        uint64              // reserved for possible output sorting
+    Schema    *string             // schema
+    Table     *string             // table
+    Type      model.MqMessageType // type
+    Protocol  Protocol            // protocol
+    rowsCount int                 // rows in one MQ Message
 }
 
 // maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client.
@@ -81,6 +82,21 @@ func (m *MQMessage) PhysicalTime() time.Time {
     return oracle.GetTimeFromTS(m.Ts)
 }
 
+// GetRowsCount returns the number of rows batched in one MQMessage
+func (m *MQMessage) GetRowsCount() int {
+    return m.rowsCount
+}
+
+// SetRowsCount set the number of rows
+func (m *MQMessage) SetRowsCount(cnt int) {
+    m.rowsCount = cnt
+}
+
+// IncRowsCount increase the number of rows
+func (m *MQMessage) IncRowsCount() {
+    m.rowsCount++
+}
+
 func newDDLMQMessage(proto Protocol, key, value []byte, event *model.DDLEvent) *MQMessage {
     return NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table)
 }
@@ -93,13 +109,14 @@ func newResolvedMQMessage(proto Protocol, key, value []byte, ts uint64) *MQMessa
 // It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
 func NewMQMessage(proto Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType, schema, table *string) *MQMessage {
     ret := &MQMessage{
-        Key:      nil,
-        Value:    nil,
-        Ts:       ts,
-        Schema:   schema,
-        Table:    table,
-        Type:     ty,
-        Protocol: proto,
+        Key:       nil,
+        Value:     nil,
+        Ts:        ts,
+        Schema:    schema,
+        Table:     table,
+        Type:      ty,
+        Protocol:  proto,
+        rowsCount: 0,
     }
 
     if key != nil {
diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go
index 9e658ba3412..71510fab3e8 100644
--- a/cdc/sink/codec/json.go
+++ b/cdc/sink/codec/json.go
@@ -425,6 +425,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
         message.Ts = e.CommitTs
         message.Schema = &e.Table.Schema
         message.Table = &e.Table.Table
+        message.IncRowsCount()
 
         if message.Length() > d.maxMessageBytes {
             // `len(d.messageBuf) == 1` is implied
diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go
index 710b4c4c1bf..deb13f32988 100644
--- a/cdc/sink/mq.go
+++ b/cdc/sink/mq.go
@@ -299,8 +299,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
     flushToProducer := func(op codec.EncoderResult) error {
         return k.statistics.RecordBatchExecution(func() (int, error) {
             messages := encoder.Build()
-            thisBatchSize := len(messages)
-            if thisBatchSize == 0 {
+            thisBatchSize := 0
+            if len(messages) == 0 {
                 return 0, nil
             }
 
@@ -309,6 +309,7 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
             if err != nil {
                 return 0, err
             }
+            thisBatchSize += msg.GetRowsCount()
         }
 
         if op == codec.EncoderNeedSyncWrite {
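
Reviewer note (not part of the patch): a minimal, self-contained sketch of the accounting change, using stand-in names rather than the real cdc packages. One MQ message can batch several row changes, so len(messages) undercounts rows; summing each message's row count yields the value the txn_batch_size metric is meant to observe.

// Standalone Go sketch with assumed names (mqMessage, appendRow); it only
// illustrates the bookkeeping idea, not the actual TiCDC implementation.
package main

import "fmt"

// mqMessage mirrors the rowsCount bookkeeping added to codec.MQMessage.
type mqMessage struct {
    value     []byte
    rowsCount int
}

func (m *mqMessage) IncRowsCount()     { m.rowsCount++ }
func (m *mqMessage) GetRowsCount() int { return m.rowsCount }

// appendRow emulates appending a row-changed event to a batched message:
// every row appended also bumps that message's row counter.
func appendRow(m *mqMessage, row string) {
    m.value = append(m.value, []byte(row)...)
    m.IncRowsCount()
}

func main() {
    // Two MQ messages batching three rows in total.
    m1, m2 := &mqMessage{}, &mqMessage{}
    appendRow(m1, "row-1")
    appendRow(m1, "row-2")
    appendRow(m2, "row-3")

    messages := []*mqMessage{m1, m2}

    // Old accounting: the metric observed the number of MQ messages.
    oldBatchSize := len(messages)

    // New accounting: sum the rows carried by each message, as the
    // flushToProducer change above does.
    newBatchSize := 0
    for _, msg := range messages {
        newBatchSize += msg.GetRowsCount()
    }

    fmt.Println(oldBatchSize, newBatchSize) // prints: 2 3
}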