Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

feature: remove metrics when stopping task #575

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion dm/worker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import (
"github.com/pingcap/dm/dumpling"
"github.com/pingcap/dm/loader"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/metricsproxy"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay"
"github.com/pingcap/dm/syncer"
)

var (
taskState = prometheus.NewGaugeVec(
taskState = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "worker",
Expand Down Expand Up @@ -87,3 +88,7 @@ func InitStatus(lis net.Listener) {
log.L().Error("fail to start status server return", log.ShortError(err))
}
}

func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string) {
taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task})
}
1 change: 1 addition & 0 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ func (st *SubTask) Result() *pb.ProcessResult {
// Close stops the sub task
func (st *SubTask) Close() {
st.l.Info("closing")
st.removeLabelValuesWithTaskInMetrics(st.cfg.Name)
if st.cancel == nil {
st.l.Info("not run yet, no need to close")
return
Expand Down
1 change: 1 addition & 0 deletions dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {

// Close implements Unit.Close
func (m *Dumpling) Close() {
m.removeLabelValuesWithTaskInMetrics(m.cfg.Name)
if m.closed.Get() {
return
}
Expand Down
8 changes: 7 additions & 1 deletion dumpling/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ package dumpling

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/dm/pkg/metricsproxy"
)

var (
// should alert
dumplingExitWithErrorCounter = prometheus.NewCounterVec(
dumplingExitWithErrorCounter = metricsproxy.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "dumpling",
Expand All @@ -32,3 +34,7 @@ var (
func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(dumplingExitWithErrorCounter)
}

func (m *Dumpling) removeLabelValuesWithTaskInMetrics(task string) {
dumplingExitWithErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
}
1 change: 1 addition & 0 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ func (l *Loader) loadFinishedSize() {
func (l *Loader) Close() {
l.Lock()
defer l.Unlock()
l.removeLabelValuesWithTaskInMetrics(l.cfg.Name)
if l.isClosed() {
return
}
Expand Down
29 changes: 21 additions & 8 deletions loader/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@ package loader

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/dm/pkg/metricsproxy"
)

var (
// should error
tidbExecutionErrorCounter = prometheus.NewCounterVec(
tidbExecutionErrorCounter = metricsproxy.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "tidb_execution_error",
Help: "Total count of tidb execution errors",
}, []string{"task"})

queryHistogram = prometheus.NewHistogramVec(
queryHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -36,7 +38,7 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16),
}, []string{"task"})

txnHistogram = prometheus.NewHistogramVec(
txnHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -45,31 +47,31 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16),
}, []string{"task"})

dataFileGauge = prometheus.NewGaugeVec(
dataFileGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "data_file_gauge",
Help: "data files in total",
}, []string{"task"})

tableGauge = prometheus.NewGaugeVec(
tableGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "table_gauge",
Help: "tables in total",
}, []string{"task"})

dataSizeGauge = prometheus.NewGaugeVec(
dataSizeGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "data_size_gauge",
Help: "data size in total",
}, []string{"task"})

progressGauge = prometheus.NewGaugeVec(
progressGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -78,7 +80,7 @@ var (
}, []string{"task"})

// should alert
loaderExitWithErrorCounter = prometheus.NewCounterVec(
loaderExitWithErrorCounter = metricsproxy.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -98,3 +100,14 @@ func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(progressGauge)
registry.MustRegister(loaderExitWithErrorCounter)
}

func (m *Loader) removeLabelValuesWithTaskInMetrics(task string) {
tidbExecutionErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
txnHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
queryHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
dataFileGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
tableGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
dataSizeGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
progressGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
loaderExitWithErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
}
8 changes: 7 additions & 1 deletion mydumper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ package mydumper

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/dm/pkg/metricsproxy"
)

var (
// should alert
mydumperExitWithErrorCounter = prometheus.NewCounterVec(
mydumperExitWithErrorCounter = metricsproxy.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "mydumper",
Expand All @@ -32,3 +34,7 @@ var (
func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(mydumperExitWithErrorCounter)
}

func (m *Mydumper) removeLabelValuesWithTaskInMetrics(task string) {
mydumperExitWithErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
}
1 change: 1 addition & 0 deletions mydumper/mydumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {

// Close implements Unit.Close
func (m *Mydumper) Close() {
m.removeLabelValuesWithTaskInMetrics(m.cfg.Name)
if m.closed.Get() {
return
}
Expand Down
67 changes: 67 additions & 0 deletions pkg/metricsproxy/counterVec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package metricsproxy
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/prometheus/client_golang/prometheus"
)

// CounterVecProxy to proxy prometheus.CounterVec
type CounterVecProxy struct {
LabelNames []string
Labels map[string]map[string]string
*prometheus.CounterVec
}

// NewCounterVec creates a new CounterVec based on the provided CounterOpts and
// partitioned by the given label names.
func NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *CounterVecProxy {
return &CounterVecProxy{
LabelNames: labelNames,
Labels: make(map[string]map[string]string, 0),
CounterVec: prometheus.NewCounterVec(opts, labelNames),
}
}

// WithLabelValues works as GetMetricWithLabelValues, but panics where
// GetMetricWithLabelValues would have returned an error. Not returning an
// error allows shortcuts like
// myVec.WithLabelValues("404", "GET").Add(42)
func (c *CounterVecProxy) WithLabelValues(lvs ...string) prometheus.Counter {
if len(lvs) > 0 {
labels := make(map[string]string, len(lvs))
for index, label := range lvs {
labels[c.LabelNames[index]] = label
}
noteLabels(c, labels)
}
return c.CounterVec.WithLabelValues(lvs...)
}

// With works as GetMetricWith, but panics where GetMetricWithLabels would have
// returned an error. Not returning an error allows shortcuts like
// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42)
func (c *CounterVecProxy) With(labels prometheus.Labels) prometheus.Counter {
if len(labels) > 0 {
noteLabels(c, labels)
}

return c.CounterVec.With(labels)
}

// DeleteAllAboutLabels Remove all labelsValue with these labels
func (c *CounterVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool {
if len(labels) == 0 {
return false
}

return findAndDeleteLabels(c, labels)
}

// GetLabels to support get CounterVecProxy's Labels when you use Proxy object
func (c *CounterVecProxy) GetLabels() map[string]map[string]string {
return c.Labels
}

// vecDelete to support delete CounterVecProxy's Labels when you use Proxy object
func (c *CounterVecProxy) vecDelete(labels prometheus.Labels) bool {
return c.CounterVec.Delete(labels)
}
41 changes: 41 additions & 0 deletions pkg/metricsproxy/counterVec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package metricsproxy

import (
"math/rand"

. "github.com/pingcap/check"

"github.com/prometheus/client_golang/prometheus"
)

func (t *testMetricsProxySuite) TestCounterVecProxy(c *C) {
for _, oneCase := range testCases {
counter := NewCounterVec(prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "metricsProxy",
Name: "Test_Counter",
Help: "dm counter metrics proxy test",
ConstLabels: nil,
}, oneCase.LabelsNames)
for _, aArgs := range oneCase.AddArgs {
if rand.Intn(199)%2 == 0 {
counter.WithLabelValues(aArgs...).Add(float64(rand.Intn(199)))
} else {
labels := make(prometheus.Labels, 0)
for k, labelName := range oneCase.LabelsNames {
labels[labelName] = aArgs[k]
}
counter.With(labels)
}
}
for _, dArgs := range oneCase.DeleteArgs {
counter.DeleteAllAboutLabels(dArgs)
}

cOutput := make(chan prometheus.Metric, len(oneCase.AddArgs)*3)

counter.Collect(cOutput)

c.Assert(len(cOutput), Equals, oneCase.WantResLength)
}
}
67 changes: 67 additions & 0 deletions pkg/metricsproxy/gaugeVec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package metricsproxy

import (
"github.com/prometheus/client_golang/prometheus"
)

// GaugeVecProxy to proxy prometheus.GaugeVec
type GaugeVecProxy struct {
LabelNames []string
Labels map[string]map[string]string
*prometheus.GaugeVec
}

// NewGaugeVec creates a new GaugeVec based on the provided GaugeOpts and
// partitioned by the given label names.
func NewGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *GaugeVecProxy {
return &GaugeVecProxy{
LabelNames: labelNames,
Labels: make(map[string]map[string]string, 0),
GaugeVec: prometheus.NewGaugeVec(opts, labelNames),
}
}

// WithLabelValues works as GetMetricWithLabelValues, but panics where
// GetMetricWithLabelValues would have returned an error. Not returning an
// error allows shortcuts like
// myVec.WithLabelValues("404", "GET").Add(42)
func (c *GaugeVecProxy) WithLabelValues(lvs ...string) prometheus.Gauge {
if len(lvs) > 0 {
labels := make(map[string]string, len(lvs))
for index, label := range lvs {
labels[c.LabelNames[index]] = label
}
noteLabels(c, labels)
}
return c.GaugeVec.WithLabelValues(lvs...)
}

// With works as GetMetricWith, but panics where GetMetricWithLabels would have
// returned an error. Not returning an error allows shortcuts like
// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42)
func (c *GaugeVecProxy) With(labels prometheus.Labels) prometheus.Gauge {
if len(labels) > 0 {
noteLabels(c, labels)
}

return c.GaugeVec.With(labels)
}

// DeleteAllAboutLabels Remove all labelsValue with these labels
func (c *GaugeVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool {
if len(labels) == 0 {
return false
}

return findAndDeleteLabels(c, labels)
}

// GetLabels to support get GaugeVecProxy's Labels when you use Proxy object
func (c *GaugeVecProxy) GetLabels() map[string]map[string]string {
return c.Labels
}

// vecDelete to support delete GaugeVecProxy's Labels when you use Proxy object
func (c *GaugeVecProxy) vecDelete(labels prometheus.Labels) bool {
return c.GaugeVec.Delete(labels)
}
Loading