diff --git a/cdc/owner/metrics.go b/cdc/owner/metrics.go index 734d0fedccc..fa29fec9e84 100644 --- a/cdc/owner/metrics.go +++ b/cdc/owner/metrics.go @@ -61,7 +61,7 @@ var ( Subsystem: "owner", Name: "maintain_table_num", Help: "number of replicated tables maintained in owner", - }, []string{"changefeed", "type"}) + }, []string{"changefeed", "capture", "type"}) changefeedStatusGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index d428a053a06..a4224d67505 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -352,23 +352,29 @@ func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) { pendingCounts := infoProvider.GetPendingTableCounts() for captureID, info := range o.captures { - ownerMaintainTableNumGauge.WithLabelValues( - cfID, info.AdvertiseAddr, maintainTableTypeTotal).Set(float64(totalCounts[captureID])) - ownerMaintainTableNumGauge.WithLabelValues( - cfID, info.AdvertiseAddr, maintainTableTypeWip).Set(float64(pendingCounts[captureID])) + ownerMaintainTableNumGauge. + WithLabelValues(cfID, info.AdvertiseAddr, maintainTableTypeTotal). + Set(float64(totalCounts[captureID])) + ownerMaintainTableNumGauge. + WithLabelValues(cfID, info.AdvertiseAddr, maintainTableTypeWip). + Set(float64(pendingCounts[captureID])) } } return } for changefeedID, changefeedState := range state.Changefeeds { - for captureID := range state.Captures { + for captureID, captureInfo := range state.Captures { taskStatus, exist := changefeedState.TaskStatuses[captureID] if !exist { continue } - ownerMaintainTableNumGauge.WithLabelValues(changefeedID, maintainTableTypeTotal).Set(float64(len(taskStatus.Tables))) - ownerMaintainTableNumGauge.WithLabelValues(changefeedID, maintainTableTypeWip).Set(float64(len(taskStatus.Operation))) + ownerMaintainTableNumGauge. + WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeTotal). + Set(float64(len(taskStatus.Tables))) + ownerMaintainTableNumGauge. + WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeWip). + Set(float64(len(taskStatus.Operation))) if changefeedState.Info != nil { changefeedStatusGauge.WithLabelValues(changefeedID).Set(float64(changefeedState.Info.State.ToInt())) } diff --git a/cdc/processor/pipeline/metrics.go b/cdc/processor/pipeline/metrics.go index 12775288cff..6a593278a5b 100644 --- a/cdc/processor/pipeline/metrics.go +++ b/cdc/processor/pipeline/metrics.go @@ -17,26 +17,16 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -var ( - txnCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "txn_count", - Help: "txn count received/executed by this processor", - }, []string{"type", "changefeed"}) - tableMemoryHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "table_memory_consumption", - Help: "estimated memory consumption for a table after the sorter", - Buckets: prometheus.ExponentialBuckets(1*1024*1024 /* mb */, 2, 10), - }, []string{"changefeed"}) -) +var tableMemoryHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "processor", + Name: "table_memory_consumption", + Help: "estimated memory consumption for a table after the sorter", + Buckets: prometheus.ExponentialBuckets(1*1024*1024 /* mb */, 2, 10), + }, []string{"changefeed"}) // InitMetrics registers all metrics used in processor func InitMetrics(registry *prometheus.Registry) { - registry.MustRegister(txnCounter) registry.MustRegister(tableMemoryHistogram) } diff --git a/cdc/sorter/leveldb/metrics.go b/cdc/sorter/leveldb/metrics.go index dfdcf815679..3fb0c3a3558 100644 --- a/cdc/sorter/leveldb/metrics.go +++ b/cdc/sorter/leveldb/metrics.go @@ -49,13 +49,6 @@ var ( Help: "Bucketed histogram of db sorter iterator read duration", Buckets: prometheus.ExponentialBuckets(0.004, 2.0, 20), }, []string{"id", "call"}) - - sorterCleanupKVCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "ticdc", - Subsystem: "sorter", - Name: "db_cleanup_kv_total", - Help: "The total number of cleaned up kv entries", - }, []string{"id"}) ) // InitMetrics registers all metrics in this file @@ -64,5 +57,4 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(sorterCompactDurationHistogram) registry.MustRegister(sorterWriteBytesHistogram) registry.MustRegister(sorterIterReadDurationHistogram) - registry.MustRegister(sorterCleanupKVCounter) } diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 076b2d04862..c21ae2e056a 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -1322,19 +1322,19 @@ ], "targets": [ { - "expr": "sum(ticdc_owner_maintain_table_num{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\",type=\"total\"}) by (instance)", + "expr": "sum(ticdc_owner_maintain_table_num{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\",type=\"total\"}) by (capture)", "format": "time_series", "interval": "", "intervalFactor": 1, - "legendFormat": "{{instance}}-total", + "legendFormat": "{{capture}}-total", "refId": "A" }, { - "expr": "sum(ticdc_owner_maintain_table_num{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\",type=\"wip\"}) by (instance)", + "expr": "sum(ticdc_owner_maintain_table_num{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\",type=\"wip\"}) by (capture)", "format": "time_series", "interval": "", "intervalFactor": 1, - "legendFormat": "{{instance}}-wip", + "legendFormat": "{{capture}}-wip", "refId": "B" } ],