Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: add more metrics; refine batch handle #590

Merged
merged 7 commits into from
Apr 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
422 changes: 414 additions & 8 deletions dm/dm-ansible/scripts/dm.json

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
params,
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
_, err := conn.baseConn.ExecuteSQL(ctx, queries, args...)
_, err := conn.baseConn.ExecuteSQL(ctx, stmtHistogram, conn.cfg.Name, queries, args...)
failpoint.Inject("LoadExecCreateTableFailed", func(val failpoint.Value) {
errCode, err1 := strconv.ParseUint(val.(string), 10, 16)
if err1 != nil {
Expand All @@ -163,7 +163,10 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
cost := time.Since(startTime)
txnHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("transaction execute successfully", zap.Duration("cost time", cost))
ctx.L().Warn("execute transaction",
zap.String("query", utils.TruncateInterface(queries, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
zap.Duration("cost time", cost))
}
}
return nil, err
Expand Down
14 changes: 12 additions & 2 deletions loader/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
Subsystem: "loader",
Name: "query_duration_time",
Help: "Bucketed histogram of query time (s) of a txn.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16),
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"task"})

txnHistogram = prometheus.NewHistogramVec(
Expand All @@ -42,9 +42,18 @@ var (
Subsystem: "loader",
Name: "txn_duration_time",
Help: "Bucketed histogram of processing time (s) of a txn.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16),
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"task"})

// stmtHistogram records how long each individual statement of a loader
// transaction takes, in seconds. It is labelled by "type" and "task"; the
// loader passes it to BaseConn.ExecuteSQL together with the task name.
stmtHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "stmt_duration_time",
Help: "Bucketed histogram of every statement query time (s).",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"type", "task"})

dataFileGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Expand Down Expand Up @@ -92,6 +101,7 @@ func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(tidbExecutionErrorCounter)
registry.MustRegister(txnHistogram)
registry.MustRegister(queryHistogram)
registry.MustRegister(stmtHistogram)
registry.MustRegister(dataFileGauge)
registry.MustRegister(tableGauge)
registry.MustRegister(dataSizeGauge)
Expand Down
20 changes: 16 additions & 4 deletions pkg/conn/baseconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"database/sql/driver"
"fmt"
"strings"
"time"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/failpoint"
"github.com/prometheus/client_golang/prometheus"
gmysql "github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"

Expand Down Expand Up @@ -114,7 +116,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
// return
// 1. failed: (the index of the SQL statement at which execution failed, error)
// 2. succeed: (len(queries), nil)
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
// inject an error to trigger retry, this should be placed before the real execution of the SQL statement.
failpoint.Inject("retryableError", func(val failpoint.Value) {
if mark, ok := val.(string); ok {
Expand All @@ -140,11 +142,13 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
return 0, terror.ErrDBUnExpect.Generate("database connection not valid")
}

startTime := time.Now()
txn, err := conn.DBConn.BeginTx(tctx.Context(), nil)

if err != nil {
return 0, terror.ErrDBExecuteFailed.Delegate(err, "begin")
}
hVec.WithLabelValues("begin", task).Observe(time.Since(startTime).Seconds())

l := len(queries)

Expand All @@ -158,8 +162,11 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)))

startTime = time.Now()
_, err = txn.ExecContext(tctx.Context(), query, arg...)
if err != nil {
if err == nil {
hVec.WithLabelValues("stmt", task).Observe(time.Since(startTime).Seconds())
} else {
if ignoreErr != nil && ignoreErr(err) {
tctx.L().Warn("execute statement failed and will ignore this error",
zap.String("query", utils.TruncateString(query, -1)),
Expand All @@ -172,30 +179,35 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err))

startTime = time.Now()
rerr := txn.Rollback()
if rerr != nil {
tctx.L().Error("rollback failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)),
log.ShortError(rerr))
} else {
hVec.WithLabelValues("rollback", task).Observe(time.Since(startTime).Seconds())
}
// we should return the exec err, instead of the rollback rerr.
return i, terror.ErrDBExecuteFailed.Delegate(err, utils.TruncateString(query, -1))
}
}
startTime = time.Now()
err = txn.Commit()
if err != nil {
return l - 1, terror.ErrDBExecuteFailed.Delegate(err, "commit") // mark failed on the last one
}
hVec.WithLabelValues("commit", task).Observe(time.Since(startTime).Seconds())
return l, nil
}

// ExecuteSQL executes the given SQL statements on the real DB.
// It returns
// 1. failed: (the index of the SQL statement at which execution failed, error)
// 2. succeed: (len(queries), nil)
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, queries []string, args ...[]interface{}) (int, error) {
return conn.ExecuteSQLWithIgnoreError(tctx, nil, queries, args...)
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, queries []string, args ...[]interface{}) (int, error) {
return conn.ExecuteSQLWithIgnoreError(tctx, hVec, task, nil, queries, args...)
}

// ApplyRetryStrategy applies the specified retry strategy for BaseConn
Expand Down
22 changes: 17 additions & 5 deletions pkg/conn/baseconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/prometheus/client_golang/prometheus"
)

func TestSuite(t *testing.T) {
Expand All @@ -34,6 +35,17 @@ var _ = Suite(&testBaseConnSuite{})
type testBaseConnSuite struct {
}

// testStmtHistogram is a histogram vector labelled by "type" and "task" that
// the tests in this package pass to ExecuteSQL as the metrics collector for
// per-statement durations.
var (
testStmtHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "conn",
Name: "stmt_duration_time",
Help: "Bucketed histogram of every statement query time (s).",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"type", "task"})
)

func (t *testBaseConnSuite) TestBaseConn(c *C) {
baseConn := NewBaseConn(nil, nil)

Expand All @@ -44,7 +56,7 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) {
_, err = baseConn.QuerySQL(tctx, "select 1")
c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue)

_, err = baseConn.ExecuteSQL(tctx, []string{""})
_, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{""})
c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue)

db, mock, err := sqlmock.New()
Expand Down Expand Up @@ -74,24 +86,24 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) {
_, err = baseConn.QuerySQL(tctx, "select 1")
c.Assert(terror.ErrDBQueryFailed.Equal(err), IsTrue)

affected, _ := baseConn.ExecuteSQL(tctx, []string{""})
affected, _ := baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{""})
c.Assert(affected, Equals, 0)

mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
affected, err = baseConn.ExecuteSQL(tctx, []string{"create database test"})
affected, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(err, IsNil)
c.Assert(affected, Equals, 1)

mock.ExpectBegin().WillReturnError(errors.New("begin error"))
_, err = baseConn.ExecuteSQL(tctx, []string{"create database test"})
_, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(terror.ErrDBExecuteFailed.Equal(err), IsTrue)

mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnError(errors.New("invalid connection"))
mock.ExpectRollback()
_, err = baseConn.ExecuteSQL(tctx, []string{"create database test"})
_, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(terror.ErrDBExecuteFailed.Equal(err), IsTrue)

if err = mock.ExpectationsWereMet(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/conn/basedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (t *testBaseDBSuite) TestGetBaseConn(c *C) {
mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
affected, err := dbConn.ExecuteSQL(tctx, []string{"create database test"})
affected, err := dbConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(err, IsNil)
c.Assert(affected, Equals, 1)
}
6 changes: 3 additions & 3 deletions relay/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ var (
Subsystem: "relay",
Name: "write_duration",
Help: "bucketed histogram of write time (s) of single relay log event",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
})

// should alert
Expand All @@ -110,7 +110,7 @@ var (
Subsystem: "relay",
Name: "read_binlog_duration",
Help: "bucketed histogram of read time (s) of single binlog event from the master.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
})

binlogTransformDurationHistogram = prometheus.NewHistogram(
Expand All @@ -119,7 +119,7 @@ var (
Subsystem: "relay",
Name: "read_transform_duration",
Help: "bucketed histogram of transform time (s) of single binlog event.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
})

// should alert
Expand Down
21 changes: 18 additions & 3 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,19 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
tctx,
params,
func(ctx *tcontext.Context) (interface{}, error) {
return conn.baseConn.QuerySQL(ctx, query, args...)
startTime := time.Now()
ret, err := conn.baseConn.QuerySQL(ctx, query, args...)
if err == nil {
cost := time.Since(startTime)
queryHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("query statement",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
zap.Duration("cost time", cost))
}
}
return ret, err
},
)

Expand Down Expand Up @@ -265,12 +277,15 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
params,
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
ret, err := conn.baseConn.ExecuteSQLWithIgnoreError(ctx, ignoreError, queries, args...)
ret, err := conn.baseConn.ExecuteSQLWithIgnoreError(ctx, stmtHistogram, conn.cfg.Name, ignoreError, queries, args...)
if err == nil {
cost := time.Since(startTime)
txnHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("transaction execute successfully", zap.Duration("cost time", cost))
ctx.L().Warn("execute transaction",
zap.String("query", utils.TruncateInterface(queries, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
zap.Duration("cost time", cost))
}
}
return ret, err
Expand Down
2 changes: 1 addition & 1 deletion syncer/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *testSyncerSuite) TestHandleSpecialDDLError(c *C) {
var (
syncer = NewSyncer(s.cfg)
tctx = tcontext.Background()
conn2 = &DBConn{resetBaseConnFn: func(*context.Context, *conn.BaseConn) (*conn.BaseConn, error) {
conn2 = &DBConn{cfg: s.cfg, resetBaseConnFn: func(*context.Context, *conn.BaseConn) (*conn.BaseConn, error) {
return nil, nil
}}
customErr = errors.New("custom error")
Expand Down
Loading