From 4496f83c9aafcb93037b3ac3eabf2dd3d1596df3 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 3 Jan 2022 16:21:35 +0800 Subject: [PATCH 1/6] fix canal json sink write row count metrics. --- cdc/sink/codec/canal_flat.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index b369215c709..98bcff8fa55 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -366,7 +366,9 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage { log.Panic("CanalFlatEventBatchEncoder", zap.Error(err)) return nil } - ret[i] = NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable()) + m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable()) + m.IncRowsCount() + ret[i] = m } c.resolvedBuf = c.resolvedBuf[0:0] return ret From 03e004c1c4f729e77ec563b640b5de7fd664d51c Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 3 Jan 2022 18:45:39 +0800 Subject: [PATCH 2/6] also check rowsCount when Building the message. --- cdc/sink/codec/json_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index d7f1499ec30..4f8eb550607 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -137,6 +137,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco if len(cs) > 0 { res := encoder.Build() c.Assert(res, check.HasLen, 1) + c.Assert(res[0], check.Equals, len(cs)) decoder, err := newDecoder(res[0].Key, res[0].Value) c.Assert(err, check.IsNil) checkRowDecoder(decoder, cs) From 6422f8b3f5b901244e85db89885af39931f9e774 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 3 Jan 2022 18:57:59 +0800 Subject: [PATCH 3/6] refine the comment about --- cdc/sink/codec/canal_flat_test.go | 2 ++ cdc/sink/codec/interface.go | 17 +++++++++-------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/cdc/sink/codec/canal_flat_test.go b/cdc/sink/codec/canal_flat_test.go index 25fa8e7aac4..fb968f2b2ac 100644 --- a/cdc/sink/codec/canal_flat_test.go +++ b/cdc/sink/codec/canal_flat_test.go @@ -318,6 +318,8 @@ func (s *canalFlatSuite) TestBatching(c *check.C) { c.Assert(msgs, check.HasLen, int(resolvedTs-lastResolved)) for j := range msgs { + c.Assert(msgs[j].GetRowsCount(), check.Equals, 1) + var msg canalFlatMessage err := json.Unmarshal(msgs[j].Value, &msg) c.Assert(err, check.IsNil) diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index 996eb4d2d2f..7f4d1883e03 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -57,14 +57,15 @@ type EventBatchEncoder interface { // MQMessage represents an MQ message to the mqSink type MQMessage struct { - Key []byte - Value []byte - Ts uint64 // reserved for possible output sorting - Schema *string // schema - Table *string // table - Type model.MqMessageType // type - Protocol config.Protocol // protocol - rowsCount int // rows in one MQ Message + Key []byte + Value []byte + Ts uint64 // reserved for possible output sorting + Schema *string // schema + Table *string // table + Type model.MqMessageType // type + Protocol config.Protocol // protocol + // rows in one MQ Message, once a `Row Changed Event` message attached into the `Value`, this value should be incremented. + rowsCount int } // maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client. From 72a109c18d2f0f43fae0d021ac86830629b3816a Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 3 Jan 2022 22:11:53 +0800 Subject: [PATCH 4/6] refine avro and canal. --- cdc/sink/codec/avro.go | 1 + cdc/sink/codec/canal.go | 4 +++- cdc/sink/codec/interface.go | 6 ++++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index a0a962acb12..13fbcf8fa02 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -123,6 +123,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) } mqMessage.Key = evlp + mqMessage.IncRowsCount() a.resultBuf = append(a.resultBuf, mqMessage) return EncoderNeedAsyncWrite, nil diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 8e3ed61ecfb..20fdfa363ae 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -458,7 +458,8 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, // Build implements the EventBatchEncoder interface func (d *CanalEventBatchEncoder) Build() []*MQMessage { - if len(d.messages.Messages) == 0 { + rowCount := len(d.messages.Messages) + if rowCount == 0 { return nil } @@ -472,6 +473,7 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage { log.Panic("Error when serializing Canal packet", zap.Error(err)) } ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil) + ret.SetRowsCount(rowCount) d.messages.Reset() d.resetPacket() return []*MQMessage{ret} diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index 7f4d1883e03..e43109fe58c 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -101,11 +101,13 @@ func (m *MQMessage) IncRowsCount() { } func newDDLMQMessage(proto config.Protocol, key, value []byte, event *model.DDLEvent) *MQMessage { - return NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table) + result := NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table) + return result } func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) *MQMessage { - return NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved, nil, nil) + result := NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved, nil, nil) + return result } // NewMQMessage should be used when creating a MQMessage struct. From 543bd4d8932573f45067142f640af79a31b0c579 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 4 Jan 2022 11:13:17 +0800 Subject: [PATCH 5/6] refine the code. --- cdc/sink/codec/canal_test.go | 1 + cdc/sink/codec/interface.go | 23 ++++++++++------------- cdc/sink/codec/json_test.go | 2 +- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/cdc/sink/codec/canal_test.go b/cdc/sink/codec/canal_test.go index 8b6354a67dd..c25b3cb9800 100644 --- a/cdc/sink/codec/canal_test.go +++ b/cdc/sink/codec/canal_test.go @@ -99,6 +99,7 @@ func (s *canalBatchSuite) TestCanalEventBatchEncoder(c *check.C) { c.Assert(res, check.HasLen, 1) c.Assert(res[0].Key, check.IsNil) c.Assert(len(res[0].Value), check.Equals, size) + c.Assert(res[0].GetRowsCount(), check.Equals, len(cs)) packet := &canal.Packet{} err := proto.Unmarshal(res[0].Value, packet) diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index e43109fe58c..996eb4d2d2f 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -57,15 +57,14 @@ type EventBatchEncoder interface { // MQMessage represents an MQ message to the mqSink type MQMessage struct { - Key []byte - Value []byte - Ts uint64 // reserved for possible output sorting - Schema *string // schema - Table *string // table - Type model.MqMessageType // type - Protocol config.Protocol // protocol - // rows in one MQ Message, once a `Row Changed Event` message attached into the `Value`, this value should be incremented. - rowsCount int + Key []byte + Value []byte + Ts uint64 // reserved for possible output sorting + Schema *string // schema + Table *string // table + Type model.MqMessageType // type + Protocol config.Protocol // protocol + rowsCount int // rows in one MQ Message } // maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client. @@ -101,13 +100,11 @@ func (m *MQMessage) IncRowsCount() { } func newDDLMQMessage(proto config.Protocol, key, value []byte, event *model.DDLEvent) *MQMessage { - result := NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table) - return result + return NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table) } func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) *MQMessage { - result := NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved, nil, nil) - return result + return NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved, nil, nil) } // NewMQMessage should be used when creating a MQMessage struct. diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 4f8eb550607..387990d44a3 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -137,7 +137,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco if len(cs) > 0 { res := encoder.Build() c.Assert(res, check.HasLen, 1) - c.Assert(res[0], check.Equals, len(cs)) + c.Assert(res[0].GetRowsCount(), check.Equals, len(cs)) decoder, err := newDecoder(res[0].Key, res[0].Value) c.Assert(err, check.IsNil) checkRowDecoder(decoder, cs) From 56df90295efe7c91ed8bb605a3075de681924b03 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 4 Jan 2022 11:23:16 +0800 Subject: [PATCH 6/6] fix maxwell. --- cdc/sink/codec/maxwell.go | 1 + cdc/sink/codec/maxwell_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index a85d5ce3678..42f54aafc10 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -298,6 +298,7 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage { } ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil) + ret.SetRowsCount(d.batchSize) d.Reset() return []*MQMessage{ret} } diff --git a/cdc/sink/codec/maxwell_test.go b/cdc/sink/codec/maxwell_test.go index 5e6fea4a58e..1f98bedac44 100644 --- a/cdc/sink/codec/maxwell_test.go +++ b/cdc/sink/codec/maxwell_test.go @@ -54,6 +54,7 @@ func (s *maxwellbatchSuite) testmaxwellBatchCodec(c *check.C, newEncoder func() continue } c.Assert(messages, check.HasLen, 1) + c.Assert(messages[0].GetRowsCount(), check.Equals, len(cs)) c.Assert(len(messages[0].Key)+len(messages[0].Value), check.Equals, size) }