Skip to content

Commit

Permalink
dm-worker: refine conn (pingcap#266)
Browse files Browse the repository at this point in the history
* define conn
split workerConn and upstreamConn in syncer
split conn and db
unify conn in syncer and loader
* change upstream DB to BaseDB
* update raw db config
  • Loading branch information
3pointer committed Oct 18, 2019
1 parent a4412d5 commit 7049c77
Show file tree
Hide file tree
Showing 22 changed files with 787 additions and 319 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dmctl.log
relay_log/*
vendor
*/*.DS_Store
tidb-slow.log
37 changes: 37 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,52 @@ const (

var (
defaultMaxAllowedPacket = 64 * 1024 * 1024 // 64MiB, equal to TiDB's default
defaultMaxIdleConns = 2
)

// RawDBConfig contains some low level database config
type RawDBConfig struct {
MaxIdleConns int
ReadTimeout string
WriteTimeout string
}

// DefaultRawDBConfig returns a default raw database config
func DefaultRawDBConfig() *RawDBConfig {
return &RawDBConfig{
MaxIdleConns: defaultMaxIdleConns,
}
}

// SetReadTimeout set readTimeout for raw database config
func (c *RawDBConfig) SetReadTimeout(readTimeout string) *RawDBConfig {
c.ReadTimeout = readTimeout
return c
}

// SetWriteTimeout set writeTimeout for raw database config
func (c *RawDBConfig) SetWriteTimeout(writeTimeout string) *RawDBConfig {
c.WriteTimeout = writeTimeout
return c
}

// SetMaxIdleConns set maxIdleConns for raw database config
// set value <= 0 then no idle connections are retained.
// set value > 0 then `value` idle connections are retained.
func (c *RawDBConfig) SetMaxIdleConns(value int) *RawDBConfig {
c.MaxIdleConns = value
return c
}

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host" yaml:"host"`
Port int `toml:"port" json:"port" yaml:"port"`
User string `toml:"user" json:"user" yaml:"user"`
Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"`

RawDBCfg *RawDBConfig `toml:"-" json:"-" yaml:"-"`
}

func (db *DBConfig) String() string {
Expand Down
24 changes: 20 additions & 4 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

"go.uber.org/zap"
Expand Down Expand Up @@ -48,6 +50,9 @@ type CheckPoint interface {
// Init initialize checkpoint data in tidb
Init(filename string, endpos int64) error

// ResetConn resets database connections owned by the Checkpoint
ResetConn() error

// Close closes the CheckPoint
Close()

Expand All @@ -62,8 +67,10 @@ type CheckPoint interface {
}

// RemoteCheckPoint implements CheckPoint by saving status in remote database system, mostly in TiDB.
// it's not thread-safe
type RemoteCheckPoint struct {
conn *Conn // NOTE: use dbutil in tidb-tools later
db *conn.BaseDB
conn *DBConn
id string
schema string
table string
Expand All @@ -73,15 +80,16 @@ type RemoteCheckPoint struct {
}

func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) {
conn, err := createConn(cfg)
db, dbConns, err := createConns(tctx, cfg, 1)
if err != nil {
return nil, err
}

newtctx := tctx.WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint")))

cp := &RemoteCheckPoint{
conn: conn,
db: db,
conn: dbConns[0],
id: id,
restoringFiles: make(map[string]map[string]FilePosSet),
finishedTables: make(map[string]struct{}),
Expand Down Expand Up @@ -291,9 +299,17 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error {
return nil
}

// ResetConn implements CheckPoint.ResetConn
func (cp *RemoteCheckPoint) ResetConn() error {
return cp.conn.resetConn(cp.tctx)
}

// Close implements CheckPoint.Close
func (cp *RemoteCheckPoint) Close() {
closeConn(cp.conn)
err := cp.db.Close()
if err != nil {
cp.tctx.L().Error("close checkpoint db", log.ShortError(err))
}
}

// GenSQL implements CheckPoint.GenSQL
Expand Down
8 changes: 6 additions & 2 deletions loader/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,13 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(count, Equals, len(cases))

// update checkpoints
conn, err := createConn(t.cfg)
db, conns, err := createConns(tctx, t.cfg, 1)
c.Assert(err, IsNil)
defer closeConn(conn)
conn := conns[0]
defer func() {
err = db.Close()
c.Assert(err, IsNil)
}()
for _, cs := range cases {
sql2 := cp.GenSQL(cs.filename, cs.endPos)
err = conn.executeSQL(tctx, []string{sql2})
Expand Down
95 changes: 71 additions & 24 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package loader

import (
"database/sql"
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -27,21 +26,25 @@ import (
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/baseconn"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// Conn represents a live DB connection
type Conn struct {
// DBConn represents a live DB connection
// it's not thread-safe
type DBConn struct {
cfg *config.SubTaskConfig
baseConn *baseconn.BaseConn
baseConn *conn.BaseConn

// generate new BaseConn and close old one
resetBaseConnFn func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error)
}

func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
if conn == nil || conn.baseConn == nil {
return nil, terror.ErrDBUnExpect.Generate("database connection not valid")
}
Expand All @@ -51,6 +54,17 @@ func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interfac
FirstRetryDuration: time.Second,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
if retry.IsConnectionError(err) {
err = conn.resetConn(ctx)
if err != nil {
ctx.L().Error("reset connection failed", zap.Int("retry", retryTime),
zap.String("query", utils.TruncateInterface(query, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
return false
}
return true
}
if retry.IsRetryableError(err) {
ctx.L().Warn("query statement", zap.Int("retry", retryTime),
zap.String("query", utils.TruncateString(query, -1)),
Expand Down Expand Up @@ -90,7 +104,7 @@ func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interfac
return ret.(*sql.Rows), nil
}

func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]interface{}) error {
func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]interface{}) error {
if len(queries) == 0 {
return nil
}
Expand All @@ -104,14 +118,29 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
FirstRetryDuration: 2 * time.Second,
BackoffStrategy: retry.LinearIncrease,
IsRetryableFn: func(retryTime int, err error) bool {
ctx.L().Warn("execute statements", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name).Inc()
return retry.IsRetryableError(err)
if retry.IsConnectionError(err) {
err = conn.resetConn(ctx)
if err != nil {
ctx.L().Error("reset connection failed", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
return false
}
return true
}
if retry.IsRetryableError(err) {
ctx.L().Warn("execute statements", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
return true
}
return false
},
}

_, _, err := conn.baseConn.ApplyRetryStrategy(
ctx,
params,
Expand Down Expand Up @@ -149,23 +178,41 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
return err
}

func createConn(cfg *config.SubTaskConfig) (*Conn, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d",
cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket)
baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig())
// resetConn reset one worker connection from specify *BaseDB
func (conn *DBConn) resetConn(tctx *tcontext.Context) error {
baseConn, err := conn.resetBaseConnFn(tctx, conn.baseConn)
if err != nil {
return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
return err
}

return &Conn{baseConn: baseConn, cfg: cfg}, nil
conn.baseConn = baseConn
return nil
}

func closeConn(conn *Conn) error {
if conn.baseConn == nil {
return nil
func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount int) (*conn.BaseDB, []*DBConn, error) {
baseDB, err := conn.DefaultDBProvider.Apply(cfg.To)
if err != nil {
return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
}

return terror.DBErrorAdapt(conn.baseConn.Close(), terror.ErrDBDriverError)
conns := make([]*DBConn, 0, workerCount)
for i := 0; i < workerCount; i++ {
baseConn, err := baseDB.GetBaseConn(tctx.Context())
if err != nil {
terr := baseDB.Close()
if terr != nil {
tctx.L().Error("failed to close baseDB", zap.Error(terr))
}
return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
}
resetBaseConnFn := func(tctx *tcontext.Context, baseConn *conn.BaseConn) (*conn.BaseConn, error) {
err := baseDB.CloseBaseConn(baseConn)
if err != nil {
tctx.L().Warn("failed to close baseConn in reset")
}
return baseDB.GetBaseConn(tctx.Context())
}
conns = append(conns, &DBConn{baseConn: baseConn, cfg: cfg, resetBaseConnFn: resetBaseConnFn})
}
return baseDB, conns, nil
}

func isErrDBExists(err error) bool {
Expand Down
Loading

0 comments on commit 7049c77

Please sign in to comment.