From f69b2ca84b27be1b651e47fe8e40a2e1ad1c56cf Mon Sep 17 00:00:00 2001 From: Xuecheng Zhang Date: Fri, 10 Apr 2020 09:06:05 +0800 Subject: [PATCH] syncer: add more metrics; refine batch handle (#590) --- dm/dm-ansible/scripts/dm.json | 422 +++++++++++++++++++++++++++++++++- loader/db.go | 7 +- loader/metrics.go | 14 +- pkg/conn/baseconn.go | 20 +- pkg/conn/baseconn_test.go | 22 +- pkg/conn/basedb_test.go | 2 +- relay/metrics.go | 6 +- syncer/db.go | 21 +- syncer/error_test.go | 2 +- syncer/metrics.go | 61 ++++- syncer/syncer.go | 17 +- syncer/syncer_test.go | 4 +- 12 files changed, 562 insertions(+), 36 deletions(-) diff --git a/dm/dm-ansible/scripts/dm.json b/dm/dm-ansible/scripts/dm.json index c41cdb249a..09fde73be1 100644 --- a/dm/dm-ansible/scripts/dm.json +++ b/dm/dm-ansible/scripts/dm.json @@ -2812,6 +2812,83 @@ } ] }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The duration that the binlog replication unit reads binlog from the relay log or upstream MySQL (in seconds)", + "fill": 1, + "id": 53, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 3, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "read binlog duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + 
"mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 2, + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": 0, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, { "aliasColors": {}, "bars": false, @@ -2889,6 +2966,313 @@ } ] }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The time it takes binlog replication unit to detect conflicts between DMLs (in seconds)", + "fill": 1, + "id": 54, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 3, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "cost of conflict detect", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 2, + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": 0, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The time it takes 
binlog replication to execute the transaction to the downstream (in seconds)", + "fill": 1, + "id": 1, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 3, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "transaction execution latency", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 2, + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": 0, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The size of a single binlog event that the binlog replication reads from relay log or upstream master", + "fill": 1, + "id": 55, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 3, + "stack": false, + "steppedLine": false, + "targets": [ + { + 
"expr": "histogram_quantile(0.90, sum(rate(dm_syncer_binlog_event_size_bucket{instance=\"$instance\"}[1m])) by (le))",
+ "format": "time_series",
+ "intervalFactor": 2,
+ "refId": "A"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "binlog event size",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "transparent": true,
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "decimals": 2,
+ "format": "decbytes",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ]
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_TEST-CLUSTER}",
+ "description": "The remain length of DML job queues",
+ "fill": 1,
+ "id": 56,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": false,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "span": 3,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "dm_syncer_queue_size{task=\"$task\", instance=\"$instance\"}",
+ "format": "time_series",
+ "intervalFactor": 2,
+ "refId": "A"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "DML queue remain length",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "transparent": true,
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true 
+ }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, { "aliasColors": {}, "bars": false, @@ -3047,9 +3431,9 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The time it takes Syncer to execute the transaction to the downstream (in seconds)", + "description": "The time it takes binlog replication to execute every statement to the downstream (in seconds)", "fill": 1, - "id": 1, + "id": 57, "legend": { "avg": false, "current": false, @@ -3069,21 +3453,43 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 3, + "span": 4, "stack": false, "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_stmt_duration_time_bucket{task=\"$task\", type=\"begin\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "begin", "refId": "A" + }, + { + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_stmt_duration_time_bucket{task=\"$task\", type=\"stmt\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "stmt", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_stmt_duration_time_bucket{task=\"$task\", type=\"commit\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "commit", + "refId": "C" + }, + { + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_stmt_duration_time_bucket{task=\"$task\", type=\"rollback\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "rollback", + "refId": "D" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "execution latency", + "title": "statement execution latency", 
"tooltip": { "shared": true, "sort": 0, @@ -3146,7 +3552,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 6, + "span": 4, "stack": false, "steppedLine": false, "targets": [ @@ -3224,7 +3630,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 6, + "span": 4, "stack": false, "steppedLine": false, "targets": [ @@ -3361,5 +3767,5 @@ }, "timezone": "", "title": "DM-task", - "version": 28 + "version": 29 } \ No newline at end of file diff --git a/loader/db.go b/loader/db.go index 3b61fac69c..0e654124c2 100644 --- a/loader/db.go +++ b/loader/db.go @@ -147,7 +147,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ... params, func(ctx *tcontext.Context) (interface{}, error) { startTime := time.Now() - _, err := conn.baseConn.ExecuteSQL(ctx, queries, args...) + _, err := conn.baseConn.ExecuteSQL(ctx, stmtHistogram, conn.cfg.Name, queries, args...) failpoint.Inject("LoadExecCreateTableFailed", func(val failpoint.Value) { errCode, err1 := strconv.ParseUint(val.(string), 10, 16) if err1 != nil { @@ -163,7 +163,10 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ... 
cost := time.Since(startTime) txnHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds()) if cost.Seconds() > 1 { - ctx.L().Warn("transaction execute successfully", zap.Duration("cost time", cost)) + ctx.L().Warn("execute transaction", + zap.String("query", utils.TruncateInterface(queries, -1)), + zap.String("argument", utils.TruncateInterface(args, -1)), + zap.Duration("cost time", cost)) } } return nil, err diff --git a/loader/metrics.go b/loader/metrics.go index 5bf6f5ece7..e95071756c 100644 --- a/loader/metrics.go +++ b/loader/metrics.go @@ -33,7 +33,7 @@ var ( Subsystem: "loader", Name: "query_duration_time", Help: "Bucketed histogram of query time (s) of a txn.", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16), + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), }, []string{"task"}) txnHistogram = prometheus.NewHistogramVec( @@ -42,9 +42,18 @@ var ( Subsystem: "loader", Name: "txn_duration_time", Help: "Bucketed histogram of processing time (s) of a txn.", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16), + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), }, []string{"task"}) + stmtHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "loader", + Name: "stmt_duration_time", + Help: "Bucketed histogram of every statement query time (s).", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + }, []string{"type", "task"}) + dataFileGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", @@ -92,6 +101,7 @@ func RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(tidbExecutionErrorCounter) registry.MustRegister(txnHistogram) registry.MustRegister(queryHistogram) + registry.MustRegister(stmtHistogram) registry.MustRegister(dataFileGauge) registry.MustRegister(tableGauge) registry.MustRegister(dataSizeGauge) diff --git a/pkg/conn/baseconn.go b/pkg/conn/baseconn.go index ee4b0d8276..901933d108 100644 --- a/pkg/conn/baseconn.go +++ 
b/pkg/conn/baseconn.go @@ -18,9 +18,11 @@ import ( "database/sql/driver" "fmt" "strings" + "time" "github.com/go-sql-driver/mysql" "github.com/pingcap/failpoint" + "github.com/prometheus/client_golang/prometheus" gmysql "github.com/siddontang/go-mysql/mysql" "go.uber.org/zap" @@ -114,7 +116,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int // return // 1. failed: (the index of sqls executed error, error) // 2. succeed: (len(sqls), nil) -func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) { +func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) { // inject an error to trigger retry, this should be placed before the real execution of the SQL statement. failpoint.Inject("retryableError", func(val failpoint.Value) { if mark, ok := val.(string); ok { @@ -140,11 +142,13 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr return 0, terror.ErrDBUnExpect.Generate("database connection not valid") } + startTime := time.Now() txn, err := conn.DBConn.BeginTx(tctx.Context(), nil) if err != nil { return 0, terror.ErrDBExecuteFailed.Delegate(err, "begin") } + hVec.WithLabelValues("begin", task).Observe(time.Since(startTime).Seconds()) l := len(queries) @@ -158,8 +162,11 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(arg, -1))) + startTime = time.Now() _, err = txn.ExecContext(tctx.Context(), query, arg...) 
- if err != nil { + if err == nil { + hVec.WithLabelValues("stmt", task).Observe(time.Since(startTime).Seconds()) + } else { if ignoreErr != nil && ignoreErr(err) { tctx.L().Warn("execute statement failed and will ignore this error", zap.String("query", utils.TruncateString(query, -1)), @@ -172,21 +179,26 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err)) + startTime = time.Now() rerr := txn.Rollback() if rerr != nil { tctx.L().Error("rollback failed", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(rerr)) + } else { + hVec.WithLabelValues("rollback", task).Observe(time.Since(startTime).Seconds()) } // we should return the exec err, instead of the rollback rerr. return i, terror.ErrDBExecuteFailed.Delegate(err, utils.TruncateString(query, -1)) } } + startTime = time.Now() err = txn.Commit() if err != nil { return l - 1, terror.ErrDBExecuteFailed.Delegate(err, "commit") // mark failed on the last one } + hVec.WithLabelValues("commit", task).Observe(time.Since(startTime).Seconds()) return l, nil } @@ -194,8 +206,8 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr // return // 1. failed: (the index of sqls executed error, error) // 2. succeed: (len(sqls), nil) -func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, queries []string, args ...[]interface{}) (int, error) { - return conn.ExecuteSQLWithIgnoreError(tctx, nil, queries, args...) +func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, queries []string, args ...[]interface{}) (int, error) { + return conn.ExecuteSQLWithIgnoreError(tctx, hVec, task, nil, queries, args...) 
} // ApplyRetryStrategy apply specify strategy for BaseConn diff --git a/pkg/conn/baseconn_test.go b/pkg/conn/baseconn_test.go index dce1cbc948..1f2a6407ff 100644 --- a/pkg/conn/baseconn_test.go +++ b/pkg/conn/baseconn_test.go @@ -23,6 +23,7 @@ import ( "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" + "github.com/prometheus/client_golang/prometheus" ) func TestSuite(t *testing.T) { @@ -34,6 +35,17 @@ var _ = Suite(&testBaseConnSuite{}) type testBaseConnSuite struct { } +var ( + testStmtHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "conn", + Name: "stmt_duration_time", + Help: "Bucketed histogram of every statement query time (s).", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + }, []string{"type", "task"}) +) + func (t *testBaseConnSuite) TestBaseConn(c *C) { baseConn := NewBaseConn(nil, nil) @@ -44,7 +56,7 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) { _, err = baseConn.QuerySQL(tctx, "select 1") c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue) - _, err = baseConn.ExecuteSQL(tctx, []string{""}) + _, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{""}) c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue) db, mock, err := sqlmock.New() @@ -74,24 +86,24 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) { _, err = baseConn.QuerySQL(tctx, "select 1") c.Assert(terror.ErrDBQueryFailed.Equal(err), IsTrue) - affected, _ := baseConn.ExecuteSQL(tctx, []string{""}) + affected, _ := baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{""}) c.Assert(affected, Equals, 0) mock.ExpectBegin() mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - affected, err = baseConn.ExecuteSQL(tctx, []string{"create database test"}) + affected, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"}) c.Assert(err, IsNil) c.Assert(affected, Equals, 1) 
mock.ExpectBegin().WillReturnError(errors.New("begin error")) - _, err = baseConn.ExecuteSQL(tctx, []string{"create database test"}) + _, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"}) c.Assert(terror.ErrDBExecuteFailed.Equal(err), IsTrue) mock.ExpectBegin() mock.ExpectExec("create database test").WillReturnError(errors.New("invalid connection")) mock.ExpectRollback() - _, err = baseConn.ExecuteSQL(tctx, []string{"create database test"}) + _, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"}) c.Assert(terror.ErrDBExecuteFailed.Equal(err), IsTrue) if err = mock.ExpectationsWereMet(); err != nil { diff --git a/pkg/conn/basedb_test.go b/pkg/conn/basedb_test.go index 1674609efa..16dfd5ac89 100644 --- a/pkg/conn/basedb_test.go +++ b/pkg/conn/basedb_test.go @@ -53,7 +53,7 @@ func (t *testBaseDBSuite) TestGetBaseConn(c *C) { mock.ExpectBegin() mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - affected, err := dbConn.ExecuteSQL(tctx, []string{"create database test"}) + affected, err := dbConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"}) c.Assert(err, IsNil) c.Assert(affected, Equals, 1) } diff --git a/relay/metrics.go b/relay/metrics.go index 128bd450c9..34c173ed22 100644 --- a/relay/metrics.go +++ b/relay/metrics.go @@ -83,7 +83,7 @@ var ( Subsystem: "relay", Name: "write_duration", Help: "bucketed histogram of write time (s) of single relay log event", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20), + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), }) // should alert @@ -110,7 +110,7 @@ var ( Subsystem: "relay", Name: "read_binlog_duration", Help: "bucketed histogram of read time (s) of single binlog event from the master.", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20), + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), }) binlogTransformDurationHistogram = 
prometheus.NewHistogram( @@ -119,7 +119,7 @@ var ( Subsystem: "relay", Name: "read_transform_duration", Help: "bucketed histogram of transform time (s) of single binlog event.", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20), + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), }) // should alert diff --git a/syncer/db.go b/syncer/db.go index 64963c606f..672c2b78f7 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -200,7 +200,19 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter tctx, params, func(ctx *tcontext.Context) (interface{}, error) { - return conn.baseConn.QuerySQL(ctx, query, args...) + startTime := time.Now() + ret, err := conn.baseConn.QuerySQL(ctx, query, args...) + if err == nil { + cost := time.Since(startTime) + queryHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds()) + if cost.Seconds() > 1 { + ctx.L().Warn("query statement", + zap.String("query", utils.TruncateString(query, -1)), + zap.String("argument", utils.TruncateInterface(args, -1)), + zap.Duration("cost time", cost)) + } + } + return ret, err }, ) @@ -265,12 +277,15 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun params, func(ctx *tcontext.Context) (interface{}, error) { startTime := time.Now() - ret, err := conn.baseConn.ExecuteSQLWithIgnoreError(ctx, ignoreError, queries, args...) + ret, err := conn.baseConn.ExecuteSQLWithIgnoreError(ctx, stmtHistogram, conn.cfg.Name, ignoreError, queries, args...) 
if err == nil { cost := time.Since(startTime) txnHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds()) if cost.Seconds() > 1 { - ctx.L().Warn("transaction execute successfully", zap.Duration("cost time", cost)) + ctx.L().Warn("execute transaction", + zap.String("query", utils.TruncateInterface(queries, -1)), + zap.String("argument", utils.TruncateInterface(args, -1)), + zap.Duration("cost time", cost)) } } return ret, err diff --git a/syncer/error_test.go b/syncer/error_test.go index 6a4660a064..f11437dc6d 100644 --- a/syncer/error_test.go +++ b/syncer/error_test.go @@ -72,7 +72,7 @@ func (s *testSyncerSuite) TestHandleSpecialDDLError(c *C) { var ( syncer = NewSyncer(s.cfg) tctx = tcontext.Background() - conn2 = &DBConn{resetBaseConnFn: func(*context.Context, *conn.BaseConn) (*conn.BaseConn, error) { + conn2 = &DBConn{cfg: s.cfg, resetBaseConnFn: func(*context.Context, *conn.BaseConn) (*conn.BaseConn, error) { return nil, nil }} customErr = errors.New("custom error") diff --git a/syncer/metrics.go b/syncer/metrics.go index 048d95ce45..4d9c1bb675 100644 --- a/syncer/metrics.go +++ b/syncer/metrics.go @@ -28,6 +28,24 @@ import ( ) var ( + binlogReadDurationHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "read_binlog_duration", + Help: "bucketed histogram of read time (s) for single binlog event from the relay log or master.", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + }, []string{"task"}) + + binlogEventSizeHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "binlog_event_size", + Help: "size of a binlog event", + Buckets: prometheus.ExponentialBuckets(16, 2, 20), + }, []string{"task"}) + binlogEvent = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dm", @@ -37,6 +55,15 @@ var ( Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), }, []string{"type", "task"}) + 
conflictDetectDurationHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "conflict_detect_duration", + Help: "bucketed histogram of conflict detect time (s) for single DML statement", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + }, []string{"task"}) + binlogSkippedEventsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", @@ -61,6 +88,14 @@ var ( Help: "total number of finished jobs", }, []string{"type", "task", "queueNo"}) + queueSizeGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "queue_size", + Help: "remain size of the DML queue", + }, []string{"task", "queueNo"}) + binlogPosGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dm", @@ -82,7 +117,7 @@ var ( Namespace: "dm", Subsystem: "syncer", Name: "sql_retries_total", - Help: "total number of sql retryies", + Help: "total number of sql retries", }, []string{"type", "task"}) txnHistogram = prometheus.NewHistogramVec( @@ -94,6 +129,24 @@ var ( Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), }, []string{"task"}) + queryHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "query_duration_time", + Help: "Bucketed histogram of query time (s).", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + }, []string{"task"}) + + stmtHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "stmt_duration_time", + Help: "Bucketed histogram of every statement query time (s).", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + }, []string{"type", "task"}) + // FIXME: should I move it to dm-worker? 
cpuUsageGauge = prometheus.NewGauge( prometheus.GaugeOpts{ @@ -148,14 +201,20 @@ var ( // RegisterMetrics registers metrics func RegisterMetrics(registry *prometheus.Registry) { + registry.MustRegister(binlogReadDurationHistogram) + registry.MustRegister(binlogEventSizeHistogram) registry.MustRegister(binlogEvent) + registry.MustRegister(conflictDetectDurationHistogram) registry.MustRegister(binlogSkippedEventsTotal) registry.MustRegister(addedJobsTotal) registry.MustRegister(finishedJobsTotal) + registry.MustRegister(queueSizeGauge) registry.MustRegister(sqlRetriesTotal) registry.MustRegister(binlogPosGauge) registry.MustRegister(binlogFileGauge) registry.MustRegister(txnHistogram) + registry.MustRegister(stmtHistogram) + registry.MustRegister(queryHistogram) registry.MustRegister(cpuUsageGauge) registry.MustRegister(syncerExitWithErrorCounter) registry.MustRegister(replicationLagGauge) diff --git a/syncer/syncer.go b/syncer/syncer.go index b2e0ce61f2..a4fc437812 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -998,6 +998,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo for { select { case sqlJob, ok := <-jobChan: + queueSizeGauge.WithLabelValues(s.cfg.Name, queueBucket).Set(float64(len(jobChan))) if !ok { return } @@ -1017,7 +1018,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo clearF() } - default: + case <-time.After(waitTime): if len(jobs) > 0 { err = executeSQLs() if err != nil { @@ -1025,8 +1026,6 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo continue } clearF() - } else { - time.Sleep(waitTime) } } } @@ -1213,6 +1212,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) { e = s.tryInject(latestOp, currentPos) latestOp = null } + + startTime := time.Now() if e == nil { failpoint.Inject("SyncerEventTimeout", func(val failpoint.Value) { if seconds, ok := val.(int); ok { @@ -1225,7 +1226,6 @@ func (s *Syncer) Run(ctx context.Context) 
(err error) { cancel() } - startTime := time.Now() if err == context.Canceled { s.tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last position", lastPos)) return nil @@ -1266,6 +1266,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return err } + + // time duration for reading an event from relay log or upstream master. + binlogReadDurationHistogram.WithLabelValues(s.cfg.Name).Observe(time.Since(startTime).Seconds()) + startTime = time.Now() // reset start time for the next metric. + // get binlog event, reset tryReSync, so we can re-sync binlog while syncer meets errors next time tryReSync = true binlogPosGauge.WithLabelValues("syncer", s.cfg.Name).Set(float64(e.Header.LogPos)) @@ -1276,6 +1281,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { binlogFileGauge.WithLabelValues("syncer", s.cfg.Name).Set(float64(index)) } s.binlogSizeCount.Add(int64(e.Header.EventSize)) + binlogEventSizeHistogram.WithLabelValues(s.cfg.Name).Observe(float64(e.Header.EventSize)) failpoint.Inject("ProcessBinlogSlowDown", nil) @@ -1937,10 +1943,13 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e } func (s *Syncer) commitJob(tp opType, sourceSchema, sourceTable, targetSchema, targetTable, sql string, args []interface{}, keys []string, retry bool, pos, cmdPos mysql.Position, gs gtid.Set, traceID string) error { + startTime := time.Now() key, err := s.resolveCasuality(keys) if err != nil { return terror.ErrSyncerUnitResolveCasualityFail.Generate(err) } + conflictDetectDurationHistogram.WithLabelValues(s.cfg.Name).Observe(time.Since(startTime).Seconds()) + job := newJob(tp, sourceSchema, sourceTable, targetSchema, targetTable, sql, args, key, pos, cmdPos, gs, traceID) return s.addJobFunc(job) } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 0da9ebc1ff..c631f8db18 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -947,8 +947,8 @@ func (s *testSyncerSuite) 
TestGeneratedColumn(c *C) { dbConn, err := db.Conn(context.Background()) c.Assert(err, IsNil) syncer.fromDB = &UpStreamConn{BaseDB: conn.NewBaseDB(db)} - syncer.ddlDBConn = &DBConn{baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} - syncer.toDBConns = []*DBConn{{baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}} + syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}} syncer.reset() streamer, err := syncer.streamerProducer.generateStreamer(pos)