From a25ad08aa9bcc3c255f0607c41d113d1bbabd684 Mon Sep 17 00:00:00 2001 From: Amatist_Kurisu Date: Wed, 8 Apr 2020 10:41:44 +0800 Subject: [PATCH 1/3] feature: remove metrics when stopping task (#575) --- dm/worker/metrics.go | 9 +- dm/worker/subtask.go | 1 + loader/loader.go | 1 + loader/metrics.go | 32 ++++-- mydumper/metrics.go | 8 +- mydumper/mydumper.go | 2 + pkg/conn/baseconn.go | 6 +- pkg/conn/baseconn_test.go | 3 +- pkg/metricsproxy/countervec.go | 80 +++++++++++++++ pkg/metricsproxy/countervec_test.go | 56 +++++++++++ pkg/metricsproxy/gaugevec.go | 80 +++++++++++++++ pkg/metricsproxy/gaugevec_test.go | 56 +++++++++++ pkg/metricsproxy/histogramvec.go | 80 +++++++++++++++ pkg/metricsproxy/histogramvec_test.go | 56 +++++++++++ pkg/metricsproxy/proxy.go | 70 +++++++++++++ pkg/metricsproxy/proxy_test.go | 140 ++++++++++++++++++++++++++ pkg/metricsproxy/summaryvec.go | 84 ++++++++++++++++ pkg/metricsproxy/summaryvec_test.go | 56 +++++++++++ relay/metrics.go | 9 +- syncer/metrics.go | 67 ++++++++---- syncer/syncer.go | 2 + 21 files changed, 857 insertions(+), 41 deletions(-) create mode 100644 pkg/metricsproxy/countervec.go create mode 100644 pkg/metricsproxy/countervec_test.go create mode 100644 pkg/metricsproxy/gaugevec.go create mode 100644 pkg/metricsproxy/gaugevec_test.go create mode 100644 pkg/metricsproxy/histogramvec.go create mode 100644 pkg/metricsproxy/histogramvec_test.go create mode 100644 pkg/metricsproxy/proxy.go create mode 100644 pkg/metricsproxy/proxy_test.go create mode 100644 pkg/metricsproxy/summaryvec.go create mode 100644 pkg/metricsproxy/summaryvec_test.go diff --git a/dm/worker/metrics.go b/dm/worker/metrics.go index dee9613995..532e9f2ea6 100644 --- a/dm/worker/metrics.go +++ b/dm/worker/metrics.go @@ -18,20 +18,21 @@ import ( "net/http" "net/http/pprof" - "github.com/pingcap/dm/pkg/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/pingcap/dm/dm/common" "github.com/pingcap/dm/loader" "github.com/pingcap/dm/mydumper" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/metricsproxy" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/relay" "github.com/pingcap/dm/syncer" ) var ( - taskState = prometheus.NewGaugeVec( + taskState = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "worker", @@ -87,3 +88,7 @@ func InitStatus(lis net.Listener) { log.L().Error("fail to start status server return", log.ShortError(err)) } } + +func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string) { + taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task}) +} diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 32a8e774dd..66d24ce84c 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -389,6 +389,7 @@ func (st *SubTask) Close() { st.cancel() st.closeUnits() // close all un-closed units st.setStageIfNot(pb.Stage_Finished, pb.Stage_Stopped) + st.removeLabelValuesWithTaskInMetrics(st.cfg.Name) st.wg.Wait() } diff --git a/loader/loader.go b/loader/loader.go index 885b3e5ca8..b079b70f34 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -621,6 +621,7 @@ func (l *Loader) Close() { l.logCtx.L().Error("close downstream DB error", log.ShortError(err)) } l.checkPoint.Close() + l.removeLabelValuesWithTaskInMetrics(l.cfg.Name) l.closed.Set(true) } diff --git a/loader/metrics.go b/loader/metrics.go index 959e6fd5d8..b1581472c9 100644 --- a/loader/metrics.go +++ b/loader/metrics.go @@ -15,11 +15,13 @@ package loader import ( "github.com/prometheus/client_golang/prometheus" + + "github.com/pingcap/dm/pkg/metricsproxy" ) var ( // should error - tidbExecutionErrorCounter = prometheus.NewCounterVec( + tidbExecutionErrorCounter = metricsproxy.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", Subsystem: "loader", @@ -27,7 +29,7 @@ var ( Help: "Total count of tidb execution errors", }, []string{"task"}) - queryHistogram = prometheus.NewHistogramVec( + queryHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "loader", @@ -36,7 +38,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) - txnHistogram = prometheus.NewHistogramVec( + txnHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "loader", @@ -45,7 +47,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) - stmtHistogram = prometheus.NewHistogramVec( + stmtHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "loader", @@ -54,7 +56,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"type", "task"}) - dataFileGauge = prometheus.NewGaugeVec( + dataFileGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "loader", @@ -62,7 +64,7 @@ var ( Help: "data files in total", }, []string{"task"}) - tableGauge = prometheus.NewGaugeVec( + tableGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "loader", @@ -70,7 +72,7 @@ var ( Help: "tables in total", }, []string{"task"}) - dataSizeGauge = prometheus.NewGaugeVec( + dataSizeGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "loader", @@ -78,7 +80,7 @@ var ( Help: "data size in total", }, []string{"task"}) - progressGauge = prometheus.NewGaugeVec( + progressGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "loader", @@ -87,7 +89,7 @@ var ( }, []string{"task"}) // should alert - loaderExitWithErrorCounter = prometheus.NewCounterVec( + loaderExitWithErrorCounter = metricsproxy.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", Subsystem: "loader", @@ -108,3 +110,15 @@ func RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(progressGauge) registry.MustRegister(loaderExitWithErrorCounter) } + +func (m *Loader) removeLabelValuesWithTaskInMetrics(task string) { + tidbExecutionErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + txnHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + stmtHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + queryHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + dataFileGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + tableGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + dataSizeGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + progressGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + loaderExitWithErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task}) +} diff --git a/mydumper/metrics.go b/mydumper/metrics.go index 8335777219..a313765482 100644 --- a/mydumper/metrics.go +++ b/mydumper/metrics.go @@ -15,11 +15,13 @@ package mydumper import ( "github.com/prometheus/client_golang/prometheus" + + "github.com/pingcap/dm/pkg/metricsproxy" ) var ( // should alert - mydumperExitWithErrorCounter = prometheus.NewCounterVec( + mydumperExitWithErrorCounter = metricsproxy.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", Subsystem: "mydumper", @@ -32,3 +34,7 @@ var ( func RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(mydumperExitWithErrorCounter) } + +func (m *Mydumper) removeLabelValuesWithTaskInMetrics(task string) { + mydumperExitWithErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task}) +} diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index 2144062c77..91c34869df 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -183,6 +183,8 @@ func (m *Mydumper) Close() { if m.closed.Get() { return } + + m.removeLabelValuesWithTaskInMetrics(m.cfg.Name) // do nothing, external will cancel the command (if running) m.closed.Set(true) } diff --git a/pkg/conn/baseconn.go b/pkg/conn/baseconn.go index 901933d108..aded867d47 100644 --- a/pkg/conn/baseconn.go +++ b/pkg/conn/baseconn.go @@ -22,12 +22,12 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/failpoint" - "github.com/prometheus/client_golang/prometheus" gmysql "github.com/siddontang/go-mysql/mysql" "go.uber.org/zap" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/metricsproxy" "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" @@ -116,7 +116,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int // return // 1. failed: (the index of sqls executed error, error) // 2. succeed: (len(sqls), nil) -func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) { +func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *metricsproxy.HistogramVecProxy, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) { // inject an error to trigger retry, this should be placed before the real execution of the SQL statement. failpoint.Inject("retryableError", func(val failpoint.Value) { if mark, ok := val.(string); ok { @@ -206,7 +206,7 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *pr // return // 1. failed: (the index of sqls executed error, error) // 2. succeed: (len(sqls), nil) -func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, queries []string, args ...[]interface{}) (int, error) { +func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *metricsproxy.HistogramVecProxy, task string, queries []string, args ...[]interface{}) (int, error) { return conn.ExecuteSQLWithIgnoreError(tctx, hVec, task, nil, queries, args...) } diff --git a/pkg/conn/baseconn_test.go b/pkg/conn/baseconn_test.go index 1f2a6407ff..f7e622e2be 100644 --- a/pkg/conn/baseconn_test.go +++ b/pkg/conn/baseconn_test.go @@ -18,6 +18,7 @@ import ( "testing" tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/metricsproxy" "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" @@ -36,7 +37,7 @@ type testBaseConnSuite struct { } var ( - testStmtHistogram = prometheus.NewHistogramVec( + testStmtHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "conn", diff --git a/pkg/metricsproxy/countervec.go b/pkg/metricsproxy/countervec.go new file mode 100644 index 0000000000..87ea599c72 --- /dev/null +++ b/pkg/metricsproxy/countervec.go @@ -0,0 +1,80 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsproxy + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// CounterVecProxy to proxy prometheus.CounterVec +type CounterVecProxy struct { + LabelNames []string + Labels map[string]map[string]string + *prometheus.CounterVec +} + +// NewCounterVec creates a new CounterVec based on the provided CounterOpts and +// partitioned by the given label names. +func NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *CounterVecProxy { + return &CounterVecProxy{ + LabelNames: labelNames, + Labels: make(map[string]map[string]string, 0), + CounterVec: prometheus.NewCounterVec(opts, labelNames), + } +} + +// WithLabelValues works as GetMetricWithLabelValues, but panics where +// GetMetricWithLabelValues would have returned an error. Not returning an +// error allows shortcuts like +// myVec.WithLabelValues("404", "GET").Add(42) +func (c *CounterVecProxy) WithLabelValues(lvs ...string) prometheus.Counter { + if len(lvs) > 0 { + labels := make(map[string]string, len(lvs)) + for index, label := range lvs { + labels[c.LabelNames[index]] = label + } + noteLabelsInMetricsProxy(c, labels) + } + return c.CounterVec.WithLabelValues(lvs...) +} + +// With works as GetMetricWith, but panics where GetMetricWithLabels would have +// returned an error. Not returning an error allows shortcuts like +// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42) +func (c *CounterVecProxy) With(labels prometheus.Labels) prometheus.Counter { + if len(labels) > 0 { + noteLabelsInMetricsProxy(c, labels) + } + + return c.CounterVec.With(labels) +} + +// DeleteAllAboutLabels Remove all labelsValue with these labels +func (c *CounterVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool { + if len(labels) == 0 { + return false + } + + return findAndDeleteLabelsInMetricsProxy(c, labels) +} + +// GetLabels to support get CounterVecProxy's Labels when you use Proxy object +func (c *CounterVecProxy) GetLabels() map[string]map[string]string { + return c.Labels +} + +// vecDelete to support delete CounterVecProxy's Labels when you use Proxy object +func (c *CounterVecProxy) vecDelete(labels prometheus.Labels) bool { + return c.CounterVec.Delete(labels) +} diff --git a/pkg/metricsproxy/countervec_test.go b/pkg/metricsproxy/countervec_test.go new file mode 100644 index 0000000000..e54feb2f18 --- /dev/null +++ b/pkg/metricsproxy/countervec_test.go @@ -0,0 +1,56 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsproxy + +import ( + "math/rand" + "time" + + . "github.com/pingcap/check" + + "github.com/prometheus/client_golang/prometheus" +) + +func (t *testMetricsProxySuite) TestCounterVecProxy(c *C) { + rand.Seed(time.Now().UnixNano()) + for _, oneCase := range testCases { + counter := NewCounterVec(prometheus.CounterOpts{ + Namespace: "dm", + Subsystem: "metricsProxy", + Name: "Test_Counter", + Help: "dm counter metrics proxy test", + ConstLabels: nil, + }, oneCase.LabelsNames) + for _, aArgs := range oneCase.AddArgs { + if rand.Intn(199)%2 == 0 { + counter.WithLabelValues(aArgs...).Add(float64(rand.Intn(199))) + } else { + labels := make(prometheus.Labels, 0) + for k, labelName := range oneCase.LabelsNames { + labels[labelName] = aArgs[k] + } + counter.With(labels) + } + } + for _, dArgs := range oneCase.DeleteArgs { + counter.DeleteAllAboutLabels(dArgs) + } + + cOutput := make(chan prometheus.Metric, len(oneCase.AddArgs)*3) + + counter.Collect(cOutput) + + c.Assert(len(cOutput), Equals, oneCase.WantResLength) + } +} diff --git a/pkg/metricsproxy/gaugevec.go b/pkg/metricsproxy/gaugevec.go new file mode 100644 index 0000000000..35fe7829ca --- /dev/null +++ b/pkg/metricsproxy/gaugevec.go @@ -0,0 +1,80 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsproxy + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// GaugeVecProxy to proxy prometheus.GaugeVec +type GaugeVecProxy struct { + LabelNames []string + Labels map[string]map[string]string + *prometheus.GaugeVec +} + +// NewGaugeVec creates a new GaugeVec based on the provided GaugeOpts and +// partitioned by the given label names. +func NewGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *GaugeVecProxy { + return &GaugeVecProxy{ + LabelNames: labelNames, + Labels: make(map[string]map[string]string, 0), + GaugeVec: prometheus.NewGaugeVec(opts, labelNames), + } +} + +// WithLabelValues works as GetMetricWithLabelValues, but panics where +// GetMetricWithLabelValues would have returned an error. Not returning an +// error allows shortcuts like +// myVec.WithLabelValues("404", "GET").Add(42) +func (c *GaugeVecProxy) WithLabelValues(lvs ...string) prometheus.Gauge { + if len(lvs) > 0 { + labels := make(map[string]string, len(lvs)) + for index, label := range lvs { + labels[c.LabelNames[index]] = label + } + noteLabelsInMetricsProxy(c, labels) + } + return c.GaugeVec.WithLabelValues(lvs...) +} + +// With works as GetMetricWith, but panics where GetMetricWithLabels would have +// returned an error. Not returning an error allows shortcuts like +// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42) +func (c *GaugeVecProxy) With(labels prometheus.Labels) prometheus.Gauge { + if len(labels) > 0 { + noteLabelsInMetricsProxy(c, labels) + } + + return c.GaugeVec.With(labels) +} + +// DeleteAllAboutLabels Remove all labelsValue with these labels +func (c *GaugeVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool { + if len(labels) == 0 { + return false + } + + return findAndDeleteLabelsInMetricsProxy(c, labels) +} + +// GetLabels to support get GaugeVecProxy's Labels when you use Proxy object +func (c *GaugeVecProxy) GetLabels() map[string]map[string]string { + return c.Labels +} + +// vecDelete to support delete GaugeVecProxy's Labels when you use Proxy object +func (c *GaugeVecProxy) vecDelete(labels prometheus.Labels) bool { + return c.GaugeVec.Delete(labels) +} diff --git a/pkg/metricsproxy/gaugevec_test.go b/pkg/metricsproxy/gaugevec_test.go new file mode 100644 index 0000000000..cc68a4c6e9 --- /dev/null +++ b/pkg/metricsproxy/gaugevec_test.go @@ -0,0 +1,56 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsproxy + +import ( + "math/rand" + "time" + + . "github.com/pingcap/check" + + "github.com/prometheus/client_golang/prometheus" +) + +func (t *testMetricsProxySuite) TestGaugeVecProxy(c *C) { + rand.Seed(time.Now().UnixNano()) + for _, oneCase := range testCases { + gauge := NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "dm", + Subsystem: "metricsProxy", + Name: "Test_Gauge", + Help: "dm gauge metrics proxy test", + ConstLabels: nil, + }, oneCase.LabelsNames) + for _, aArgs := range oneCase.AddArgs { + if rand.Intn(199)%2 == 0 { + gauge.WithLabelValues(aArgs...).Add(float64(rand.Intn(199))) + } else { + labels := make(prometheus.Labels, 0) + for k, labelName := range oneCase.LabelsNames { + labels[labelName] = aArgs[k] + } + gauge.With(labels) + } + } + for _, dArgs := range oneCase.DeleteArgs { + gauge.DeleteAllAboutLabels(dArgs) + } + + cOutput := make(chan prometheus.Metric, len(oneCase.AddArgs)*3) + + gauge.Collect(cOutput) + + c.Assert(len(cOutput), Equals, oneCase.WantResLength) + } +} diff --git a/pkg/metricsproxy/histogramvec.go b/pkg/metricsproxy/histogramvec.go new file mode 100644 index 0000000000..1b5c8f074d --- /dev/null +++ b/pkg/metricsproxy/histogramvec.go @@ -0,0 +1,80 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsproxy + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// HistogramVecProxy to proxy prometheus.HistogramVec +type HistogramVecProxy struct { + LabelNames []string + Labels map[string]map[string]string + *prometheus.HistogramVec +} + +// NewHistogramVec creates a new HistogramVec based on the provided HistogramOpts and +// partitioned by the given label names. +func NewHistogramVec(opts prometheus.HistogramOpts, labelNames []string) *HistogramVecProxy { + return &HistogramVecProxy{ + LabelNames: labelNames, + Labels: make(map[string]map[string]string, 0), + HistogramVec: prometheus.NewHistogramVec(opts, labelNames), + } +} + +// WithLabelValues works as GetMetricWithLabelValues, but panics where +// GetMetricWithLabelValues would have returned an error. Not returning an +// error allows shortcuts like +// myVec.WithLabelValues("404", "GET").Observe(42.21) +func (c *HistogramVecProxy) WithLabelValues(lvs ...string) prometheus.Observer { + if len(lvs) > 0 { + labels := make(map[string]string, len(lvs)) + for index, label := range lvs { + labels[c.LabelNames[index]] = label + } + noteLabelsInMetricsProxy(c, labels) + } + return c.HistogramVec.WithLabelValues(lvs...) +} + +// With works as GetMetricWith but panics where GetMetricWithLabels would have +// returned an error. Not returning an error allows shortcuts like +// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Observe(42.21) +func (c *HistogramVecProxy) With(labels prometheus.Labels) prometheus.Observer { + if len(labels) > 0 { + noteLabelsInMetricsProxy(c, labels) + } + + return c.HistogramVec.With(labels) +} + +// DeleteAllAboutLabels Remove all labelsValue with these labels +func (c *HistogramVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool { + if len(labels) == 0 { + return false + } + + return findAndDeleteLabelsInMetricsProxy(c, labels) +} + +// GetLabels to support get HistogramVecProxy's Labels when you use Proxy object +func (c *HistogramVecProxy) GetLabels() map[string]map[string]string { + return c.Labels +} + +// vecDelete to support delete HistogramVecProxy's Labels when you use Proxy object +func (c *HistogramVecProxy) vecDelete(labels prometheus.Labels) bool { + return c.HistogramVec.Delete(labels) +} diff --git a/pkg/metricsproxy/histogramvec_test.go b/pkg/metricsproxy/histogramvec_test.go new file mode 100644 index 0000000000..e84f7376de --- /dev/null +++ b/pkg/metricsproxy/histogramvec_test.go @@ -0,0 +1,56 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsproxy + +import ( + "math/rand" + "time" + + . "github.com/pingcap/check" + + "github.com/prometheus/client_golang/prometheus" +) + +func (t *testMetricsProxySuite) TestHistogramVecProxy(c *C) { + rand.Seed(time.Now().UnixNano()) + for _, oneCase := range testCases { + histogram := NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "metricsProxy", + Name: "Test_Histogram", + Help: "dm histogram metrics proxy test", + ConstLabels: nil, + }, oneCase.LabelsNames) + for _, aArgs := range oneCase.AddArgs { + if rand.Intn(199)%2 == 0 { + histogram.WithLabelValues(aArgs...).Observe(float64(rand.Intn(199))) + } else { + labels := make(prometheus.Labels, 0) + for k, labelName := range oneCase.LabelsNames { + labels[labelName] = aArgs[k] + } + histogram.With(labels) + } + } + for _, dArgs := range oneCase.DeleteArgs { + histogram.DeleteAllAboutLabels(dArgs) + } + + cOutput := make(chan prometheus.Metric, len(oneCase.AddArgs)*3) + + histogram.Collect(cOutput) + + c.Assert(len(cOutput), Equals, oneCase.WantResLength) + } +} diff --git a/pkg/metricsproxy/proxy.go b/pkg/metricsproxy/proxy.go new file mode 100644 index 0000000000..133eccef7b --- /dev/null +++ b/pkg/metricsproxy/proxy.go @@ -0,0 +1,70 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsproxy + +import ( + "crypto/md5" + "fmt" + + "github.com/prometheus/client_golang/prometheus" +) + +// Proxy Interface +type Proxy interface { + GetLabels() map[string]map[string]string + vecDelete(prometheus.Labels) bool +} + +// noteLabelsInMetricsProxy common function in Proxy +func noteLabelsInMetricsProxy(proxy Proxy, labels map[string]string) { + labelsMd5Sum := labelsMd5Sum(labels) + + if _, ok := proxy.GetLabels()[labelsMd5Sum]; !ok { + proxy.GetLabels()[labelsMd5Sum] = labels + } +} + +// labelsMd5Sum common function in Proxy +func labelsMd5Sum(labels map[string]string) string { + var str string + for _, label := range labels { + str += label + } + return fmt.Sprintf("%x", md5.Sum([]byte(str))) +} + +// findAndDeleteLabelsInMetricsProxy common function in Proxy +func findAndDeleteLabelsInMetricsProxy(proxy Proxy, labels prometheus.Labels) bool { + var ( + deleteLabelsList = make([]map[string]string, 0) + res = true + ) + inputLabelsLen := len(labels) + for _, ls := range proxy.GetLabels() { + t := 0 + for k := range labels { + if ls[k] == labels[k] { + t++ + } + } + if t == inputLabelsLen { + deleteLabelsList = append(deleteLabelsList, ls) + } + } + + for _, deleteLabels := range deleteLabelsList { + res = proxy.vecDelete(deleteLabels) && res + } + return res +} diff --git a/pkg/metricsproxy/proxy_test.go b/pkg/metricsproxy/proxy_test.go new file mode 100644 index 0000000000..8381b6bc90 --- /dev/null +++ b/pkg/metricsproxy/proxy_test.go @@ -0,0 +1,140 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsproxy + +import ( + "testing" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testMetricsProxySuite{}) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testMetricsProxySuite struct { +} + +type testCase struct { + LabelsNames []string + AddArgs [][]string + DeleteArgs []map[string]string + WantResLength int +} + +var testCases = []testCase{ + { + LabelsNames: []string{ + "task", + "name", + }, + AddArgs: [][]string{ + {"task1", "name1"}, + {"task2", "name2"}, + {"task3", "name3"}, + {"task4", "name4"}, + {"task5", "name5"}, + {"task6", "name6"}, + {"task7", "name7"}, + {"task8", "name7"}, + {"task8", "name9"}, + {"task10", "name10"}, + }, + DeleteArgs: []map[string]string{ + {"task": "task1", "name": "name1"}, + {"task": "task1", "name": "name1"}, + {"task": "task2"}, + {"task": "task8"}, + {"name": "name10"}, + }, + WantResLength: 5, + }, + { + LabelsNames: []string{ + "task", + }, + AddArgs: [][]string{ + {"task1"}, + {"task2"}, + {"task3"}, + {"task4"}, + {"task5"}, + {"task6"}, + {"task7"}, + }, + DeleteArgs: []map[string]string{ + {"task": "task2"}, + {"task": "task8"}, + }, + WantResLength: 6, + }, + { + LabelsNames: []string{ + "type", + "task", + "queueNo", + }, + AddArgs: [][]string{ + {"flash", "task2", "No.2"}, + {"flash", "task3", "No.3"}, + {"flash", "task4", "No.4"}, + {"flash", "task5", "No.5"}, + {"flash", "task6", "No.6"}, + }, + DeleteArgs: []map[string]string{ + {"type": "flash"}, + }, + WantResLength: 0, + }, + { + LabelsNames: []string{ + "type", + "task", + "queueNo", + }, + AddArgs: [][]string{ + {"flash", "task2", "No.2"}, + {"flash", "task2", "No.3"}, + {"flash", "task4", "No.4"}, + {"flash", "task5", "No.4"}, + {"start", "task6", "No.6"}, + }, + DeleteArgs: []map[string]string{ + {"type": "start"}, + {"type": "flash", "task": "task2", "queueNo": "No.3"}, + {"type": "start", "task": "task2", "queueNo": "No.4"}, + }, + WantResLength: 3, + }, + { + LabelsNames: []string{ + "type", + "task", + "queueNo", + }, + AddArgs: [][]string{ + {"flash", "task2", "No.2"}, + {"flash", "task2", "No.3"}, + {"flash", "task4", "No.4"}, + {"flash", "task5", "No.4"}, + {"start", "task6", "No.6"}, + }, + DeleteArgs: []map[string]string{ + {}, + }, + WantResLength: 5, + }, +} diff --git a/pkg/metricsproxy/summaryvec.go b/pkg/metricsproxy/summaryvec.go new file mode 100644 index 0000000000..5a919bbbc2 --- /dev/null +++ b/pkg/metricsproxy/summaryvec.go @@ -0,0 +1,84 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsproxy + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// SummaryVecProxy to proxy prometheus.SummaryVec +type SummaryVecProxy struct { + LabelNames []string + Labels map[string]map[string]string + *prometheus.SummaryVec +} + +// NewSummaryVec creates a new SummaryVec based on the provided SummaryOpts and +// partitioned by the given label names. +// +// Due to the way a Summary is represented in the Prometheus text format and how +// it is handled by the Prometheus server internally, “quantile” is an illegal +// label name. NewSummaryVec will panic if this label name is used. +func NewSummaryVec(opts prometheus.SummaryOpts, labelNames []string) *SummaryVecProxy { + return &SummaryVecProxy{ + LabelNames: labelNames, + Labels: make(map[string]map[string]string, 0), + SummaryVec: prometheus.NewSummaryVec(opts, labelNames), + } +} + +// WithLabelValues works as GetMetricWithLabelValues, but panics where +// GetMetricWithLabelValues would have returned an error. Not returning an +// error allows shortcuts like +// myVec.WithLabelValues("404", "GET").Observe(42.21) +func (c *SummaryVecProxy) WithLabelValues(lvs ...string) prometheus.Observer { + if len(lvs) > 0 { + labels := make(map[string]string, len(lvs)) + for index, label := range lvs { + labels[c.LabelNames[index]] = label + } + noteLabelsInMetricsProxy(c, labels) + } + return c.SummaryVec.WithLabelValues(lvs...) +} + +// With works as GetMetricWith, but panics where GetMetricWithLabels would have +// returned an error. Not returning an error allows shortcuts like +// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Observe(42.21) +func (c *SummaryVecProxy) With(labels prometheus.Labels) prometheus.Observer { + if len(labels) > 0 { + noteLabelsInMetricsProxy(c, labels) + } + + return c.SummaryVec.With(labels) +} + +// DeleteAllAboutLabels Remove all labelsValue with these labels +func (c *SummaryVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool { + if len(labels) == 0 { + return false + } + + return findAndDeleteLabelsInMetricsProxy(c, labels) +} + +// GetLabels to support get SummaryVecProxy's Labels when you use Proxy object +func (c *SummaryVecProxy) GetLabels() map[string]map[string]string { + return c.Labels +} + +// vecDelete to support delete SummaryVecProxy's Labels when you use Proxy object +func (c *SummaryVecProxy) vecDelete(labels prometheus.Labels) bool { + return c.SummaryVec.Delete(labels) +} diff --git a/pkg/metricsproxy/summaryvec_test.go b/pkg/metricsproxy/summaryvec_test.go new file mode 100644 index 0000000000..7899a0b9dd --- /dev/null +++ b/pkg/metricsproxy/summaryvec_test.go @@ -0,0 +1,56 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsproxy + +import ( + "math/rand" + "time" + + . "github.com/pingcap/check" + + "github.com/prometheus/client_golang/prometheus" +) + +func (t *testMetricsProxySuite) TestSummaryVecProxy(c *C) { + rand.Seed(time.Now().UnixNano()) + for _, oneCase := range testCases { + summary := NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "dm", + Subsystem: "metricsProxy", + Name: "Test_Summary", + Help: "dm summary metrics proxy test", + ConstLabels: nil, + }, oneCase.LabelsNames) + for _, aArgs := range oneCase.AddArgs { + if rand.Intn(199)%2 == 0 { + summary.WithLabelValues(aArgs...).Observe(float64(rand.Intn(199))) + } else { + labels := make(prometheus.Labels, 0) + for k, labelName := range oneCase.LabelsNames { + labels[labelName] = aArgs[k] + } + summary.With(labels) + } + } + for _, dArgs := range oneCase.DeleteArgs { + summary.DeleteAllAboutLabels(dArgs) + } + + cOutput := make(chan prometheus.Metric, len(oneCase.AddArgs)*3) + + summary.Collect(cOutput) + + c.Assert(len(cOutput), Equals, oneCase.WantResLength) + } +} diff --git a/relay/metrics.go b/relay/metrics.go index 72d58ced54..f58be5d27a 100644 --- a/relay/metrics.go +++ b/relay/metrics.go @@ -19,12 +19,13 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/metricsproxy" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" ) var ( - relayLogPosGauge = prometheus.NewGaugeVec( + relayLogPosGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "relay", @@ -32,7 +33,7 @@ var ( Help: "current binlog pos in current binlog file", }, []string{"node"}) - relayLogFileGauge = prometheus.NewGaugeVec( + relayLogFileGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "relay", @@ -42,7 +43,7 @@ var ( // split sub directory info from relayLogPosGauge / relayLogFileGauge // to make compare relayLogFileGauge for master / relay more easier - relaySubDirIndex = prometheus.NewGaugeVec( + relaySubDirIndex = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "relay", @@ -51,7 +52,7 @@ var ( }, []string{"node", "uuid"}) // should alert if available space < 10G - relayLogSpaceGauge = prometheus.NewGaugeVec( + relayLogSpaceGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "relay", diff --git a/syncer/metrics.go b/syncer/metrics.go index d2e72b1b31..0074b557fa 100644 --- a/syncer/metrics.go +++ b/syncer/metrics.go @@ -19,6 +19,8 @@ import ( "time" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/metricsproxy" + cpu "github.com/pingcap/tidb-tools/pkg/utils" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -28,7 +30,7 @@ import ( ) var ( - binlogReadDurationHistogram = prometheus.NewHistogramVec( + binlogReadDurationHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "syncer", @@ -37,7 +39,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) - binlogEventSizeHistogram = prometheus.NewHistogramVec( + binlogEventSizeHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "syncer", @@ -46,7 +48,7 @@ var ( Buckets: prometheus.ExponentialBuckets(16, 2, 20), }, []string{"task"}) - binlogEvent = prometheus.NewHistogramVec( + binlogEvent = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "syncer", @@ -55,7 +57,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"type", "task"}) - conflictDetectDurationHistogram = prometheus.NewHistogramVec( + conflictDetectDurationHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "syncer", @@ -64,7 +66,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) - addJobDurationHistogram = prometheus.NewHistogramVec( + addJobDurationHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "syncer", @@ -75,7 +77,7 @@ var ( // dispatch/add multiple jobs for one binlog event. // NOTE: only observe for DML now. - dispatchBinlogDurationHistogram = prometheus.NewHistogramVec( + dispatchBinlogDurationHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "syncer", @@ -84,7 +86,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"type", "task"}) - skipBinlogDurationHistogram = prometheus.NewHistogramVec( + skipBinlogDurationHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "syncer", @@ -93,7 +95,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.0000005, 2, 25), // this should be very fast. }, []string{"type", "task"}) - addedJobsTotal = prometheus.NewCounterVec( + addedJobsTotal = metricsproxy.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", Subsystem: "syncer", @@ -101,7 +103,7 @@ var ( Help: "total number of added jobs", }, []string{"type", "task", "queueNo"}) - finishedJobsTotal = prometheus.NewCounterVec( + finishedJobsTotal = metricsproxy.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", Subsystem: "syncer", @@ -109,7 +111,7 @@ var ( Help: "total number of finished jobs", }, []string{"type", "task", "queueNo"}) - queueSizeGauge = prometheus.NewGaugeVec( + queueSizeGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "syncer", @@ -117,7 +119,7 @@ var ( Help: "remain size of the DML queue", }, []string{"task", "queueNo"}) - binlogPosGauge = prometheus.NewGaugeVec( + binlogPosGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "syncer", @@ -125,7 +127,7 @@ var ( Help: "current binlog pos", }, []string{"node", "task"}) - binlogFileGauge = prometheus.NewGaugeVec( + binlogFileGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "syncer", @@ -133,7 +135,7 @@ var ( Help: "current binlog file index", }, []string{"node", "task"}) - sqlRetriesTotal = prometheus.NewCounterVec( + sqlRetriesTotal = metricsproxy.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", Subsystem: "syncer", @@ -141,7 +143,7 @@ var ( Help: "total number of sql retries", }, []string{"type", "task"}) - txnHistogram = prometheus.NewHistogramVec( + txnHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "syncer", @@ -150,7 +152,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) - queryHistogram = prometheus.NewHistogramVec( + queryHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "syncer", @@ -159,7 +161,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) - stmtHistogram = prometheus.NewHistogramVec( + stmtHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", Subsystem: "syncer", @@ -178,7 +180,7 @@ var ( }) // should alert - syncerExitWithErrorCounter = prometheus.NewCounterVec( + syncerExitWithErrorCounter = metricsproxy.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", Subsystem: "syncer", @@ -187,7 +189,7 @@ var ( }, []string{"task"}) // some problems with it - replicationLagGauge = prometheus.NewGaugeVec( + replicationLagGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "syncer", @@ -195,7 +197,7 @@ var ( Help: "replication lag in second between mysql and syncer", }, []string{"task"}) - remainingTimeGauge = prometheus.NewGaugeVec( + remainingTimeGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "syncer", @@ -203,7 +205,7 @@ var ( Help: "the remaining time in second to catch up master", }, []string{"task"}) - unsyncedTableGauge = prometheus.NewGaugeVec( + unsyncedTableGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "syncer", @@ -211,7 +213,7 @@ var ( Help: "number of unsynced tables in the subtask", }, []string{"task", "table"}) - shardLockResolving = prometheus.NewGaugeVec( + shardLockResolving = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", Subsystem: "syncer", @@ -293,3 +295,26 @@ func InitStatusAndMetrics(addr string) { } }() } +func (s *Syncer) removeLabelValuesWithTaskInMetrics(task string) { + binlogReadDurationHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + binlogEventSizeHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + binlogEvent.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + conflictDetectDurationHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + addJobDurationHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + dispatchBinlogDurationHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + skipBinlogDurationHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + addedJobsTotal.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + finishedJobsTotal.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + queueSizeGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + sqlRetriesTotal.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + binlogPosGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + binlogFileGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + txnHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + stmtHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + queryHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + syncerExitWithErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + replicationLagGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + remainingTimeGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + unsyncedTableGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task}) + shardLockResolving.DeleteAllAboutLabels(prometheus.Labels{"task": task}) +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 162c029e3c..ad0dad86e6 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -2245,6 +2245,8 @@ func (s *Syncer) Close() { // when closing syncer by `stop-task`, remove active relay log from hub s.removeActiveRelayLog() + s.removeLabelValuesWithTaskInMetrics(s.cfg.Name) + s.closed.Set(true) } From 69c6d089798d57f9750f720c7065ca3a1b9cf92d Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 9 Apr 2020 15:15:38 +0800 Subject: [PATCH 2/3] .*/: fix dm unit tests and integration tests (#578) --- loader/loader.go | 12 ++++++++-- pkg/metricsproxy/countervec.go | 11 ++++++++- pkg/metricsproxy/gaugevec.go | 11 ++++++++- pkg/metricsproxy/histogramvec.go | 11 ++++++++- pkg/metricsproxy/summaryvec.go | 11 ++++++++- pkg/streamer/reader_test.go | 14 +++++++----- tests/_dmctl_tools/check_master_online.go | 4 +++- tests/_dmctl_tools/check_worker_online.go | 4 +++- tests/initial_unit/run.sh | 11 ++++----- tests/relay_interrupt/run.sh | 27 ++++++++++++++++++++--- tests/start_task/run.sh | 9 +++++--- 11 files changed, 100 insertions(+), 25 deletions(-) diff --git a/loader/loader.go b/loader/loader.go index b079b70f34..53db0daaeb 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -358,6 +358,8 @@ type Loader struct { // for every worker goroutine, not for every data file workerWg *sync.WaitGroup + // for other goroutines + wg sync.WaitGroup fileJobQueue chan *fileJob fileJobQueueClosed sync2.AtomicBool @@ -576,7 +578,11 @@ func (l *Loader) Restore(ctx context.Context) error { return err2 } - go l.PrintStatus(ctx) + l.wg.Add(1) + go func() { + defer l.wg.Done() + l.PrintStatus(ctx) + }() begin := time.Now() err = l.restoreData(ctx) @@ -634,8 +640,10 @@ func (l *Loader) stopLoad() { l.closeFileJobQueue() l.workerWg.Wait() - l.logCtx.L().Debug("all workers have been closed") + + l.wg.Wait() + l.logCtx.L().Debug("all loader's go-routines have been closed") } // Pause pauses the process, and it can be resumed later diff --git a/pkg/metricsproxy/countervec.go b/pkg/metricsproxy/countervec.go index 87ea599c72..0af76e47c8 100644 --- a/pkg/metricsproxy/countervec.go +++ b/pkg/metricsproxy/countervec.go @@ -14,11 +14,15 @@ package metricsproxy import ( + "sync" + "github.com/prometheus/client_golang/prometheus" ) // CounterVecProxy to proxy prometheus.CounterVec type CounterVecProxy struct { + mu sync.Mutex + LabelNames []string Labels map[string]map[string]string *prometheus.CounterVec @@ -44,7 +48,9 @@ func (c *CounterVecProxy) WithLabelValues(lvs ...string) prometheus.Counter { for index, label := range lvs { labels[c.LabelNames[index]] = label } + c.mu.Lock() noteLabelsInMetricsProxy(c, labels) + c.mu.Unlock() } return c.CounterVec.WithLabelValues(lvs...) } @@ -54,7 +60,9 @@ func (c *CounterVecProxy) WithLabelValues(lvs ...string) prometheus.Counter { // myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42) func (c *CounterVecProxy) With(labels prometheus.Labels) prometheus.Counter { if len(labels) > 0 { + c.mu.Lock() noteLabelsInMetricsProxy(c, labels) + c.mu.Unlock() } return c.CounterVec.With(labels) @@ -65,7 +73,8 @@ func (c *CounterVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool { if len(labels) == 0 { return false } - + c.mu.Lock() + defer c.mu.Unlock() return findAndDeleteLabelsInMetricsProxy(c, labels) } diff --git a/pkg/metricsproxy/gaugevec.go b/pkg/metricsproxy/gaugevec.go index 35fe7829ca..e2d6310694 100644 --- a/pkg/metricsproxy/gaugevec.go +++ b/pkg/metricsproxy/gaugevec.go @@ -14,11 +14,15 @@ package metricsproxy import ( + "sync" + "github.com/prometheus/client_golang/prometheus" ) // GaugeVecProxy to proxy prometheus.GaugeVec type GaugeVecProxy struct { + mu sync.Mutex + LabelNames []string Labels map[string]map[string]string *prometheus.GaugeVec @@ -44,7 +48,9 @@ func (c *GaugeVecProxy) WithLabelValues(lvs ...string) prometheus.Gauge { for index, label := range lvs { labels[c.LabelNames[index]] = label } + c.mu.Lock() noteLabelsInMetricsProxy(c, labels) + c.mu.Unlock() } return c.GaugeVec.WithLabelValues(lvs...) } @@ -54,7 +60,9 @@ func (c *GaugeVecProxy) WithLabelValues(lvs ...string) prometheus.Gauge { // myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42) func (c *GaugeVecProxy) With(labels prometheus.Labels) prometheus.Gauge { if len(labels) > 0 { + c.mu.Lock() noteLabelsInMetricsProxy(c, labels) + c.mu.Unlock() } return c.GaugeVec.With(labels) @@ -65,7 +73,8 @@ func (c *GaugeVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool { if len(labels) == 0 { return false } - + c.mu.Lock() + defer c.mu.Unlock() return findAndDeleteLabelsInMetricsProxy(c, labels) } diff --git a/pkg/metricsproxy/histogramvec.go b/pkg/metricsproxy/histogramvec.go index 1b5c8f074d..f40ea141ad 100644 --- a/pkg/metricsproxy/histogramvec.go +++ b/pkg/metricsproxy/histogramvec.go @@ -14,11 +14,15 @@ package metricsproxy import ( + "sync" + "github.com/prometheus/client_golang/prometheus" ) // HistogramVecProxy to proxy prometheus.HistogramVec type HistogramVecProxy struct { + mu sync.Mutex + LabelNames []string Labels map[string]map[string]string *prometheus.HistogramVec @@ -44,7 +48,9 @@ func (c *HistogramVecProxy) WithLabelValues(lvs ...string) prometheus.Observer { for index, label := range lvs { labels[c.LabelNames[index]] = label } + c.mu.Lock() noteLabelsInMetricsProxy(c, labels) + c.mu.Unlock() } return c.HistogramVec.WithLabelValues(lvs...) } @@ -54,7 +60,9 @@ func (c *HistogramVecProxy) WithLabelValues(lvs ...string) prometheus.Observer { // myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Observe(42.21) func (c *HistogramVecProxy) With(labels prometheus.Labels) prometheus.Observer { if len(labels) > 0 { + c.mu.Lock() noteLabelsInMetricsProxy(c, labels) + c.mu.Unlock() } return c.HistogramVec.With(labels) @@ -65,7 +73,8 @@ func (c *HistogramVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool if len(labels) == 0 { return false } - + c.mu.Lock() + defer c.mu.Unlock() return findAndDeleteLabelsInMetricsProxy(c, labels) } diff --git a/pkg/metricsproxy/summaryvec.go b/pkg/metricsproxy/summaryvec.go index 5a919bbbc2..38c677fa64 100644 --- a/pkg/metricsproxy/summaryvec.go +++ b/pkg/metricsproxy/summaryvec.go @@ -14,11 +14,15 @@ package metricsproxy import ( + "sync" + "github.com/prometheus/client_golang/prometheus" ) // SummaryVecProxy to proxy prometheus.SummaryVec type SummaryVecProxy struct { + mu sync.Mutex + LabelNames []string Labels map[string]map[string]string *prometheus.SummaryVec @@ -48,7 +52,9 @@ func (c *SummaryVecProxy) WithLabelValues(lvs ...string) prometheus.Observer { for index, label := range lvs { labels[c.LabelNames[index]] = label } + c.mu.Lock() noteLabelsInMetricsProxy(c, labels) + c.mu.Unlock() } return c.SummaryVec.WithLabelValues(lvs...) } @@ -58,7 +64,9 @@ func (c *SummaryVecProxy) WithLabelValues(lvs ...string) prometheus.Observer { // myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Observe(42.21) func (c *SummaryVecProxy) With(labels prometheus.Labels) prometheus.Observer { if len(labels) > 0 { + c.mu.Lock() noteLabelsInMetricsProxy(c, labels) + c.mu.Unlock() } return c.SummaryVec.With(labels) @@ -69,7 +77,8 @@ func (c *SummaryVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool { if len(labels) == 0 { return false } - + c.mu.Lock() + defer c.mu.Unlock() return findAndDeleteLabelsInMetricsProxy(c, labels) } diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index a08c10cc20..631abe0c10 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -36,6 +36,8 @@ import ( "github.com/pingcap/dm/pkg/terror" ) +var parseFileTimeout = 3 * time.Second + var _ = Suite(&testReaderSuite{}) type testReaderSuite struct { @@ -269,7 +271,7 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { _, err2 := f.Write(extraEvents[0].RawData) c.Assert(err2, IsNil) }() - ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second) + ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) @@ -290,7 +292,7 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { err2 := ioutil.WriteFile(nextPath, replication.BinLogFileHeader, 0600) c.Assert(err2, IsNil) }() - ctx3, cancel3 := context.WithTimeout(context.Background(), 2*time.Second) + ctx3, cancel3 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel3() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( ctx3, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) @@ -340,7 +342,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { // invalid UUID in UUID list, error r.uuids = []string{currentUUID, "invalid.uuid"} - ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) + ctx1, cancel1 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel1() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile( ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) @@ -360,7 +362,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { c.Assert(err, IsNil) // has relay log file in next sub directory, need to switch - ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) @@ -401,7 +403,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { defer f.Close() // file has no data, meet io.EOF error (when reading file header) and ignore it. but will get `context deadline exceeded` error - ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) + ctx1, cancel1 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel1() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile( ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) @@ -422,7 +424,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { c.Assert(err, IsNil) // meet `err EOF` error (when parsing binlog event) ignored - ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) diff --git a/tests/_dmctl_tools/check_master_online.go b/tests/_dmctl_tools/check_master_online.go index 71e8878c9b..746b529540 100644 --- a/tests/_dmctl_tools/check_master_online.go +++ b/tests/_dmctl_tools/check_master_online.go @@ -33,7 +33,9 @@ func main() { } cli := pb.NewMasterClient(conn) req := &pb.ShowDDLLocksRequest{} - _, err = cli.ShowDDLLocks(context.Background(), req) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + _, err = cli.ShowDDLLocks(ctx, req) + cancel() if err != nil { utils.ExitWithError(err) } diff --git a/tests/_dmctl_tools/check_worker_online.go b/tests/_dmctl_tools/check_worker_online.go index a68070645c..3d3058209f 100644 --- a/tests/_dmctl_tools/check_worker_online.go +++ b/tests/_dmctl_tools/check_worker_online.go @@ -33,7 +33,9 @@ func main() { } cli := pb.NewWorkerClient(conn) req := &pb.QueryStatusRequest{} - _, err = cli.QueryStatus(context.Background(), req) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + _, err = cli.QueryStatus(ctx, req) + cancel() if err != nil { utils.ExitWithError(err) } diff --git a/tests/initial_unit/run.sh b/tests/initial_unit/run.sh index 9cafaa1ad4..5b0d051303 100644 --- a/tests/initial_unit/run.sh +++ b/tests/initial_unit/run.sh @@ -10,9 +10,9 @@ WORK_DIR=$TEST_DIR/$TEST_NAME function prepare_data() { run_sql 'DROP DATABASE if exists initial_unit;' $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql 'CREATE DATABASE initial_unit;' $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql "CREATE TABLE initial_unit.t(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "CREATE TABLE initial_unit.t$1(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1 for j in $(seq 100); do - run_sql "INSERT INTO initial_unit.t VALUES ($j,${j}000$j),($j,${j}001$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "INSERT INTO initial_unit.t$1 VALUES ($j,${j}000$j),($j,${j}001$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 done } @@ -26,7 +26,10 @@ function run() { for(( i=0;i<${#failpoints[@]};i++)) do WORK_DIR=$TEST_DIR/$TEST_NAME/$i - prepare_data + # clear downstream env + run_sql 'DROP DATABASE if exists dm_meta;' $TIDB_PORT $TIDB_PASSWORD + run_sql 'DROP DATABASE if exists initial_unit;' $TIDB_PORT $TIDB_PASSWORD + prepare_data $i echo "failpoint=${failpoints[i]}" export GO_FAILPOINTS=${failpoints[i]} @@ -82,8 +85,6 @@ function run() { "current stage is not paused not valid" 1 cleanup_process - run_sql "drop database if exists initial_unit" $TIDB_PORT $TIDB_PASSWORD - run_sql "drop database if exists dm_meta" $TIDB_PORT $TIDB_PASSWORD done } diff --git a/tests/relay_interrupt/run.sh b/tests/relay_interrupt/run.sh index 5e779680d0..60adf3b177 100644 --- a/tests/relay_interrupt/run.sh +++ b/tests/relay_interrupt/run.sh @@ -10,12 +10,16 @@ WORK_DIR=$TEST_DIR/$TEST_NAME function prepare_data() { run_sql 'DROP DATABASE if exists relay_interrupt;' $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql 'CREATE DATABASE relay_interrupt;' $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql "CREATE TABLE relay_interrupt.t(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "CREATE TABLE relay_interrupt.t$1(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1 for j in $(seq 100); do - run_sql "INSERT INTO relay_interrupt.t VALUES ($j,${j}000$j),($j,${j}001$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "INSERT INTO relay_interrupt.t$1 VALUES ($j,${j}000$j),($j,${j}001$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 done } +function prepare_data2() { + run_sql "DELETE FROM relay_interrupt.t$1 limit 1;" $MYSQL_PORT1 $MYSQL_PASSWORD1 +} + function run() { failpoints=( # 1152 is ErrAbortingConnection @@ -29,7 +33,10 @@ function run() { echo "failpoint=${failpoints[i]}" export GO_FAILPOINTS=${failpoints[i]} - prepare_data + # clear downstream env + run_sql 'DROP DATABASE if exists dm_meta;' $TIDB_PORT $TIDB_PASSWORD + run_sql 'DROP DATABASE if exists relay_interrupt;' $TIDB_PORT $TIDB_PASSWORD + prepare_data1 $i run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT @@ -73,6 +80,20 @@ function run() { check_sync_diff $WORK_DIR $cur/conf/diff_config.toml +# prepare_data2 $i +# echo "read binlog from relay log failed, and will use remote binlog" + kill_dm_worker + export GO_FAILPOINTS="github.com/pingcap/dm/pkg/streamer/GetEventFromLocalFailed=return()" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + sleep 8 +# run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ +# "query-status test" \ +# "\"binlogType\": \"remote\"" 1 +# +# check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + export GO_FAILPOINTS='' cleanup_process done } diff --git a/tests/start_task/run.sh b/tests/start_task/run.sh index 0e6cacb5ac..db4d539f8a 100644 --- a/tests/start_task/run.sh +++ b/tests/start_task/run.sh @@ -10,9 +10,9 @@ WORK_DIR=$TEST_DIR/$TEST_NAME function prepare_data() { run_sql 'DROP DATABASE if exists start_task;' $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql 'CREATE DATABASE start_task;' $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql "CREATE TABLE start_task.t(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "CREATE TABLE start_task.t$1(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1 for j in $(seq 100); do - run_sql "INSERT INTO start_task.t VALUES ($j,${j}000$j),($j,${j}001$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "INSERT INTO start_task.t$1 VALUES ($j,${j}000$j),($j,${j}001$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 done } @@ -29,7 +29,10 @@ function run() { echo "failpoint=${failpoints[i]}" export GO_FAILPOINTS=${failpoints[i]} - prepare_data + # clear downstream env + run_sql 'DROP DATABASE if exists dm_meta;' $TIDB_PORT $TIDB_PASSWORD + run_sql 'DROP DATABASE if exists start_task;' $TIDB_PORT $TIDB_PASSWORD + prepare_data $i run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT From 579ad387a1af3e3be61b0767faa3750acf83121c Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 17 Apr 2020 15:58:41 +0800 Subject: [PATCH 3/3] tests: fix merge --- tests/relay_interrupt/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/relay_interrupt/run.sh b/tests/relay_interrupt/run.sh index 60adf3b177..530dee1bd6 100644 --- a/tests/relay_interrupt/run.sh +++ b/tests/relay_interrupt/run.sh @@ -7,7 +7,7 @@ source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME -function prepare_data() { +function prepare_data1() { run_sql 'DROP DATABASE if exists relay_interrupt;' $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql 'CREATE DATABASE relay_interrupt;' $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql "CREATE TABLE relay_interrupt.t$1(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1