Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

cherry-pick #575 and #578 to release-1.0 and resolve conflicts #616

Merged
merged 3 commits into from
Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions dm/worker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@ import (
"net/http"
"net/http/pprof"

"github.com/pingcap/dm/pkg/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/loader"
"github.com/pingcap/dm/mydumper"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/metricsproxy"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay"
"github.com/pingcap/dm/syncer"
)

var (
taskState = prometheus.NewGaugeVec(
taskState = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "worker",
Expand Down Expand Up @@ -87,3 +88,7 @@ func InitStatus(lis net.Listener) {
log.L().Error("fail to start status server return", log.ShortError(err))
}
}

func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string) {
taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task})
}
1 change: 1 addition & 0 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func (st *SubTask) Close() {
st.cancel()
st.closeUnits() // close all un-closed units
st.setStageIfNot(pb.Stage_Finished, pb.Stage_Stopped)
st.removeLabelValuesWithTaskInMetrics(st.cfg.Name)
st.wg.Wait()
}

Expand Down
13 changes: 11 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ type Loader struct {

// for every worker goroutine, not for every data file
workerWg *sync.WaitGroup
// for other goroutines
wg sync.WaitGroup

fileJobQueue chan *fileJob
fileJobQueueClosed sync2.AtomicBool
Expand Down Expand Up @@ -576,7 +578,11 @@ func (l *Loader) Restore(ctx context.Context) error {
return err2
}

go l.PrintStatus(ctx)
l.wg.Add(1)
go func() {
defer l.wg.Done()
l.PrintStatus(ctx)
}()

begin := time.Now()
err = l.restoreData(ctx)
Expand Down Expand Up @@ -621,6 +627,7 @@ func (l *Loader) Close() {
l.logCtx.L().Error("close downstream DB error", log.ShortError(err))
}
l.checkPoint.Close()
l.removeLabelValuesWithTaskInMetrics(l.cfg.Name)
l.closed.Set(true)
}

Expand All @@ -633,8 +640,10 @@ func (l *Loader) stopLoad() {

l.closeFileJobQueue()
l.workerWg.Wait()

l.logCtx.L().Debug("all workers have been closed")

l.wg.Wait()
l.logCtx.L().Debug("all loader's go-routines have been closed")
}

// Pause pauses the process, and it can be resumed later
Expand Down
32 changes: 23 additions & 9 deletions loader/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@ package loader

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/dm/pkg/metricsproxy"
)

var (
// should error
tidbExecutionErrorCounter = prometheus.NewCounterVec(
tidbExecutionErrorCounter = metricsproxy.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "tidb_execution_error",
Help: "Total count of tidb execution errors",
}, []string{"task"})

queryHistogram = prometheus.NewHistogramVec(
queryHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -36,7 +38,7 @@ var (
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"task"})

txnHistogram = prometheus.NewHistogramVec(
txnHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -45,7 +47,7 @@ var (
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"task"})

stmtHistogram = prometheus.NewHistogramVec(
stmtHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -54,31 +56,31 @@ var (
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"type", "task"})

dataFileGauge = prometheus.NewGaugeVec(
dataFileGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "data_file_gauge",
Help: "data files in total",
}, []string{"task"})

tableGauge = prometheus.NewGaugeVec(
tableGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "table_gauge",
Help: "tables in total",
}, []string{"task"})

dataSizeGauge = prometheus.NewGaugeVec(
dataSizeGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "data_size_gauge",
Help: "data size in total",
}, []string{"task"})

progressGauge = prometheus.NewGaugeVec(
progressGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -87,7 +89,7 @@ var (
}, []string{"task"})

// should alert
loaderExitWithErrorCounter = prometheus.NewCounterVec(
loaderExitWithErrorCounter = metricsproxy.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -108,3 +110,15 @@ func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(progressGauge)
registry.MustRegister(loaderExitWithErrorCounter)
}

func (m *Loader) removeLabelValuesWithTaskInMetrics(task string) {
tidbExecutionErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
txnHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
stmtHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
queryHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
dataFileGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
tableGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
dataSizeGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
progressGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
loaderExitWithErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
}
8 changes: 7 additions & 1 deletion mydumper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ package mydumper

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/dm/pkg/metricsproxy"
)

var (
// should alert
mydumperExitWithErrorCounter = prometheus.NewCounterVec(
mydumperExitWithErrorCounter = metricsproxy.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "mydumper",
Expand All @@ -32,3 +34,7 @@ var (
func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(mydumperExitWithErrorCounter)
}

func (m *Mydumper) removeLabelValuesWithTaskInMetrics(task string) {
mydumperExitWithErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
}
2 changes: 2 additions & 0 deletions mydumper/mydumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ func (m *Mydumper) Close() {
if m.closed.Get() {
return
}

m.removeLabelValuesWithTaskInMetrics(m.cfg.Name)
// do nothing, external will cancel the command (if running)
m.closed.Set(true)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/conn/baseconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/pingcap/failpoint"
"github.com/prometheus/client_golang/prometheus"
gmysql "github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"

tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/metricsproxy"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
Expand Down Expand Up @@ -116,7 +116,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
// return
// 1. failed: (the index of sqls executed error, error)
// 2. succeed: (len(sqls), nil)
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *metricsproxy.HistogramVecProxy, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
// inject an error to trigger retry, this should be placed before the real execution of the SQL statement.
failpoint.Inject("retryableError", func(val failpoint.Value) {
if mark, ok := val.(string); ok {
Expand Down Expand Up @@ -206,7 +206,7 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *pr
// return
// 1. failed: (the index of sqls executed error, error)
// 2. succeed: (len(sqls), nil)
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, queries []string, args ...[]interface{}) (int, error) {
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *metricsproxy.HistogramVecProxy, task string, queries []string, args ...[]interface{}) (int, error) {
return conn.ExecuteSQLWithIgnoreError(tctx, hVec, task, nil, queries, args...)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/conn/baseconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/metricsproxy"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"

Expand All @@ -36,7 +37,7 @@ type testBaseConnSuite struct {
}

var (
testStmtHistogram = prometheus.NewHistogramVec(
testStmtHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "conn",
Expand Down
89 changes: 89 additions & 0 deletions pkg/metricsproxy/countervec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package metricsproxy

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
)

// CounterVecProxy to proxy prometheus.CounterVec
type CounterVecProxy struct {
mu sync.Mutex

LabelNames []string
Labels map[string]map[string]string
*prometheus.CounterVec
}

// NewCounterVec creates a new CounterVec based on the provided CounterOpts and
// partitioned by the given label names.
func NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *CounterVecProxy {
return &CounterVecProxy{
LabelNames: labelNames,
Labels: make(map[string]map[string]string, 0),
CounterVec: prometheus.NewCounterVec(opts, labelNames),
}
}

// WithLabelValues works as GetMetricWithLabelValues, but panics where
// GetMetricWithLabelValues would have returned an error. Not returning an
// error allows shortcuts like
// myVec.WithLabelValues("404", "GET").Add(42)
func (c *CounterVecProxy) WithLabelValues(lvs ...string) prometheus.Counter {
if len(lvs) > 0 {
labels := make(map[string]string, len(lvs))
for index, label := range lvs {
labels[c.LabelNames[index]] = label
}
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
}
return c.CounterVec.WithLabelValues(lvs...)
}

// With works as GetMetricWith, but panics where GetMetricWithLabels would have
// returned an error. Not returning an error allows shortcuts like
// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42)
func (c *CounterVecProxy) With(labels prometheus.Labels) prometheus.Counter {
if len(labels) > 0 {
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
}

return c.CounterVec.With(labels)
}

// DeleteAllAboutLabels Remove all labelsValue with these labels
func (c *CounterVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool {
if len(labels) == 0 {
return false
}
c.mu.Lock()
defer c.mu.Unlock()
return findAndDeleteLabelsInMetricsProxy(c, labels)
}

// GetLabels to support get CounterVecProxy's Labels when you use Proxy object
func (c *CounterVecProxy) GetLabels() map[string]map[string]string {
return c.Labels
}

// vecDelete to support delete CounterVecProxy's Labels when you use Proxy object
func (c *CounterVecProxy) vecDelete(labels prometheus.Labels) bool {
return c.CounterVec.Delete(labels)
}
Loading