Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: change panels for loader, dumpling #813

Merged
merged 5 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions dm/dm-ansible/scripts/dm.json
Original file line number Diff line number Diff line change
Expand Up @@ -2681,7 +2681,7 @@
"tableColumn": "",
"targets": [
{
"expr": "dm_loader_progress{task=\"$task\",instance=\"$instance\"}",
"expr": "dm_loader_progress{task=\"$task\",source_id=\"$source\"}",
"format": "time_series",
"intervalFactor": 2,
"refId": "A"
Expand Down Expand Up @@ -2762,7 +2762,7 @@
"tableColumn": "",
"targets": [
{
"expr": "dm_loader_data_size_gauge{task=\"$task\",instance=\"$instance\"}",
"expr": "dm_loader_data_size_gauge{task=\"$task\",source_id=\"$source\"}",
"format": "time_series",
"intervalFactor": 2,
"refId": "A"
Expand Down Expand Up @@ -2843,7 +2843,7 @@
"tableColumn": "",
"targets": [
{
"expr": "changes(dm_mydumper_exit_with_error_count{task=\"$task\",instance=\"$instance\"}[30m])",
"expr": "changes(dm_dumpling_exit_with_error_count{task=\"$task\",source_id=\"$source\"}[30m])",
"format": "time_series",
"intervalFactor": 2,
"refId": "A"
Expand Down Expand Up @@ -2924,7 +2924,7 @@
"tableColumn": "",
"targets": [
{
"expr": "changes(dm_loader_exit_with_error_count{task=\"$task\",instance=\"$instance\"}[30m])",
"expr": "changes(dm_loader_exit_with_error_count{task=\"$task\",source_id=\"$source\"}[30m])",
"format": "time_series",
"intervalFactor": 2,
"refId": "A"
Expand Down Expand Up @@ -3005,7 +3005,7 @@
"tableColumn": "",
"targets": [
{
"expr": "dm_loader_table_gauge{task=\"$task\",instance=\"$instance\"}",
"expr": "dm_loader_table_gauge{task=\"$task\",source_id=\"$source\"}",
"format": "time_series",
"intervalFactor": 2,
"refId": "A"
Expand Down Expand Up @@ -3086,7 +3086,7 @@
"tableColumn": "",
"targets": [
{
"expr": "dm_loader_data_file_gauge{task=\"$task\",instance=\"$instance\"}",
"expr": "dm_loader_data_file_gauge{task=\"$task\",source_id=\"$source\"}",
"format": "time_series",
"intervalFactor": 2,
"refId": "A"
Expand Down Expand Up @@ -3145,21 +3145,21 @@
"steppedLine": false,
"targets": [
{
"expr": "histogram_quantile(0.90, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))",
"expr": "histogram_quantile(0.90, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "90",
"refId": "A"
},
{
"expr": "histogram_quantile(0.95, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))",
"expr": "histogram_quantile(0.95, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "95",
"refId": "B"
},
{
"expr": "histogram_quantile(0.99, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))",
"expr": "histogram_quantile(0.99, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "99",
Expand Down
4 changes: 2 additions & 2 deletions dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (m *Dumpling) Init(ctx context.Context) error {

// Process implements Unit.Process
func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {
dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name).Add(0)
dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID).Add(0)

failpoint.Inject("dumpUnitProcessWithError", func(val failpoint.Value) {
m.logger.Info("dump unit runs with injected error", zap.String("failpoint", "dumpUnitProcessWithError"), zap.Reflect("error", val))
Expand Down Expand Up @@ -102,7 +102,7 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {
err = export.Dump(m.dumpConfig)

if err != nil {
dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name).Inc()
dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID).Inc()
errs = append(errs, unit.NewProcessError(err))
}

Expand Down
2 changes: 1 addition & 1 deletion dumpling/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
Subsystem: "dumpling",
Name: "exit_with_error_count",
Help: "counter for dumpling exit with error",
}, []string{"task"})
}, []string{"task", "source_id"})
)

// RegisterMetrics registers metrics.
Expand Down
6 changes: 3 additions & 3 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf
ret, err := conn.baseConn.QuerySQL(ctx, query, args...)
if err == nil {
cost := time.Since(startTime)
queryHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
queryHistogram.WithLabelValues(conn.cfg.Name, conn.cfg.SourceID).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("query statement",
zap.String("query", utils.TruncateString(query, -1)),
Expand Down Expand Up @@ -119,7 +119,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
FirstRetryDuration: 2 * time.Second,
BackoffStrategy: retry.LinearIncrease,
IsRetryableFn: func(retryTime int, err error) bool {
tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name).Inc()
tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name, conn.cfg.SourceID).Inc()
if retry.IsConnectionError(err) {
err = conn.resetConn(ctx)
if err != nil {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
})
if err == nil {
cost := time.Since(startTime)
txnHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
txnHistogram.WithLabelValues(conn.cfg.Name, conn.cfg.SourceID).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("execute transaction",
zap.String("query", utils.TruncateInterface(queries, -1)),
Expand Down
12 changes: 6 additions & 6 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,14 +472,14 @@ func (l *Loader) Init(ctx context.Context) (err error) {

// Process implements Unit.Process
func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name).Add(0)
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Add(0)

newCtx, cancel := context.WithCancel(ctx)
defer cancel()

l.newFileJobQueue()
if err := l.getMydumpMetadata(); err != nil {
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name).Inc()
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Inc()
pr <- pb.ProcessResult{
Errors: []*pb.ProcessError{unit.NewProcessError(err)},
}
Expand Down Expand Up @@ -511,7 +511,7 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {
wg.Wait() // wait for receive all fatal from l.runFatalChan

if err != nil {
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name).Inc()
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Inc()
errs = append(errs, unit.NewProcessError(err))
}

Expand Down Expand Up @@ -889,7 +889,7 @@ func (l *Loader) prepareTableFiles(files map[string]struct{}) error {
l.totalFileCount.Add(1) // for table
}

tableGauge.WithLabelValues(l.cfg.Name).Set(tablesNumber)
tableGauge.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Set(tablesNumber)
return nil
}

Expand Down Expand Up @@ -944,8 +944,8 @@ func (l *Loader) prepareDataFiles(files map[string]struct{}) error {
tables[table] = dataFiles
}

dataFileGauge.WithLabelValues(l.cfg.Name).Set(dataFilesNumber)
dataSizeGauge.WithLabelValues(l.cfg.Name).Set(float64(l.totalDataSize.Get()))
dataFileGauge.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Set(dataFilesNumber)
dataSizeGauge.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Set(float64(l.totalDataSize.Get()))
return nil
}

Expand Down
16 changes: 8 additions & 8 deletions loader/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
Subsystem: "loader",
Name: "tidb_execution_error",
Help: "Total count of tidb execution errors",
}, []string{"task"})
}, []string{"task", "source_id"})

queryHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -36,7 +36,7 @@ var (
Name: "query_duration_time",
Help: "Bucketed histogram of query time (s) of a txn.",
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"task"})
}, []string{"task", "source_id"})

txnHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -45,7 +45,7 @@ var (
Name: "txn_duration_time",
Help: "Bucketed histogram of processing time (s) of a txn.",
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"task"})
}, []string{"task", "source_id"})

stmtHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -62,31 +62,31 @@ var (
Subsystem: "loader",
Name: "data_file_gauge",
Help: "data files in total",
}, []string{"task"})
}, []string{"task", "source_id"})

tableGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "table_gauge",
Help: "tables in total",
}, []string{"task"})
}, []string{"task", "source_id"})

dataSizeGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "data_size_gauge",
Help: "data size in total",
}, []string{"task"})
}, []string{"task", "source_id"})

progressGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "progress",
Help: "the processing progress of loader in percentage",
}, []string{"task"})
}, []string{"task", "source_id"})

// should alert
loaderExitWithErrorCounter = metricsproxy.NewCounterVec(
Expand All @@ -95,7 +95,7 @@ var (
Subsystem: "loader",
Name: "exit_with_error_count",
Help: "counter for loader exits with error",
}, []string{"task"})
}, []string{"task", "source_id"})
)

// RegisterMetrics registers metrics
Expand Down
2 changes: 1 addition & 1 deletion loader/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (l *Loader) PrintStatus(ctx context.Context) {
zap.Int64("total_bytes", totalSize),
zap.Int64("total_file_count", totalFileCount),
zap.String("progress", percent(finishedSize, totalSize)))
progressGauge.WithLabelValues(l.cfg.Name).Set(float64(finishedSize) / float64(totalSize))
progressGauge.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Set(float64(finishedSize) / float64(totalSize))
if done {
return
}
Expand Down
2 changes: 1 addition & 1 deletion mydumper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
Subsystem: "mydumper",
Name: "exit_with_error_count",
Help: "counter for mydumper exit with error",
}, []string{"task"})
}, []string{"task", "source_id"})
)

// RegisterMetrics registers metrics.
Expand Down
4 changes: 2 additions & 2 deletions mydumper/mydumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *Mydumper) Init(ctx context.Context) error {

// Process implements Unit.Process
func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) {
mydumperExitWithErrorCounter.WithLabelValues(m.cfg.Name).Add(0)
mydumperExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID).Add(0)

failpoint.Inject("dumpUnitProcessWithError", func(val failpoint.Value) {
m.logger.Info("dump unit runs with injected error", zap.String("failpoint", "dumpUnitProcessWithError"), zap.Reflect("error", val))
Expand Down Expand Up @@ -102,7 +102,7 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) {
output, err := m.spawn(ctx)

if err != nil {
mydumperExitWithErrorCounter.WithLabelValues(m.cfg.Name).Inc()
mydumperExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID).Inc()
errs = append(errs, unit.NewProcessError(fmt.Errorf("%s. %s", err.Error(), output)))
} else {
select {
Expand Down