From ae04769cc4ff1764def1cba3c21a71aa4661dc35 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 18 Jan 2022 14:39:49 +0800 Subject: [PATCH] metrics(cdc): fix mq sink write row count metrics. (#4192) (#4323) * fix the txn_batch_size metric inaccuracy bug when the sink target is MQ * address comments * add comments for exported functions * fix the compiling problem * workerpool: limit the rate to output deadlock warning (#3775) (#3795) * tests(ticdc): set up the sync diff output directory correctly (#3725) (#3741) * relay(dm): use binlog name comparison (#3710) (#3712) * dm/load: fix concurrent call Loader.Status (#3459) (#3468) * cdc/sorter: make unified sorter cgroup aware (#3436) (#3439) * tz (ticdc): fix timezone error (#3887) (#3906) * pkg,cdc: do not use log package (#3902) (#3940) * *: rename repo from pingcap/ticdc to pingcap/tiflow (#3959) * http_*: add log for http api and refine the err handle logic (#2997) (#3307) * etcd_worker: batch etcd patch (#3277) (#3389) * http_api (ticdc): check --cert-allowed-cn before add server common name (#3628) (#3882) * kvclient(ticdc): fix kvclient takes too long time to recover (#3612) (#3663) * owner: fix owner tick block http request (#3490) (#3530) * dm/syncer: use downstream PK/UK to generate DML (#3168) (#3256) * dep(dm): update go-mysql (#3914) (#3934) * dm/syncer: multiple rows use downstream schema (#3308) (#3953) * errorutil,sink,syncer: add errorutil to handle ignorable error (#3264) (#3995) * dm/worker: don't exit when failed to read checkpoint in relay (#3345) (#4005) * syncer(dm): use an early location to reset binlog and open safemode (#3860) * ticdc/owner: Fix ddl special comment syntax error (#3845) (#3978) * dm/scheduler: fix inconsistent of relay status (#3474) (#4009) * owner,scheduler(cdc): fix nil pointer panic in owner scheduler (#2980) (#4007) (#4016) * config(ticdc): Fix old value configuration check for maxwell protocol (#3747) (#3783) * sink(ticdc): cherry pick sink bug fix to release 5.3 (#4083) * master(dm): clean and treat invalid load task (#4004) (#4145) * loader: fix wrong progress in query-status for loader (#4093) (#4143) close pingcap/tiflow#3252 * ticdc/processor: Fix backoff base delay misconfiguration (#3992) (#4028) * dm: load table structure from dump files (#3295) (#4163) * compactor: fix duplicate entry in safemode (#3432) (#3434) (#4088) * kv(ticdc): reduce eventfeed rate limited log (#4072) (#4111) close pingcap/tiflow#4006 * metrics(ticdc): add resolved ts and add changefeed to dataflow (#4038) (#4104) * This is an automated cherry-pick of #4192 Signed-off-by: ti-chi-bot * retry(dm): align with tidb latest error message (#4172) (#4254) close pingcap/tiflow#4159, close pingcap/tiflow#4246 * owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838) (#3865) * redolog: add a precleanup process when s3 enable (#3525) (#3878) * ddl(dm): make skipped ddl pass `SplitDDL()` (#4176) (#4227) close pingcap/tiflow#4173 * cdc/sink: remove Initialize method from the sink interface (#3682) (#3765) Co-authored-by: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> * http_api (ticdc): fix http api 'get processor' panic. (#4117) (#4123) close pingcap/tiflow#3840 * sink (ticdc): fix a deadlock due to checkpointTs fall back in sinkNode (#4084) (#4099) close pingcap/tiflow#4055 * cdc/sink: adjust kafka initialization logic (#3192) (#4162) * try fix conflicts. * This is an automated cherry-pick of #4192 Signed-off-by: ti-chi-bot * fix conflicts. * fix conflicts. Co-authored-by: zhaoxinyu Co-authored-by: amyangfei Co-authored-by: lance6716 Co-authored-by: sdojjy Co-authored-by: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Co-authored-by: 3AceShowHand --- cdc/sink/codec/avro.go | 1 + cdc/sink/codec/canal.go | 4 +++- cdc/sink/codec/canal_flat.go | 4 +++- cdc/sink/codec/canal_flat_test.go | 2 ++ cdc/sink/codec/canal_test.go | 1 + cdc/sink/codec/json_test.go | 1 + cdc/sink/codec/maxwell.go | 1 + cdc/sink/codec/maxwell_test.go | 1 + 8 files changed, 13 insertions(+), 2 deletions(-) diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index cb6ed9e9952..01a5170dfd6 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -122,6 +122,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) } mqMessage.Key = evlp + mqMessage.IncRowsCount() a.resultBuf = append(a.resultBuf, mqMessage) return EncoderNeedAsyncWrite, nil diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 089103d2f8d..bdd73e272fd 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -378,7 +378,8 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, // Build implements the EventBatchEncoder interface func (d *CanalEventBatchEncoder) Build() []*MQMessage { - if len(d.messages.Messages) == 0 { + rowCount := len(d.messages.Messages) + if rowCount == 0 { return nil } @@ -392,6 +393,7 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage { log.Panic("Error when serializing Canal packet", zap.Error(err)) } ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil) + ret.SetRowsCount(rowCount) d.messages.Reset() d.resetPacket() return []*MQMessage{ret} diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index 93579556ecd..6f9fc4089fe 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -237,7 +237,9 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage { log.Panic("CanalFlatEventBatchEncoder", zap.Error(err)) return nil } - ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table) + m := NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table) + m.IncRowsCount() + ret[i] = m } c.resolvedBuf = c.resolvedBuf[0:0] return ret diff --git a/cdc/sink/codec/canal_flat_test.go b/cdc/sink/codec/canal_flat_test.go index 0d6cbb7531b..64f371fe235 100644 --- a/cdc/sink/codec/canal_flat_test.go +++ b/cdc/sink/codec/canal_flat_test.go @@ -123,6 +123,8 @@ func (s *canalFlatSuite) TestBatching(c *check.C) { c.Assert(msgs, check.HasLen, int(resolvedTs-lastResolved)) for j := range msgs { + c.Assert(msgs[j].GetRowsCount(), check.Equals, 1) + var msg canalFlatMessage err := json.Unmarshal(msgs[j].Value, &msg) c.Assert(err, check.IsNil) diff --git a/cdc/sink/codec/canal_test.go b/cdc/sink/codec/canal_test.go index bf8f0fa113d..d3755d1a2df 100644 --- a/cdc/sink/codec/canal_test.go +++ b/cdc/sink/codec/canal_test.go @@ -102,6 +102,7 @@ func (s *canalBatchSuite) TestCanalEventBatchEncoder(c *check.C) { c.Assert(res, check.HasLen, 1) c.Assert(res[0].Key, check.IsNil) c.Assert(len(res[0].Value), check.Equals, size) + c.Assert(res[0].GetRowsCount(), check.Equals, len(cs)) packet := &canal.Packet{} err := proto.Unmarshal(res[0].Value, packet) diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 7f02c564904..d1d2646e142 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -134,6 +134,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco if len(cs) > 0 { res := encoder.Build() c.Assert(res, check.HasLen, 1) + c.Assert(res[0].GetRowsCount(), check.Equals, len(cs)) decoder, err := newDecoder(res[0].Key, res[0].Value) c.Assert(err, check.IsNil) checkRowDecoder(decoder, cs) diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index d6e54c72448..0a67cb939e9 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -297,6 +297,7 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage { } ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil) + ret.SetRowsCount(d.batchSize) d.Reset() return []*MQMessage{ret} } diff --git a/cdc/sink/codec/maxwell_test.go b/cdc/sink/codec/maxwell_test.go index 5e6fea4a58e..1f98bedac44 100644 --- a/cdc/sink/codec/maxwell_test.go +++ b/cdc/sink/codec/maxwell_test.go @@ -54,6 +54,7 @@ func (s *maxwellbatchSuite) testmaxwellBatchCodec(c *check.C, newEncoder func() continue } c.Assert(messages, check.HasLen, 1) + c.Assert(messages[0].GetRowsCount(), check.Equals, len(cs)) c.Assert(len(messages[0].Key)+len(messages[0].Value), check.Equals, size) }