Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-worker: refine conn #266

Merged
merged 61 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
f8ece5b
define conn
3pointer Aug 20, 2019
ba9c588
change upstream DB to BaseDB
3pointer Sep 16, 2019
25b7016
update raw db config
3pointer Sep 16, 2019
7ce2dac
fix test
3pointer Sep 16, 2019
ecf7bf5
remove useless code
3pointer Sep 16, 2019
f579445
add resetConnFn instead of baseDB in syncer.WorkerConn to reset
3pointer Sep 16, 2019
2f90c68
Update loader/checkpoint.go
3pointer Sep 17, 2019
9585091
Update loader/checkpoint.go
3pointer Sep 17, 2019
d8e92cc
address comments
3pointer Sep 17, 2019
418ccf8
Merge branch 'refine_conn' of https://github.com/3pointer/dm into ref…
3pointer Sep 17, 2019
a710aff
address comment
3pointer Sep 17, 2019
ed5c1a9
address comments
3pointer Sep 18, 2019
da36007
address comments
3pointer Sep 18, 2019
9133d17
Merge branch 'master' into refine_conn
3pointer Sep 18, 2019
8035a0c
address comment
3pointer Sep 18, 2019
8266bf0
address comment
3pointer Sep 23, 2019
9e031c4
baseDB close conns generated from itself
3pointer Sep 23, 2019
cd8c5f6
resolve conflict
3pointer Sep 23, 2019
6e65f70
address comment
3pointer Sep 25, 2019
a8342a1
use createConns instead of createConn
3pointer Sep 25, 2019
25cbcdc
address comments
3pointer Sep 25, 2019
f80c5bc
address comments
3pointer Sep 25, 2019
67cf1e2
address comments
3pointer Sep 25, 2019
2ccdefe
Merge branch 'master' into refine_conn
3pointer Sep 25, 2019
d3dd6c1
address comments
3pointer Sep 26, 2019
306aa19
remove loader DBConn close
3pointer Sep 26, 2019
faa4876
Merge branch 'refine_conn' of https://github.com/3pointer/dm into ref…
3pointer Sep 26, 2019
1168803
Merge branch 'master' into refine_conn
3pointer Sep 26, 2019
553578e
address comment
3pointer Sep 26, 2019
fc6929e
Merge branch 'refine_conn' of https://github.com/3pointer/dm into ref…
3pointer Sep 26, 2019
fd606de
address comments
3pointer Sep 29, 2019
a835b47
address comment
3pointer Sep 29, 2019
62e64b2
Update loader/db.go
3pointer Oct 8, 2019
7d0a41c
Update pkg/conn/basedb.go
3pointer Oct 8, 2019
7816f31
Update pkg/conn/basedb.go
3pointer Oct 8, 2019
fd6caa4
Update pkg/conn/basedb.go
3pointer Oct 8, 2019
2446fb4
loader unit add resetConn function
3pointer Oct 8, 2019
457b856
address comment
3pointer Oct 8, 2019
dec0aeb
add comments for BaseConn
3pointer Oct 8, 2019
5953333
update comments
3pointer Oct 8, 2019
21d8523
address comments
3pointer Oct 9, 2019
17c0594
address comments
3pointer Oct 10, 2019
7e6abba
address comment
3pointer Oct 11, 2019
a7f91d8
refine reset in baseConn
3pointer Oct 11, 2019
8b2be19
address comments
3pointer Oct 11, 2019
8a724da
Merge branch 'master' into refine_conn
csuzhangxc Oct 12, 2019
eac3868
use reset closure to reset BaseConn
3pointer Oct 12, 2019
032912b
address comment
3pointer Oct 12, 2019
1b9e351
address comments
3pointer Oct 14, 2019
92459f4
fix test
3pointer Oct 14, 2019
ad11983
fix test
3pointer Oct 14, 2019
35376e7
address comment
3pointer Oct 15, 2019
26190cc
address comment
3pointer Oct 15, 2019
b8f4a92
fix test
3pointer Oct 15, 2019
b8afd4d
return commit err index
3pointer Oct 16, 2019
ae9080c
resolve conflict
3pointer Oct 16, 2019
1104960
Merge branch 'master' into refine_conn
3pointer Oct 17, 2019
0a62846
fix test
3pointer Oct 17, 2019
6407d25
resolve conflict
3pointer Oct 17, 2019
8ac2bf1
address comments
3pointer Oct 18, 2019
aaa8665
address comment
3pointer Oct 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dmctl.log
relay_log/*
vendor
*/*.DS_Store
tidb-slow.log
37 changes: 37 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,52 @@ const (

var (
defaultMaxAllowedPacket = 64 * 1024 * 1024 // 64MiB, equal to TiDB's default
defaultMaxIdleConns = 2
)

// RawDBConfig contains some low level database config
type RawDBConfig struct {
MaxIdleConns int
ReadTimeout string
WriteTimeout string
}

// DefaultRawDBConfig returns a default raw database config
func DefaultRawDBConfig() *RawDBConfig {
return &RawDBConfig{
MaxIdleConns: defaultMaxIdleConns,
}
}

// SetReadTimeout set readTimeout for raw database config
func (c *RawDBConfig) SetReadTimeout(readTimeout string) *RawDBConfig {
c.ReadTimeout = readTimeout
return c
}

// SetWriteTimeout set writeTimeout for raw database config
func (c *RawDBConfig) SetWriteTimeout(writeTimeout string) *RawDBConfig {
c.WriteTimeout = writeTimeout
return c
}

// SetMaxIdleConns set maxIdleConns for raw database config
// set value <= 0 then no idle connections are retained.
// set value > 0 then `value` idle connections are retained.
func (c *RawDBConfig) SetMaxIdleConns(value int) *RawDBConfig {
c.MaxIdleConns = value
return c
}

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host" yaml:"host"`
Port int `toml:"port" json:"port" yaml:"port"`
User string `toml:"user" json:"user" yaml:"user"`
Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"`

RawDBCfg *RawDBConfig `toml:"-" json:"-" yaml:"-"`
}

func (db *DBConfig) String() string {
Expand Down
24 changes: 20 additions & 4 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

"go.uber.org/zap"
Expand Down Expand Up @@ -48,6 +50,9 @@ type CheckPoint interface {
// Init initialize checkpoint data in tidb
Init(filename string, endpos int64) error

// ResetConn resets database connections owned by the Checkpoint
ResetConn() error

// Close closes the CheckPoint
Close()

Expand All @@ -62,8 +67,10 @@ type CheckPoint interface {
}

// RemoteCheckPoint implements CheckPoint by saving status in remote database system, mostly in TiDB.
// it's not thread-safe
type RemoteCheckPoint struct {
conn *Conn // NOTE: use dbutil in tidb-tools later
db *conn.BaseDB
conn *DBConn
id string
schema string
table string
Expand All @@ -73,15 +80,16 @@ type RemoteCheckPoint struct {
}

func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) {
conn, err := createConn(cfg)
db, dbConns, err := createConns(tctx, cfg, 1)
if err != nil {
return nil, err
}

newtctx := tctx.WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint")))

cp := &RemoteCheckPoint{
conn: conn,
db: db,
conn: dbConns[0],
id: id,
restoringFiles: make(map[string]map[string]FilePosSet),
finishedTables: make(map[string]struct{}),
Expand Down Expand Up @@ -291,9 +299,17 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error {
return nil
}

// ResetConn implements CheckPoint.ResetConn
func (cp *RemoteCheckPoint) ResetConn() error {
return cp.conn.resetConn(cp.tctx)
}

// Close implements CheckPoint.Close
func (cp *RemoteCheckPoint) Close() {
closeConn(cp.conn)
err := cp.db.Close()
if err != nil {
cp.tctx.L().Error("close checkpoint db", log.ShortError(err))
}
}

// GenSQL implements CheckPoint.GenSQL
Expand Down
8 changes: 6 additions & 2 deletions loader/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,13 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(count, Equals, len(cases))

// update checkpoints
conn, err := createConn(t.cfg)
db, conns, err := createConns(tctx, t.cfg, 1)
c.Assert(err, IsNil)
defer closeConn(conn)
conn := conns[0]
defer func() {
err = db.Close()
c.Assert(err, IsNil)
}()
for _, cs := range cases {
sql2 := cp.GenSQL(cs.filename, cs.endPos)
err = conn.executeSQL(tctx, []string{sql2})
Expand Down
95 changes: 71 additions & 24 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package loader

import (
"database/sql"
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -27,21 +26,25 @@ import (
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/baseconn"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// Conn represents a live DB connection
type Conn struct {
// DBConn represents a live DB connection
// it's not thread-safe
type DBConn struct {
cfg *config.SubTaskConfig
baseConn *baseconn.BaseConn
baseConn *conn.BaseConn

// generate new BaseConn and close old one
resetBaseConnFn func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error)
}

func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
if conn == nil || conn.baseConn == nil {
return nil, terror.ErrDBUnExpect.Generate("database connection not valid")
}
Expand All @@ -51,6 +54,17 @@ func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interfac
FirstRetryDuration: time.Second,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
if retry.IsConnectionError(err) {
err = conn.resetConn(ctx)
if err != nil {
ctx.L().Error("reset connection failed", zap.Int("retry", retryTime),
zap.String("query", utils.TruncateInterface(query, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
return false
}
return true
}
if retry.IsRetryableError(err) {
ctx.L().Warn("query statement", zap.Int("retry", retryTime),
zap.String("query", utils.TruncateString(query, -1)),
Expand Down Expand Up @@ -90,7 +104,7 @@ func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interfac
return ret.(*sql.Rows), nil
}

func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]interface{}) error {
func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]interface{}) error {
if len(queries) == 0 {
return nil
}
Expand All @@ -104,14 +118,29 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
FirstRetryDuration: 2 * time.Second,
BackoffStrategy: retry.LinearIncrease,
IsRetryableFn: func(retryTime int, err error) bool {
ctx.L().Warn("execute statements", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name).Inc()
return retry.IsRetryableError(err)
if retry.IsConnectionError(err) {
err = conn.resetConn(ctx)
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
ctx.L().Error("reset connection failed", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
return false
}
return true
}
if retry.IsRetryableError(err) {
ctx.L().Warn("execute statements", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
return true
}
return false
},
}

_, _, err := conn.baseConn.ApplyRetryStrategy(
ctx,
params,
Expand Down Expand Up @@ -149,23 +178,41 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
return err
}

func createConn(cfg *config.SubTaskConfig) (*Conn, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d",
cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket)
baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig())
// resetConn reset one worker connection from specify *BaseDB
func (conn *DBConn) resetConn(tctx *tcontext.Context) error {
baseConn, err := conn.resetBaseConnFn(tctx, conn.baseConn)
if err != nil {
return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
return err
}

return &Conn{baseConn: baseConn, cfg: cfg}, nil
conn.baseConn = baseConn
return nil
}

func closeConn(conn *Conn) error {
if conn.baseConn == nil {
return nil
func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount int) (*conn.BaseDB, []*DBConn, error) {
baseDB, err := conn.DefaultDBProvider.Apply(cfg.To)
if err != nil {
return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
}

return terror.DBErrorAdapt(conn.baseConn.Close(), terror.ErrDBDriverError)
conns := make([]*DBConn, 0, workerCount)
for i := 0; i < workerCount; i++ {
baseConn, err := baseDB.GetBaseConn(tctx.Context())
if err != nil {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
terr := baseDB.Close()
if terr != nil {
tctx.L().Error("failed to close baseDB", zap.Error(terr))
}
return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
}
resetBaseConnFn := func(tctx *tcontext.Context, baseConn *conn.BaseConn) (*conn.BaseConn, error) {
err := baseDB.CloseBaseConn(baseConn)
if err != nil {
tctx.L().Warn("failed to close baseConn in reset")
}
return baseDB.GetBaseConn(tctx.Context())
}
conns = append(conns, &DBConn{baseConn: baseConn, cfg: cfg, resetBaseConnFn: resetBaseConnFn})
}
return baseDB, conns, nil
}

func isErrDBExists(err error) bool {
Expand Down
Loading