Skip to content

Commit

Permalink
owner(ticdc): fix prometheus panic (pingcap#4759)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus authored Mar 7, 2022
1 parent f113d36 commit ddd140a
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 38 deletions.
2 changes: 1 addition & 1 deletion cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
Subsystem: "owner",
Name: "maintain_table_num",
Help: "number of replicated tables maintained in owner",
}, []string{"changefeed", "type"})
}, []string{"changefeed", "capture", "type"})
changefeedStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand Down
20 changes: 13 additions & 7 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,23 +352,29 @@ func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) {
pendingCounts := infoProvider.GetPendingTableCounts()

for captureID, info := range o.captures {
ownerMaintainTableNumGauge.WithLabelValues(
cfID, info.AdvertiseAddr, maintainTableTypeTotal).Set(float64(totalCounts[captureID]))
ownerMaintainTableNumGauge.WithLabelValues(
cfID, info.AdvertiseAddr, maintainTableTypeWip).Set(float64(pendingCounts[captureID]))
ownerMaintainTableNumGauge.
WithLabelValues(cfID, info.AdvertiseAddr, maintainTableTypeTotal).
Set(float64(totalCounts[captureID]))
ownerMaintainTableNumGauge.
WithLabelValues(cfID, info.AdvertiseAddr, maintainTableTypeWip).
Set(float64(pendingCounts[captureID]))
}
}
return
}

for changefeedID, changefeedState := range state.Changefeeds {
for captureID := range state.Captures {
for captureID, captureInfo := range state.Captures {
taskStatus, exist := changefeedState.TaskStatuses[captureID]
if !exist {
continue
}
ownerMaintainTableNumGauge.WithLabelValues(changefeedID, maintainTableTypeTotal).Set(float64(len(taskStatus.Tables)))
ownerMaintainTableNumGauge.WithLabelValues(changefeedID, maintainTableTypeWip).Set(float64(len(taskStatus.Operation)))
ownerMaintainTableNumGauge.
WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).
Set(float64(len(taskStatus.Tables)))
ownerMaintainTableNumGauge.
WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeWip).
Set(float64(len(taskStatus.Operation)))
if changefeedState.Info != nil {
changefeedStatusGauge.WithLabelValues(changefeedID).Set(float64(changefeedState.Info.State.ToInt()))
}
Expand Down
26 changes: 8 additions & 18 deletions cdc/processor/pipeline/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,16 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var (
txnCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "txn_count",
Help: "txn count received/executed by this processor",
}, []string{"type", "changefeed"})
tableMemoryHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "table_memory_consumption",
Help: "estimated memory consumption for a table after the sorter",
Buckets: prometheus.ExponentialBuckets(1*1024*1024 /* mb */, 2, 10),
}, []string{"changefeed"})
)
var tableMemoryHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "table_memory_consumption",
Help: "estimated memory consumption for a table after the sorter",
Buckets: prometheus.ExponentialBuckets(1*1024*1024 /* mb */, 2, 10),
}, []string{"changefeed"})

// InitMetrics registers all metrics used in processor
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(txnCounter)
registry.MustRegister(tableMemoryHistogram)
}
8 changes: 0 additions & 8 deletions cdc/sorter/leveldb/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ var (
Help: "Bucketed histogram of db sorter iterator read duration",
Buckets: prometheus.ExponentialBuckets(0.004, 2.0, 20),
}, []string{"id", "call"})

sorterCleanupKVCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "db_cleanup_kv_total",
Help: "The total number of cleaned up kv entries",
}, []string{"id"})
)

// InitMetrics registers all metrics in this file
Expand All @@ -64,5 +57,4 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(sorterCompactDurationHistogram)
registry.MustRegister(sorterWriteBytesHistogram)
registry.MustRegister(sorterIterReadDurationHistogram)
registry.MustRegister(sorterCleanupKVCounter)
}
8 changes: 4 additions & 4 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -1322,19 +1322,19 @@
],
"targets": [
{
"expr": "sum(ticdc_owner_maintain_table_num{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\",type=\"total\"}) by (instance)",
"expr": "sum(ticdc_owner_maintain_table_num{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\",type=\"total\"}) by (capture)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{instance}}-total",
"legendFormat": "{{capture}}-total",
"refId": "A"
},
{
"expr": "sum(ticdc_owner_maintain_table_num{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\",type=\"wip\"}) by (instance)",
"expr": "sum(ticdc_owner_maintain_table_num{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\",type=\"wip\"}) by (capture)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{instance}}-wip",
"legendFormat": "{{capture}}-wip",
"refId": "B"
}
],
Expand Down

0 comments on commit ddd140a

Please sign in to comment.