Skip to content

Commit

Permalink
syncer: add more metrics; refine batch handle (pingcap#590)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc committed Apr 14, 2020
1 parent c286b9c commit 9b85963
Show file tree
Hide file tree
Showing 12 changed files with 562 additions and 31 deletions.
413 changes: 410 additions & 3 deletions dm/dm-ansible/scripts/dm.json

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
params,
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
_, err := conn.baseConn.ExecuteSQL(ctx, queries, args...)
_, err := conn.baseConn.ExecuteSQL(ctx, stmtHistogram, conn.cfg.Name, queries, args...)
failpoint.Inject("LoadExecCreateTableFailed", func(val failpoint.Value) {
errCode, err1 := strconv.ParseUint(val.(string), 10, 16)
if err1 != nil {
Expand All @@ -163,7 +163,10 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
cost := time.Since(startTime)
txnHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("transaction execute successfully", zap.Duration("cost time", cost))
ctx.L().Warn("execute transaction",
zap.String("query", utils.TruncateInterface(queries, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
zap.Duration("cost time", cost))
}
}
return nil, err
Expand Down
15 changes: 13 additions & 2 deletions loader/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
Subsystem: "loader",
Name: "query_duration_time",
Help: "Bucketed histogram of query time (s) of a txn.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16),
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"task"})

txnHistogram = metricsproxy.NewHistogramVec(
Expand All @@ -44,9 +44,18 @@ var (
Subsystem: "loader",
Name: "txn_duration_time",
Help: "Bucketed histogram of processing time (s) of a txn.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16),
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"task"})

stmtHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "stmt_duration_time",
Help: "Bucketed histogram of every statement query time (s).",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"type", "task"})

dataFileGauge = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Expand Down Expand Up @@ -94,6 +103,7 @@ func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(tidbExecutionErrorCounter)
registry.MustRegister(txnHistogram)
registry.MustRegister(queryHistogram)
registry.MustRegister(stmtHistogram)
registry.MustRegister(dataFileGauge)
registry.MustRegister(tableGauge)
registry.MustRegister(dataSizeGauge)
Expand All @@ -105,6 +115,7 @@ func (m *Loader) removeLabelValuesWithTaskInMetrics(task string) {
tidbExecutionErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
txnHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
queryHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
stmtHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
dataFileGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
tableGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
dataSizeGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
Expand Down
20 changes: 16 additions & 4 deletions pkg/conn/baseconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql/driver"
"fmt"
"strings"
"time"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/failpoint"
Expand All @@ -26,6 +27,7 @@ import (

tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/metricsproxy"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
Expand Down Expand Up @@ -114,7 +116,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
// return
// 1. failed: (the index of sqls executed error, error)
// 2. succeed: (len(sqls), nil)
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *metricsproxy.HistogramVecProxy, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
// inject an error to trigger retry, this should be placed before the real execution of the SQL statement.
failpoint.Inject("retryableError", func(val failpoint.Value) {
if mark, ok := val.(string); ok {
Expand All @@ -140,11 +142,13 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
return 0, terror.ErrDBUnExpect.Generate("database connection not valid")
}

startTime := time.Now()
txn, err := conn.DBConn.BeginTx(tctx.Context(), nil)

if err != nil {
return 0, terror.ErrDBExecuteFailed.Delegate(err, "begin")
}
hVec.WithLabelValues("begin", task).Observe(time.Since(startTime).Seconds())

l := len(queries)

Expand All @@ -158,8 +162,11 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)))

startTime = time.Now()
_, err = txn.ExecContext(tctx.Context(), query, arg...)
if err != nil {
if err == nil {
hVec.WithLabelValues("stmt", task).Observe(time.Since(startTime).Seconds())
} else {
if ignoreErr != nil && ignoreErr(err) {
tctx.L().Warn("execute statement failed and will ignore this error",
zap.String("query", utils.TruncateString(query, -1)),
Expand All @@ -172,30 +179,35 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err))

startTime = time.Now()
rerr := txn.Rollback()
if rerr != nil {
tctx.L().Error("rollback failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)),
log.ShortError(rerr))
} else {
hVec.WithLabelValues("rollback", task).Observe(time.Since(startTime).Seconds())
}
// we should return the exec err, instead of the rollback rerr.
return i, terror.ErrDBExecuteFailed.Delegate(err, utils.TruncateString(query, -1))
}
}
startTime = time.Now()
err = txn.Commit()
if err != nil {
return l - 1, terror.ErrDBExecuteFailed.Delegate(err, "commit") // mark failed on the last one
}
hVec.WithLabelValues("commit", task).Observe(time.Since(startTime).Seconds())
return l, nil
}

// ExecuteSQL executes the given SQL statements on the real DB.
// It returns:
// 1. on failure: (the index of the statement that failed, the error)
// 2. on success: (len(queries), nil)
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, queries []string, args ...[]interface{}) (int, error) {
return conn.ExecuteSQLWithIgnoreError(tctx, nil, queries, args...)
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *metricsproxy.HistogramVecProxy, task string, queries []string, args ...[]interface{}) (int, error) {
return conn.ExecuteSQLWithIgnoreError(tctx, hVec, task, nil, queries, args...)
}

// ApplyRetryStrategy applies the specified strategy to BaseConn
Expand Down
23 changes: 18 additions & 5 deletions pkg/conn/baseconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (
"testing"

tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/metricsproxy"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/prometheus/client_golang/prometheus"
)

func TestSuite(t *testing.T) {
Expand All @@ -34,6 +36,17 @@ var _ = Suite(&testBaseConnSuite{})
type testBaseConnSuite struct {
}

var (
// testStmtHistogram is the statement-duration histogram vector that the
// tests in this package pass to ExecuteSQL as its hVec argument.
// Its label order ("type", "task") matches the
// hVec.WithLabelValues(<type>, task) calls made in baseconn.go.
testStmtHistogram = metricsproxy.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "conn",
Name: "stmt_duration_time",
Help: "Bucketed histogram of every statement query time (s).",
// 18 exponential buckets: the first upper bound is 0.0005s and each
// following bound is twice the previous one.
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"type", "task"})
)

func (t *testBaseConnSuite) TestBaseConn(c *C) {
baseConn := NewBaseConn(nil, nil)

Expand All @@ -44,7 +57,7 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) {
_, err = baseConn.QuerySQL(tctx, "select 1")
c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue)

_, err = baseConn.ExecuteSQL(tctx, []string{""})
_, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{""})
c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue)

db, mock, err := sqlmock.New()
Expand Down Expand Up @@ -74,24 +87,24 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) {
_, err = baseConn.QuerySQL(tctx, "select 1")
c.Assert(terror.ErrDBQueryFailed.Equal(err), IsTrue)

affected, _ := baseConn.ExecuteSQL(tctx, []string{""})
affected, _ := baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{""})
c.Assert(affected, Equals, 0)

mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
affected, err = baseConn.ExecuteSQL(tctx, []string{"create database test"})
affected, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(err, IsNil)
c.Assert(affected, Equals, 1)

mock.ExpectBegin().WillReturnError(errors.New("begin error"))
_, err = baseConn.ExecuteSQL(tctx, []string{"create database test"})
_, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(terror.ErrDBExecuteFailed.Equal(err), IsTrue)

mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnError(errors.New("invalid connection"))
mock.ExpectRollback()
_, err = baseConn.ExecuteSQL(tctx, []string{"create database test"})
_, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(terror.ErrDBExecuteFailed.Equal(err), IsTrue)

if err = mock.ExpectationsWereMet(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/conn/basedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (t *testBaseDBSuite) TestGetBaseConn(c *C) {
mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
affected, err := dbConn.ExecuteSQL(tctx, []string{"create database test"})
affected, err := dbConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(err, IsNil)
c.Assert(affected, Equals, 1)
}
6 changes: 3 additions & 3 deletions relay/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var (
Subsystem: "relay",
Name: "write_duration",
Help: "bucketed histogram of write time (s) of single relay log event",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
})

// should alert
Expand All @@ -111,7 +111,7 @@ var (
Subsystem: "relay",
Name: "read_binlog_duration",
Help: "bucketed histogram of read time (s) of single binlog event from the master.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
})

binlogTransformDurationHistogram = prometheus.NewHistogram(
Expand All @@ -120,7 +120,7 @@ var (
Subsystem: "relay",
Name: "read_transform_duration",
Help: "bucketed histogram of transform time (s) of single binlog event.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
})

// should alert
Expand Down
21 changes: 18 additions & 3 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,19 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
tctx,
params,
func(ctx *tcontext.Context) (interface{}, error) {
return conn.baseConn.QuerySQL(ctx, query, args...)
startTime := time.Now()
ret, err := conn.baseConn.QuerySQL(ctx, query, args...)
if err == nil {
cost := time.Since(startTime)
queryHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("query statement",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
zap.Duration("cost time", cost))
}
}
return ret, err
},
)

Expand Down Expand Up @@ -243,12 +255,15 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
params,
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
ret, err := conn.baseConn.ExecuteSQLWithIgnoreError(ctx, ignoreError, queries, args...)
ret, err := conn.baseConn.ExecuteSQLWithIgnoreError(ctx, stmtHistogram, conn.cfg.Name, ignoreError, queries, args...)
if err == nil {
cost := time.Since(startTime)
txnHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("transaction execute successfully", zap.Duration("cost time", cost))
ctx.L().Warn("execute transaction",
zap.String("query", utils.TruncateInterface(queries, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
zap.Duration("cost time", cost))
}
}
return ret, err
Expand Down
2 changes: 1 addition & 1 deletion syncer/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *testSyncerSuite) TestHandleSpecialDDLError(c *C) {
var (
syncer = NewSyncer(s.cfg, nil)
tctx = tcontext.Background()
conn2 = &DBConn{resetBaseConnFn: func(*context.Context, *conn.BaseConn) (*conn.BaseConn, error) {
conn2 = &DBConn{cfg: s.cfg, resetBaseConnFn: func(*context.Context, *conn.BaseConn) (*conn.BaseConn, error) {
return nil, nil
}}
customErr = errors.New("custom error")
Expand Down
Loading

0 comments on commit 9b85963

Please sign in to comment.