From 821efbe5ca969b7718638ed7d385b3e67013a9f2 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 4 Jan 2022 14:14:35 +0800 Subject: [PATCH] This is an automated cherry-pick of #4192 Signed-off-by: ti-chi-bot --- cdc/sink/codec/avro.go | 1 + cdc/sink/codec/canal.go | 8 +++++++- cdc/sink/codec/canal_flat.go | 6 ++++++ cdc/sink/codec/canal_flat_test.go | 2 ++ cdc/sink/codec/canal_test.go | 1 + cdc/sink/codec/json_test.go | 1 + cdc/sink/codec/maxwell.go | 5 +++++ cdc/sink/codec/maxwell_test.go | 1 + 8 files changed, 24 insertions(+), 1 deletion(-) diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index bb06324c88c..d4b3cb7f0c1 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -120,6 +120,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) } mqMessage.Key = evlp + mqMessage.IncRowsCount() a.resultBuf = append(a.resultBuf, mqMessage) return EncoderNeedAsyncWrite, nil diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 1e058da4b95..90894625846 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -377,7 +377,8 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, // Build implements the EventBatchEncoder interface func (d *CanalEventBatchEncoder) Build() []*MQMessage { - if len(d.messages.Messages) == 0 { + rowCount := len(d.messages.Messages) + if rowCount == 0 { return nil } @@ -390,7 +391,12 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage { if err != nil { log.Panic("Error when serializing Canal packet", zap.Error(err)) } +<<<<<<< HEAD ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil) +======= + ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil) + ret.SetRowsCount(rowCount) +>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192)) d.messages.Reset() d.resetPacket() return []*MQMessage{ret} diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index 151ecece2fb..52587fb0214 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -218,7 +218,13 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage { log.Panic("CanalFlatEventBatchEncoder", zap.Error(err)) return nil } +<<<<<<< HEAD ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table) +======= + m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable()) + m.IncRowsCount() + ret[i] = m +>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192)) } c.resolvedBuf = c.resolvedBuf[0:0] return ret diff --git a/cdc/sink/codec/canal_flat_test.go b/cdc/sink/codec/canal_flat_test.go index 1ca30741756..6813a231983 100644 --- a/cdc/sink/codec/canal_flat_test.go +++ b/cdc/sink/codec/canal_flat_test.go @@ -124,6 +124,8 @@ func (s *canalFlatSuite) TestBatching(c *check.C) { c.Assert(msgs, check.HasLen, int(resolvedTs-lastResolved)) for j := range msgs { + c.Assert(msgs[j].GetRowsCount(), check.Equals, 1) + var msg canalFlatMessage err := json.Unmarshal(msgs[j].Value, &msg) c.Assert(err, check.IsNil) diff --git a/cdc/sink/codec/canal_test.go b/cdc/sink/codec/canal_test.go index 55c624a81f3..c2853ddd52c 100644 --- a/cdc/sink/codec/canal_test.go +++ b/cdc/sink/codec/canal_test.go @@ -102,6 +102,7 @@ func (s *canalBatchSuite) TestCanalEventBatchEncoder(c *check.C) { c.Assert(res, check.HasLen, 1) c.Assert(res[0].Key, check.IsNil) c.Assert(len(res[0].Value), check.Equals, size) + c.Assert(res[0].GetRowsCount(), check.Equals, len(cs)) packet := &canal.Packet{} err := proto.Unmarshal(res[0].Value, packet) diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 495783ac731..4751a2cb1ab 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -160,6 +160,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco if len(cs) > 0 { res := encoder.Build() c.Assert(res, check.HasLen, 1) + c.Assert(res[0].GetRowsCount(), check.Equals, len(cs)) decoder, err := newDecoder(res[0].Key, res[0].Value) c.Assert(err, check.IsNil) checkRowDecoder(decoder, cs) diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index 2b495e6986d..c59d48676db 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -277,7 +277,12 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage { return nil } +<<<<<<< HEAD ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil) +======= + ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil) + ret.SetRowsCount(d.batchSize) +>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192)) d.Reset() return []*MQMessage{ret} } diff --git a/cdc/sink/codec/maxwell_test.go b/cdc/sink/codec/maxwell_test.go index 5e6fea4a58e..1f98bedac44 100644 --- a/cdc/sink/codec/maxwell_test.go +++ b/cdc/sink/codec/maxwell_test.go @@ -54,6 +54,7 @@ func (s *maxwellbatchSuite) testmaxwellBatchCodec(c *check.C, newEncoder func() continue } c.Assert(messages, check.HasLen, 1) + c.Assert(messages[0].GetRowsCount(), check.Equals, len(cs)) c.Assert(len(messages[0].Key)+len(messages[0].Value), check.Equals, size) }