From bcdf2bd324bede69e014443770f227f96f75f334 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 31 Jan 2024 10:41:00 +0800 Subject: [PATCH] fix metrics --- cdc/owner/changefeed.go | 15 +++++++++++++-- cdc/owner/metrics.go | 8 ++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index ebd161c1c85..f04f83949b0 100755 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/filter" pfilter "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/pdutil" @@ -122,6 +121,9 @@ type changefeed struct { metricsChangefeedBarrierTsGauge prometheus.Gauge metricsChangefeedTickDuration prometheus.Observer + metricsChangefeedCreateTimeGuage prometheus.Gauge + metricsChangefeedRestartTimeGauge prometheus.Gauge + newDDLPuller func(ctx context.Context, replicaConfig *config.ReplicaConfig, up *upstream.Upstream, @@ -536,7 +538,7 @@ LOOP2: } c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs()) - filter, err := filter.NewFilter(c.state.Info.Config, "") + filter, err := pfilter.NewFilter(c.state.Info.Config, "") if err != nil { return errors.Trace(err) } @@ -636,6 +638,8 @@ LOOP2: c.initMetrics() c.initialized = true + c.metricsChangefeedCreateTimeGuage.Set(float64(oracle.GetPhysical(c.state.Info.CreateTime))) + c.metricsChangefeedRestartTimeGauge.Set(float64(oracle.GetPhysical(time.Now()))) log.Info("changefeed initialized", zap.String("namespace", c.state.ID.Namespace), zap.String("changefeed", c.state.ID.ID), @@ -667,6 +671,11 @@ func (c *changefeed) initMetrics() { WithLabelValues(c.id.Namespace, c.id.ID) c.metricsChangefeedTickDuration = changefeedTickDuration. WithLabelValues(c.id.Namespace, c.id.ID) + + c.metricsChangefeedCreateTimeGuage = changefeedStartTimeGauge. + WithLabelValues(c.id.Namespace, c.id.ID, "create") + c.metricsChangefeedRestartTimeGauge = changefeedStartTimeGauge. + WithLabelValues(c.id.Namespace, c.id.ID, "restart") } // releaseResources is idempotent. @@ -744,6 +753,8 @@ func (c *changefeed) cleanupMetrics() { if c.isRemoved { changefeedStatusGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) + changefeedCheckpointTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID, "create") + changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID, "restart") } } diff --git a/cdc/owner/metrics.go b/cdc/owner/metrics.go index 9772be12313..f9fc39050ea 100644 --- a/cdc/owner/metrics.go +++ b/cdc/owner/metrics.go @@ -120,6 +120,13 @@ var ( Name: "ignored_ddl_event_count", Help: "The total count of ddl events that are ignored in changefeed.", }, []string{"namespace", "changefeed"}) + changefeedStartTimeGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "changefeed_start_time", + Help: "The start time of changefeeds", + }, []string{"namespace", "changefeed", "type"}) ) const ( @@ -147,6 +154,7 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(changefeedTickDuration) registry.MustRegister(changefeedCloseDuration) registry.MustRegister(changefeedIgnoredDDLEventCounter) + registry.MustRegister(changefeedStartTimeGauge) } // lagBucket returns the lag buckets for prometheus metric