Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-worker: refine conn (#266) #325

Merged
merged 1 commit into from
Oct 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dmctl.log
relay_log/*
vendor
*/*.DS_Store
tidb-slow.log
37 changes: 37 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,52 @@ const (

var (
	// defaultMaxAllowedPacket is 64MiB, equal to TiDB's default.
	defaultMaxAllowedPacket = 64 << 20
	// defaultMaxIdleConns is the MaxIdleConns value handed out by DefaultRawDBConfig.
	defaultMaxIdleConns = 2
)

// RawDBConfig groups low-level settings for the underlying database handle.
type RawDBConfig struct {
	// MaxIdleConns is the number of idle connections to retain;
	// a value <= 0 means no idle connections are retained.
	MaxIdleConns int
	// ReadTimeout is the read timeout, stored as a string.
	// NOTE(review): presumably a duration string such as "30s" — confirm against the consumer.
	ReadTimeout string
	// WriteTimeout is the write timeout, stored as a string.
	// NOTE(review): presumably the same format as ReadTimeout — confirm against the consumer.
	WriteTimeout string
}

// DefaultRawDBConfig returns a freshly allocated RawDBConfig whose MaxIdleConns
// is defaultMaxIdleConns; the timeout fields are left empty.
func DefaultRawDBConfig() *RawDBConfig {
	cfg := new(RawDBConfig)
	cfg.MaxIdleConns = defaultMaxIdleConns
	return cfg
}

// SetReadTimeout stores readTimeout in the config and returns the same
// config so that setter calls can be chained.
func (c *RawDBConfig) SetReadTimeout(readTimeout string) *RawDBConfig {
	c.ReadTimeout = readTimeout
	return c
}

// SetWriteTimeout stores writeTimeout in the config and returns the same
// config so that setter calls can be chained.
func (c *RawDBConfig) SetWriteTimeout(writeTimeout string) *RawDBConfig {
	c.WriteTimeout = writeTimeout
	return c
}

// SetMaxIdleConns stores value as the config's MaxIdleConns and returns the
// same config so that setter calls can be chained.
//
// A value <= 0 means no idle connections are retained;
// a value > 0 means `value` idle connections are retained.
func (c *RawDBConfig) SetMaxIdleConns(value int) *RawDBConfig {
	c.MaxIdleConns = value
	return c
}

// DBConfig is the DB configuration.
// Every field except RawDBCfg can be loaded from TOML, JSON or YAML using the
// keys given in the struct tags.
type DBConfig struct {
// Host, Port and User identify the database endpoint and account.
Host string `toml:"host" json:"host" yaml:"host"`
Port int `toml:"port" json:"port" yaml:"port"`
User string `toml:"user" json:"user" yaml:"user"`
// Password is excluded from JSON (tag "-") but is still read from TOML and YAML.
Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
// MaxAllowedPacket is a pointer, so an unset value (nil) can be told apart from 0.
// NOTE(review): presumably a nil value is replaced by a default elsewhere — confirm.
MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"`

// RawDBCfg carries low-level connection settings. All three tags are "-",
// so it is never (de)serialized and has to be assigned in code.
RawDBCfg *RawDBConfig `toml:"-" json:"-" yaml:"-"`
}

func (db *DBConfig) String() string {
Expand Down
24 changes: 20 additions & 4 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

"go.uber.org/zap"
Expand Down Expand Up @@ -48,6 +50,9 @@ type CheckPoint interface {
// Init initialize checkpoint data in tidb
Init(filename string, endpos int64) error

// ResetConn resets database connections owned by the Checkpoint
ResetConn() error

// Close closes the CheckPoint
Close()

Expand All @@ -62,8 +67,10 @@ type CheckPoint interface {
}

// RemoteCheckPoint implements CheckPoint by saving status in remote database system, mostly in TiDB.
// it's not thread-safe
type RemoteCheckPoint struct {
conn *Conn // NOTE: use dbutil in tidb-tools later
db *conn.BaseDB
conn *DBConn
id string
schema string
table string
Expand All @@ -73,15 +80,16 @@ type RemoteCheckPoint struct {
}

func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) {
conn, err := createConn(cfg)
db, dbConns, err := createConns(tctx, cfg, 1)
if err != nil {
return nil, err
}

newtctx := tctx.WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint")))

cp := &RemoteCheckPoint{
conn: conn,
db: db,
conn: dbConns[0],
id: id,
restoringFiles: make(map[string]map[string]FilePosSet),
finishedTables: make(map[string]struct{}),
Expand Down Expand Up @@ -291,9 +299,17 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error {
return nil
}

// ResetConn implements CheckPoint.ResetConn.
// It resets the checkpoint's own connection, passing the context stored on
// the checkpoint, and returns whatever error that reset reports.
func (cp *RemoteCheckPoint) ResetConn() error {
	return cp.conn.resetConn(cp.tctx)
}

// Close implements CheckPoint.Close.
// An error returned by cp.db.Close is logged through the checkpoint's logger
// rather than returned, because this method has no return value.
func (cp *RemoteCheckPoint) Close() {
closeConn(cp.conn)
err := cp.db.Close()
if err != nil {
cp.tctx.L().Error("close checkpoint db", log.ShortError(err))
}
}

// GenSQL implements CheckPoint.GenSQL
Expand Down
8 changes: 6 additions & 2 deletions loader/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,13 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(count, Equals, len(cases))

// update checkpoints
conn, err := createConn(t.cfg)
db, conns, err := createConns(tctx, t.cfg, 1)
c.Assert(err, IsNil)
defer closeConn(conn)
conn := conns[0]
defer func() {
err = db.Close()
c.Assert(err, IsNil)
}()
for _, cs := range cases {
sql2 := cp.GenSQL(cs.filename, cs.endPos)
err = conn.executeSQL(tctx, []string{sql2})
Expand Down
95 changes: 71 additions & 24 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package loader

import (
"database/sql"
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -27,21 +26,25 @@ import (
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/baseconn"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// Conn represents a live DB connection
type Conn struct {
// DBConn represents a live DB connection
// it's not thread-safe
type DBConn struct {
cfg *config.SubTaskConfig
baseConn *baseconn.BaseConn
baseConn *conn.BaseConn

// generate new BaseConn and close old one
resetBaseConnFn func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error)
}

func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
if conn == nil || conn.baseConn == nil {
return nil, terror.ErrDBUnExpect.Generate("database connection not valid")
}
Expand All @@ -51,6 +54,17 @@ func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interfac
FirstRetryDuration: time.Second,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
if retry.IsConnectionError(err) {
err = conn.resetConn(ctx)
if err != nil {
ctx.L().Error("reset connection failed", zap.Int("retry", retryTime),
zap.String("query", utils.TruncateInterface(query, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
return false
}
return true
}
if retry.IsRetryableError(err) {
ctx.L().Warn("query statement", zap.Int("retry", retryTime),
zap.String("query", utils.TruncateString(query, -1)),
Expand Down Expand Up @@ -90,7 +104,7 @@ func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interfac
return ret.(*sql.Rows), nil
}

func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]interface{}) error {
func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]interface{}) error {
if len(queries) == 0 {
return nil
}
Expand All @@ -104,14 +118,29 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
FirstRetryDuration: 2 * time.Second,
BackoffStrategy: retry.LinearIncrease,
IsRetryableFn: func(retryTime int, err error) bool {
ctx.L().Warn("execute statements", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name).Inc()
return retry.IsRetryableError(err)
if retry.IsConnectionError(err) {
err = conn.resetConn(ctx)
if err != nil {
ctx.L().Error("reset connection failed", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
return false
}
return true
}
if retry.IsRetryableError(err) {
ctx.L().Warn("execute statements", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
return true
}
return false
},
}

_, _, err := conn.baseConn.ApplyRetryStrategy(
ctx,
params,
Expand Down Expand Up @@ -149,23 +178,41 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
return err
}

func createConn(cfg *config.SubTaskConfig) (*Conn, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d",
cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket)
baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig())
// resetConn resets one worker connection obtained from the specified *BaseDB
func (conn *DBConn) resetConn(tctx *tcontext.Context) error {
baseConn, err := conn.resetBaseConnFn(tctx, conn.baseConn)
if err != nil {
return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
return err
}

return &Conn{baseConn: baseConn, cfg: cfg}, nil
conn.baseConn = baseConn
return nil
}

func closeConn(conn *Conn) error {
if conn.baseConn == nil {
return nil
func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount int) (*conn.BaseDB, []*DBConn, error) {
baseDB, err := conn.DefaultDBProvider.Apply(cfg.To)
if err != nil {
return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
}

return terror.DBErrorAdapt(conn.baseConn.Close(), terror.ErrDBDriverError)
conns := make([]*DBConn, 0, workerCount)
for i := 0; i < workerCount; i++ {
baseConn, err := baseDB.GetBaseConn(tctx.Context())
if err != nil {
terr := baseDB.Close()
if terr != nil {
tctx.L().Error("failed to close baseDB", zap.Error(terr))
}
return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
}
resetBaseConnFn := func(tctx *tcontext.Context, baseConn *conn.BaseConn) (*conn.BaseConn, error) {
err := baseDB.CloseBaseConn(baseConn)
if err != nil {
tctx.L().Warn("failed to close baseConn in reset")
}
return baseDB.GetBaseConn(tctx.Context())
}
conns = append(conns, &DBConn{baseConn: baseConn, cfg: cfg, resetBaseConnFn: resetBaseConnFn})
}
return baseDB, conns, nil
}

func isErrDBExists(err error) bool {
Expand Down
Loading