From 87115bc41f43b141b87858db7b32233c194f6062 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 17 Jan 2022 17:09:42 +0800 Subject: [PATCH] ticdc/mq: accurately demonstrate txn_batch_size metric for MQ sink (#3609) (#3817) * fix the txn_batch_size metric inaccuracy bug when the sink target is MQ * address comments * add comments for exported functions * fix the compiling problem * fix conflicts. * fix comment. Co-authored-by: zhaoxinyu Co-authored-by: 3AceShowHand Co-authored-by: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> --- cdc/sink/codec/craft.go | 5 +++- cdc/sink/codec/craft/model.go | 5 ++++ cdc/sink/codec/interface.go | 45 ++++++++++++++++++++++++----------- cdc/sink/codec/json.go | 1 + cdc/sink/mq.go | 5 ++-- 5 files changed, 44 insertions(+), 17 deletions(-) diff --git a/cdc/sink/codec/craft.go b/cdc/sink/codec/craft.go index 0952664c2aa..270f669d6c6 100644 --- a/cdc/sink/codec/craft.go +++ b/cdc/sink/codec/craft.go @@ -45,7 +45,10 @@ func (e *CraftEventBatchEncoder) flush() { ts := headers.GetTs(0) schema := headers.GetSchema(0) table := headers.GetTable(0) - e.messageBuf = append(e.messageBuf, NewMQMessage(ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table)) + rowsCnt := e.rowChangedBuffer.RowsCount() + mqMessage := NewMQMessage(ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table) + mqMessage.SetRowsCount(rowsCnt) + e.messageBuf = append(e.messageBuf, mqMessage) } // AppendRowChangedEvent implements the EventBatchEncoder interface diff --git a/cdc/sink/codec/craft/model.go b/cdc/sink/codec/craft/model.go index 9854d6be456..823f96e642a 100644 --- a/cdc/sink/codec/craft/model.go +++ b/cdc/sink/codec/craft/model.go @@ -498,6 +498,11 @@ func (b *RowChangedEventBuffer) Size() int { return b.estimatedSize } +// RowsCount return Number of rows batched in this buffer +func (b *RowChangedEventBuffer) RowsCount() int { + return b.eventsCount +} + // GetHeaders 
returns headers of buffer func (b *RowChangedEventBuffer) GetHeaders() *Headers { return b.headers diff --git a/cdc/sink/codec/interface.go index dc1d86d0199..33a3457dcad 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -55,13 +55,14 @@ type EventBatchEncoder interface { // MQMessage represents an MQ message to the mqSink type MQMessage struct { - Key []byte - Value []byte - Ts uint64 // reserved for possible output sorting - Schema *string // schema - Table *string // table - Type model.MqMessageType // type - Protocol Protocol // protocol + Key []byte + Value []byte + Ts uint64 // reserved for possible output sorting + Schema *string // schema + Table *string // table + Type model.MqMessageType // type + Protocol Protocol // protocol + rowsCount int // rows in one MQ Message } // maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client. @@ -81,6 +82,21 @@ func (m *MQMessage) PhysicalTime() time.Time { return oracle.GetTimeFromTS(m.Ts) } +// GetRowsCount returns the number of rows batched in one MQMessage +func (m *MQMessage) GetRowsCount() int { + return m.rowsCount +} + +// SetRowsCount sets the number of rows +func (m *MQMessage) SetRowsCount(cnt int) { + m.rowsCount = cnt +} + +// IncRowsCount increases the number of rows +func (m *MQMessage) IncRowsCount() { + m.rowsCount++ +} + func newDDLMQMessage(proto Protocol, key, value []byte, event *model.DDLEvent) *MQMessage { return NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table) } @@ -93,13 +109,14 @@ func newResolvedMQMessage(proto Protocol, key, value []byte, ts uint64) *MQMessa // It copies the input byte slices to avoid any surprises in asynchronous MQ writes. 
func NewMQMessage(proto Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType, schema, table *string) *MQMessage { ret := &MQMessage{ - Key: nil, - Value: nil, - Ts: ts, - Schema: schema, - Table: table, - Type: ty, - Protocol: proto, + Key: nil, + Value: nil, + Ts: ts, + Schema: schema, + Table: table, + Type: ty, + Protocol: proto, + rowsCount: 0, } if key != nil { diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index e02af99df8e..d4e61faad6f 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -453,6 +453,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) message.Ts = e.CommitTs message.Schema = &e.Table.Schema message.Table = &e.Table.Table + message.IncRowsCount() if message.Length() > d.maxKafkaMessageSize { // `len(d.messageBuf) == 1` is implied diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 999a1402c85..41c583ccb1b 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -299,8 +299,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { flushToProducer := func(op codec.EncoderResult) error { return k.statistics.RecordBatchExecution(func() (int, error) { messages := encoder.Build() - thisBatchSize := len(messages) - if thisBatchSize == 0 { + thisBatchSize := 0 + if len(messages) == 0 { return 0, nil } @@ -309,6 +309,7 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { if err != nil { return 0, err } + thisBatchSize += msg.GetRowsCount() } if op == codec.EncoderNeedSyncWrite {