diff --git a/.gitignore b/.gitignore
index d61d0044f4..8549cc6115 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@ dmctl.log
 relay_log/*
 vendor
 */*.DS_Store
+tidb-slow.log
diff --git a/dm/config/subtask.go b/dm/config/subtask.go
index 082165a5bb..bb3a0a03cb 100644
--- a/dm/config/subtask.go
+++ b/dm/config/subtask.go
@@ -42,8 +42,43 @@ const (
 var (
 	defaultMaxAllowedPacket = 64 * 1024 * 1024 // 64MiB, equal to TiDB's default
+	defaultMaxIdleConns     = 2
 )

+// RawDBConfig contains some low-level database config
+type RawDBConfig struct {
+	MaxIdleConns int
+	ReadTimeout  string
+	WriteTimeout string
+}
+
+// DefaultRawDBConfig returns a default raw database config
+func DefaultRawDBConfig() *RawDBConfig {
+	return &RawDBConfig{
+		MaxIdleConns: defaultMaxIdleConns,
+	}
+}
+
+// SetReadTimeout sets readTimeout for the raw database config
+func (c *RawDBConfig) SetReadTimeout(readTimeout string) *RawDBConfig {
+	c.ReadTimeout = readTimeout
+	return c
+}
+
+// SetWriteTimeout sets writeTimeout for the raw database config
+func (c *RawDBConfig) SetWriteTimeout(writeTimeout string) *RawDBConfig {
+	c.WriteTimeout = writeTimeout
+	return c
+}
+
+// SetMaxIdleConns sets maxIdleConns for the raw database config:
+// if value <= 0, no idle connections are retained;
+// if value > 0, `value` idle connections are retained.
+func (c *RawDBConfig) SetMaxIdleConns(value int) *RawDBConfig {
+	c.MaxIdleConns = value
+	return c
+}
+
 // DBConfig is the DB configuration.
 type DBConfig struct {
 	Host             string `toml:"host" json:"host" yaml:"host"`
@@ -51,6 +86,8 @@ type DBConfig struct {
 	User             string `toml:"user" json:"user" yaml:"user"`
 	Password         string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
 	MaxAllowedPacket *int   `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"`
+
+	RawDBCfg *RawDBConfig `toml:"-" json:"-" yaml:"-"`
 }

 func (db *DBConfig) String() string {
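The setters above return the receiver, so call sites can chain them onto DefaultRawDBConfig. A minimal sketch of the intended usage (the "1m" timeout and the wrapper function are illustrative, not part of this PR):

```go
package example

import "github.com/pingcap/dm/dm/config"

// buildCheckpointDBCfg shows how the chained setters compose.
func buildCheckpointDBCfg(cfg *config.SubTaskConfig) config.DBConfig {
	dbCfg := cfg.To // DBConfig is copied by value, the task config stays untouched
	dbCfg.RawDBCfg = config.DefaultRawDBConfig().
		SetReadTimeout("1m"). // illustrative go-sql-driver timeout value
		SetMaxIdleConns(1)
	return dbCfg
}
```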
diff --git a/loader/checkpoint.go b/loader/checkpoint.go
index 1b3e24d4c6..dc5a0371d1 100644
--- a/loader/checkpoint.go
+++ b/loader/checkpoint.go
@@ -20,7 +20,9 @@ import (
 	"time"

 	"github.com/pingcap/dm/dm/config"
+	"github.com/pingcap/dm/pkg/conn"
 	tcontext "github.com/pingcap/dm/pkg/context"
+	"github.com/pingcap/dm/pkg/log"
 	"github.com/pingcap/dm/pkg/terror"

 	"go.uber.org/zap"
@@ -48,6 +50,9 @@ type CheckPoint interface {
 	// Init initialize checkpoint data in tidb
 	Init(filename string, endpos int64) error

+	// ResetConn resets database connections owned by the CheckPoint
+	ResetConn() error
+
 	// Close closes the CheckPoint
 	Close()

@@ -62,8 +67,10 @@ type CheckPoint interface {
 }

 // RemoteCheckPoint implements CheckPoint by saving status in remote database system, mostly in TiDB.
+// it's not thread-safe
 type RemoteCheckPoint struct {
-	conn           *Conn // NOTE: use dbutil in tidb-tools later
+	db             *conn.BaseDB
+	conn           *DBConn
 	id             string
 	schema         string
 	table          string
@@ -73,7 +80,7 @@ type RemoteCheckPoint struct {
 }

 func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) {
-	conn, err := createConn(cfg)
+	db, dbConns, err := createConns(tctx, cfg, 1)
 	if err != nil {
 		return nil, err
 	}
@@ -81,7 +88,8 @@ func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s
 	newtctx := tctx.WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint")))

 	cp := &RemoteCheckPoint{
-		conn:           conn,
+		db:             db,
+		conn:           dbConns[0],
 		id:             id,
 		restoringFiles: make(map[string]map[string]FilePosSet),
 		finishedTables: make(map[string]struct{}),
@@ -291,9 +299,17 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error {
 	return nil
 }

+// ResetConn implements CheckPoint.ResetConn
+func (cp *RemoteCheckPoint) ResetConn() error {
+	return cp.conn.resetConn(cp.tctx)
+}
+
 // Close implements CheckPoint.Close
 func (cp *RemoteCheckPoint) Close() {
-	closeConn(cp.conn)
+	err := cp.db.Close()
+	if err != nil {
+		cp.tctx.L().Error("close checkpoint db", log.ShortError(err))
+	}
 }

 // GenSQL implements CheckPoint.GenSQL
diff --git a/loader/checkpoint_test.go b/loader/checkpoint_test.go
index c388665a0b..7eeb93c7c3 100644
--- a/loader/checkpoint_test.go
+++ b/loader/checkpoint_test.go
@@ -113,9 +113,13 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
 	c.Assert(count, Equals, len(cases))

 	// update checkpoints
-	conn, err := createConn(t.cfg)
+	db, conns, err := createConns(tctx, t.cfg, 1)
 	c.Assert(err, IsNil)
-	defer closeConn(conn)
+	conn := conns[0]
+	defer func() {
+		err = db.Close()
+		c.Assert(err, IsNil)
+	}()
 	for _, cs := range cases {
 		sql2 := cp.GenSQL(cs.filename, cs.endPos)
 		err = conn.executeSQL(tctx, []string{sql2})
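ResetConn gives a paused-then-resumed load a way to swap out a connection that died with the server, without rebuilding the whole checkpoint object. A hedged sketch of the calling pattern, written as if inside package loader (resumeCheckpoint is a hypothetical helper; the real caller is Loader.resetDBs later in this diff):

```go
// resumeCheckpoint resets the checkpoint's pinned connection before
// the checkpoint is reused; on failure Resume reports a process error.
func resumeCheckpoint(cp CheckPoint) error {
	if err := cp.ResetConn(); err != nil {
		return err
	}
	return nil // cp.GenSQL / cp.Init are safe to call again
}
```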
diff --git a/loader/db.go b/loader/db.go
index ea9dff035a..989b457ca1 100644
--- a/loader/db.go
+++ b/loader/db.go
@@ -15,7 +15,6 @@ package loader

 import (
 	"database/sql"
-	"fmt"
 	"strconv"
 	"strings"
 	"time"
@@ -27,7 +26,7 @@ import (
 	"go.uber.org/zap"

 	"github.com/pingcap/dm/dm/config"
-	"github.com/pingcap/dm/pkg/baseconn"
+	"github.com/pingcap/dm/pkg/conn"
 	tcontext "github.com/pingcap/dm/pkg/context"
 	"github.com/pingcap/dm/pkg/log"
 	"github.com/pingcap/dm/pkg/retry"
@@ -35,13 +34,17 @@ import (
 	"github.com/pingcap/dm/pkg/utils"
 )

-// Conn represents a live DB connection
-type Conn struct {
+// DBConn represents a live DB connection
+// it's not thread-safe
+type DBConn struct {
 	cfg      *config.SubTaskConfig
-	baseConn *baseconn.BaseConn
+	baseConn *conn.BaseConn
+
+	// generate a new BaseConn and close the old one
+	resetBaseConnFn func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error)
 }

-func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
+func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
 	if conn == nil || conn.baseConn == nil {
 		return nil, terror.ErrDBUnExpect.Generate("database connection not valid")
 	}
@@ -51,6 +54,17 @@ func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interfac
 		FirstRetryDuration: time.Second,
 		BackoffStrategy:    retry.Stable,
 		IsRetryableFn: func(retryTime int, err error) bool {
+			if retry.IsConnectionError(err) {
+				err = conn.resetConn(ctx)
+				if err != nil {
+					ctx.L().Error("reset connection failed", zap.Int("retry", retryTime),
+						zap.String("query", utils.TruncateInterface(query, -1)),
+						zap.String("arguments", utils.TruncateInterface(args, -1)),
+						log.ShortError(err))
+					return false
+				}
+				return true
+			}
 			if retry.IsRetryableError(err) {
 				ctx.L().Warn("query statement", zap.Int("retry", retryTime),
 					zap.String("query", utils.TruncateString(query, -1)),
@@ -90,7 +104,7 @@ func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interfac
 	return ret.(*sql.Rows), nil
 }

-func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]interface{}) error {
+func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]interface{}) error {
 	if len(queries) == 0 {
 		return nil
 	}
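Both querySQL and executeSQL feed a retry.Params into BaseConn.ApplyRetryStrategy; the IsRetryableFn above is the policy hook. A simplified sketch of the wiring, as if inside package loader (the counts are illustrative, and the real functions also reset the connection before returning true):

```go
params := retry.Params{
	RetryCount:         3,           // illustrative
	FirstRetryDuration: time.Second, // illustrative
	BackoffStrategy:    retry.Stable,
	IsRetryableFn: func(retryTime int, err error) bool {
		// simplified: connection errors and transient errors retry,
		// everything else fails fast
		return retry.IsConnectionError(err) || retry.IsRetryableError(err)
	},
}
```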
@@ -104,14 +118,29 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
 		FirstRetryDuration: 2 * time.Second,
 		BackoffStrategy:    retry.LinearIncrease,
 		IsRetryableFn: func(retryTime int, err error) bool {
-			ctx.L().Warn("execute statements", zap.Int("retry", retryTime),
-				zap.String("queries", utils.TruncateInterface(queries, -1)),
-				zap.String("arguments", utils.TruncateInterface(args, -1)),
-				log.ShortError(err))
 			tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name).Inc()
-			return retry.IsRetryableError(err)
+			if retry.IsConnectionError(err) {
+				err = conn.resetConn(ctx)
+				if err != nil {
+					ctx.L().Error("reset connection failed", zap.Int("retry", retryTime),
+						zap.String("queries", utils.TruncateInterface(queries, -1)),
+						zap.String("arguments", utils.TruncateInterface(args, -1)),
+						log.ShortError(err))
+					return false
+				}
+				return true
+			}
+			if retry.IsRetryableError(err) {
+				ctx.L().Warn("execute statements", zap.Int("retry", retryTime),
+					zap.String("queries", utils.TruncateInterface(queries, -1)),
+					zap.String("arguments", utils.TruncateInterface(args, -1)),
+					log.ShortError(err))
+				return true
+			}
+			return false
 		},
 	}
+
 	_, _, err := conn.baseConn.ApplyRetryStrategy(
 		ctx,
 		params,
@@ -149,23 +178,41 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
 	return err
 }

-func createConn(cfg *config.SubTaskConfig) (*Conn, error) {
-	dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d",
-		cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket)
-	baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig())
+// resetConn resets one worker connection from the specified *BaseDB
+func (conn *DBConn) resetConn(tctx *tcontext.Context) error {
+	baseConn, err := conn.resetBaseConnFn(tctx, conn.baseConn)
 	if err != nil {
-		return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
+		return err
 	}
-
-	return &Conn{baseConn: baseConn, cfg: cfg}, nil
+	conn.baseConn = baseConn
+	return nil
 }

-func closeConn(conn *Conn) error {
-	if conn.baseConn == nil {
-		return nil
+func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount int) (*conn.BaseDB, []*DBConn, error) {
+	baseDB, err := conn.DefaultDBProvider.Apply(cfg.To)
+	if err != nil {
+		return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
 	}
-
-	return terror.DBErrorAdapt(conn.baseConn.Close(), terror.ErrDBDriverError)
+	conns := make([]*DBConn, 0, workerCount)
+	for i := 0; i < workerCount; i++ {
+		baseConn, err := baseDB.GetBaseConn(tctx.Context())
+		if err != nil {
+			terr := baseDB.Close()
+			if terr != nil {
+				tctx.L().Error("failed to close baseDB", zap.Error(terr))
+			}
+			return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
+		}
+		resetBaseConnFn := func(tctx *tcontext.Context, baseConn *conn.BaseConn) (*conn.BaseConn, error) {
+			err := baseDB.CloseBaseConn(baseConn)
+			if err != nil {
+				tctx.L().Warn("failed to close baseConn in reset")
+			}
+			return baseDB.GetBaseConn(tctx.Context())
+		}
+		conns = append(conns, &DBConn{baseConn: baseConn, cfg: cfg, resetBaseConnFn: resetBaseConnFn})
+	}
+	return baseDB, conns, nil
 }

 func isErrDBExists(err error) bool {
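createConns hands back the shared BaseDB plus workerCount pinned DBConns whose reset closure reaches back into the same BaseDB. A sketch of a caller, as if inside package loader (the worker count of 4 and the worker function are illustrative):

```go
baseDB, conns, err := createConns(tctx, cfg, 4)
if err != nil {
	return err
}
defer baseDB.Close() // closes every tracked conn, then the pool
for _, dbConn := range conns {
	go worker(dbConn) // hypothetical: one pinned DBConn per goroutine
}
```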
diff --git a/loader/loader.go b/loader/loader.go
index 20b9cf58c6..c82298ee33 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/dm/dm/config"
 	"github.com/pingcap/dm/dm/pb"
 	"github.com/pingcap/dm/dm/unit"
+	"github.com/pingcap/dm/pkg/conn"
 	tcontext "github.com/pingcap/dm/pkg/context"
 	fr "github.com/pingcap/dm/pkg/func-rollback"
 	"github.com/pingcap/dm/pkg/log"
@@ -55,7 +56,6 @@ type DataFiles []string

 // Tables2DataFiles represent all data files of a table collection as a map
 type Tables2DataFiles map[string]DataFiles
-
 type dataJob struct {
 	sql    string
 	schema string
@@ -77,7 +77,7 @@ type Worker struct {
 	id         int
 	cfg        *config.SubTaskConfig
 	checkPoint CheckPoint
-	conn       *Conn
+	conn       *DBConn
 	wg         sync.WaitGroup
 	jobQueue   chan *dataJob
 	loader     *Loader
@@ -89,18 +89,13 @@ type Worker struct {

 // NewWorker returns a Worker.
 func NewWorker(loader *Loader, id int) (worker *Worker, err error) {
-	conn, err := createConn(loader.cfg)
-	if err != nil {
-		return nil, err
-	}
-
 	ctctx := loader.tctx.WithLogger(loader.tctx.L().WithFields(zap.Int("worker ID", id)))

 	return &Worker{
 		id:         id,
 		cfg:        loader.cfg,
 		checkPoint: loader.checkPoint,
-		conn:       conn,
+		conn:       loader.toDBConns[id],
 		jobQueue:   make(chan *dataJob, jobCount),
 		loader:     loader,
 		tctx:       ctctx,
@@ -115,7 +110,6 @@ func (w *Worker) Close() {

 	close(w.jobQueue)
 	w.wg.Wait()
-	closeConn(w.conn)
 }

 func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *sync.WaitGroup, runFatalChan chan *pb.ProcessError) {
@@ -342,6 +336,9 @@ type Loader struct {
 	pool   []*Worker
 	closed sync2.AtomicBool

+	toDB      *conn.BaseDB
+	toDBConns []*DBConn
+
 	totalDataSize    sync2.AtomicInt64
 	totalFileCount   sync2.AtomicInt64 // schema + table + data
 	finishedDataSize sync2.AtomicInt64
@@ -409,6 +406,15 @@ func (l *Loader) Init() (err error) {
 		}
 	}

+	dbCfg := l.cfg.To
+	dbCfg.RawDBCfg = config.DefaultRawDBConfig().
+		SetMaxIdleConns(l.cfg.PoolSize)
+
+	l.toDB, l.toDBConns, err = createConns(l.tctx, l.cfg, l.cfg.PoolSize)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }

@@ -556,6 +562,11 @@ func (l *Loader) Close() {
 	}
 	l.stopLoad()
+
+	err := l.toDB.Close()
+	if err != nil {
+		l.tctx.L().Error("close downstream DB error", log.ShortError(err))
+	}
 	l.checkPoint.Close()
 	l.closed.Set(true)
 }

@@ -593,10 +604,38 @@ func (l *Loader) Resume(ctx context.Context, pr chan pb.ProcessResult) {
 		return
 	}

+	err := l.resetDBs()
+	if err != nil {
+		pr <- pb.ProcessResult{
+			IsCanceled: false,
+			Errors: []*pb.ProcessError{
+				unit.NewProcessError(pb.ErrorType_UnknownError, err),
+			},
+		}
+		return
+	}
 	// continue the processing
 	l.Process(ctx, pr)
 }

+func (l *Loader) resetDBs() error {
+	var err error
+
+	for i := 0; i < len(l.toDBConns); i++ {
+		err = l.toDBConns[i].resetConn(l.tctx)
+		if err != nil {
+			return terror.WithScope(err, terror.ScopeDownstream)
+		}
+	}
+
+	err = l.checkPoint.ResetConn()
+	if err != nil {
+		return terror.WithScope(err, terror.ScopeDownstream)
+	}
+
+	return nil
+}
+
 // Update implements Unit.Update
 // now, only support to update config for routes, filters, column-mappings, black-white-list
 // now no config diff implemented, so simply re-init use new config
@@ -855,7 +894,7 @@ func (l *Loader) prepare() error {
 }

 // restoreSchema creates schema
-func (l *Loader) restoreSchema(conn *Conn, sqlFile, schema string) error {
+func (l *Loader) restoreSchema(conn *DBConn, sqlFile, schema string) error {
 	err := l.restoreStructure(conn, sqlFile, schema, "")
 	if err != nil {
 		if isErrDBExists(err) {
@@ -868,7 +907,7 @@ func (l *Loader) restoreSchema(conn *Conn, sqlFile, schema string) error {
 }

 // restoreTable creates table
-func (l *Loader) restoreTable(conn *Conn, sqlFile, schema, table string) error {
+func (l *Loader) restoreTable(conn *DBConn, sqlFile, schema, table string) error {
 	err := l.restoreStructure(conn, sqlFile, schema, table)
 	if err != nil {
 		if isErrTableExists(err) {
@@ -881,7 +920,7 @@ func (l *Loader) restoreTable(conn *Conn, sqlFile, schema, table string) error {
 }

 // restoreStruture creates schema or table
-func (l *Loader) restoreStructure(conn *Conn, sqlFile string, schema string, table string) error {
+func (l *Loader) restoreStructure(conn *DBConn, sqlFile string, schema string, table string) error {
 	f, err := os.Open(sqlFile)
 	if err != nil {
 		return terror.ErrLoadUnitReadSchemaFile.Delegate(err)
@@ -968,11 +1007,18 @@ func fetchMatchedLiteral(ctx *tcontext.Context, router *router.Table, schema, ta
 func (l *Loader) restoreData(ctx context.Context) error {
 	begin := time.Now()

-	conn, err := createConn(l.cfg)
+	baseConn, err := l.toDB.GetBaseConn(ctx)
 	if err != nil {
 		return err
 	}
-	defer conn.baseConn.Close()
+	defer l.toDB.CloseBaseConn(baseConn)
+	dbConn := &DBConn{
+		cfg:      l.cfg,
+		baseConn: baseConn,
+		resetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
+			return nil, terror.ErrDBBadConn.Generate("bad connection error restoreData")
+		},
+	}

 	dispatchMap := make(map[string]*fileJob)

@@ -988,7 +1034,7 @@ func (l *Loader) restoreData(ctx context.Context) error {
 		// create db
 		dbFile := fmt.Sprintf("%s/%s-schema-create.sql", l.cfg.Dir, db)
 		l.tctx.L().Info("start to create schema", zap.String("schema file", dbFile))
-		err = l.restoreSchema(conn, dbFile, db)
+		err = l.restoreSchema(dbConn, dbFile, db)
 		if err != nil {
 			return err
 		}
@@ -1015,7 +1061,7 @@ func (l *Loader) restoreData(ctx context.Context) error {
 			// create table
 			l.tctx.L().Info("start to create table", zap.String("table file", tableFile))
-			err := l.restoreTable(conn, tableFile, db, table)
+			err := l.restoreTable(dbConn, tableFile, db, table)
 			if err != nil {
 				return err
 			}
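Worth calling out in restoreData above: the one-off restore connection's resetBaseConnFn deliberately returns ErrDBBadConn instead of reconnecting, so a connection drop during schema restore fails fast rather than silently re-running DDL on a fresh session. The same guard, restated on its own:

```go
// neverReset is the restoreData pattern: a reset hook that refuses to
// produce a replacement connection, turning driver.ErrBadConn into a
// hard error the restore step must surface.
neverReset := func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
	return nil, terror.ErrDBBadConn.Generate("bad connection error restoreData")
}
```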
table", zap.String("table file", tableFile)) - err := l.restoreTable(conn, tableFile, db, table) + err := l.restoreTable(dbConn, tableFile, db, table) if err != nil { return err } diff --git a/pkg/baseconn/conn.go b/pkg/conn/baseconn.go similarity index 66% rename from pkg/baseconn/conn.go rename to pkg/conn/baseconn.go index 81a3918e4d..d9f9ac0e10 100644 --- a/pkg/baseconn/conn.go +++ b/pkg/conn/baseconn.go @@ -11,61 +11,67 @@ // See the License for the specific language governing permissions and // limitations under the License. -package baseconn +package conn import ( "database/sql" - "go.uber.org/zap" - tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" + + "go.uber.org/zap" ) -// BaseConn wraps a connection to DB +// BaseConn is the basic connection we use in dm +// BaseDB -> BaseConn correspond to sql.DB -> sql.Conn +// In our scenario, there are two main reasons why we need BaseConn +// 1. we often need one fixed DB connection to execute sql +// 2. we need own retry policy during execute failed +// So we split a fixed sql.Conn out of sql.DB, and wraps it to BaseConn +// And Similar with sql.Conn, all BaseConn generated from one BaseDB shares this BaseDB to reset +// +// Basic usage: +// For Syncer and Loader Unit, they both have different amount of connections due to config +// Currently we have some types of connections exist +// Syncer: +// Worker Connection: +// DML connection: +// execute some DML on Downstream DB, one unit has `syncer.WorkerCount` worker connections +// DDL Connection: +// execute some DDL on Downstream DB, one unit has one connection +// CheckPoint Connection: +// interact with CheckPoint DB, one unit has one connection +// OnlineDDL connection: +// interact with Online DDL DB, one unit has one connection +// ShardGroupKeeper connection: +// interact with ShardGroupKeeper DB, one unit has one connection +// +// Loader: +// Worker Connection: +// execute some DML to Downstream DB, one unit has `loader.PoolSize` worker connections +// CheckPoint Connection: +// interact with CheckPoint DB, one unit has one connection +// Restore Connection: +// only use to create schema and table in restoreData, +// it ignore already exists error and it should be removed after use, one unit has one connection +// +// each connection should have ability to retry on some common errors (e.g. tmysql.ErrTiKVServerTimeout) or maybe some specify errors in the future +// and each connection also should have ability to reset itself during some specify connection error (e.g. 
 // SetRetryStrategy set retry strategy for baseConn
@@ -77,42 +83,16 @@ func (conn *BaseConn) SetRetryStrategy(strategy retry.Strategy) error {
 	return nil
 }

-// ResetConn generates new *DB with new connection pool to take place old one
-func (conn *BaseConn) ResetConn(tctx *tcontext.Context) error {
-	if conn == nil {
-		return terror.ErrDBUnExpect.Generate("database connection not valid")
-	}
-	db, err := sql.Open("mysql", conn.DSN)
-	if err != nil {
-		return terror.ErrDBDriverError.Delegate(err)
-	}
-	// set max idle connection limit before any database call
-	db.SetMaxIdleConns(conn.RawDBCfg.MaxIdleConns)
-	err = db.Ping()
-	if err != nil {
-		db.Close()
-		return terror.ErrDBDriverError.Delegate(err)
-	}
-	if conn.DB != nil {
-		err := conn.DB.Close()
-		if err != nil {
-			tctx.L().Warn("reset connection", log.ShortError(err))
-		}
-	}
-	conn.DB = db
-	return nil
-}
-
 // QuerySQL defines query statement, and connect to real DB
 func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
-	if conn == nil || conn.DB == nil {
+	if conn == nil || conn.DBConn == nil {
 		return nil, terror.ErrDBUnExpect.Generate("database connection not valid")
 	}
 	tctx.L().Debug("query statement",
 		zap.String("query", utils.TruncateString(query, -1)),
 		zap.String("argument", utils.TruncateInterface(args, -1)))

-	rows, err := conn.DB.QueryContext(tctx.Context(), query, args...)
+	rows, err := conn.DBConn.QueryContext(tctx.Context(), query, args...)
 	if err != nil {
 		tctx.L().Error("query statement failed",
@@ -132,11 +112,11 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
 	if len(queries) == 0 {
 		return 0, nil
 	}
-	if conn == nil || conn.DB == nil {
+	if conn == nil || conn.DBConn == nil {
 		return 0, terror.ErrDBUnExpect.Generate("database connection not valid")
 	}

-	txn, err := conn.DB.Begin()
+	txn, err := conn.DBConn.BeginTx(tctx.Context(), nil)
 	if err != nil {
 		return 0, terror.ErrDBExecuteFailed.Delegate(err, "begin")
@@ -200,10 +180,9 @@ func (conn *BaseConn) ApplyRetryStrategy(tctx *tcontext.Context, params retry.Pa
 	return conn.RetryStrategy.Apply(tctx, params, operateFn)
 }

-// Close release DB resource
-func (conn *BaseConn) Close() error {
-	if conn == nil || conn.DB == nil {
+func (conn *BaseConn) close() error {
+	if conn == nil || conn.DBConn == nil {
 		return nil
 	}
-	return terror.ErrDBUnExpect.Delegate(conn.DB.Close(), "close")
+	return terror.ErrDBUnExpect.Delegate(conn.DBConn.Close(), "close")
 }
diff --git a/pkg/baseconn/conn_test.go b/pkg/conn/baseconn_test.go
similarity index 87%
rename from pkg/baseconn/conn_test.go
rename to pkg/conn/baseconn_test.go
index 4863f592e8..dce1cbc948 100644
--- a/pkg/baseconn/conn_test.go
+++ b/pkg/conn/baseconn_test.go
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package baseconn
+package conn

 import (
 	"errors"
@@ -35,15 +35,11 @@ type testBaseConnSuite struct {
 }

 func (t *testBaseConnSuite) TestBaseConn(c *C) {
-	baseConn, err := NewBaseConn("error dsn", nil, DefaultRawDBConfig())
-	c.Assert(terror.ErrDBDriverError.Equal(err), IsTrue)
+	baseConn := NewBaseConn(nil, nil)

 	tctx := tcontext.Background()
-	err = baseConn.ResetConn(tctx)
-	c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue)
-
-	err = baseConn.SetRetryStrategy(nil)
-	c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue)
+	err := baseConn.SetRetryStrategy(nil)
+	c.Assert(err, IsNil)

 	_, err = baseConn.QuerySQL(tctx, "select 1")
 	c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue)
@@ -53,7 +49,10 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) {

 	db, mock, err := sqlmock.New()
 	c.Assert(err, IsNil)
-	baseConn = &BaseConn{db, "", nil, DefaultRawDBConfig()}
+	dbConn, err := db.Conn(tctx.Context())
+	c.Assert(err, IsNil)
+
+	baseConn = &BaseConn{dbConn, nil}
 	err = baseConn.SetRetryStrategy(&retry.FiniteRetryStrategy{})
 	c.Assert(err, IsNil)
diff --git a/pkg/conn/basedb.go b/pkg/conn/basedb.go
new file mode 100644
index 0000000000..b48c97b0b3
--- /dev/null
+++ b/pkg/conn/basedb.go
@@ -0,0 +1,128 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package conn
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"sync"
+
+	"github.com/pingcap/dm/dm/config"
+	"github.com/pingcap/dm/pkg/retry"
+	"github.com/pingcap/dm/pkg/terror"
+)
+
+// DBProvider provides BaseDB instances
+type DBProvider interface {
+	Apply(config config.DBConfig) (*BaseDB, error)
+}
+
+type defaultDBProvider struct {
+}
+
+// DefaultDBProvider is the global instance of DBProvider
+var DefaultDBProvider DBProvider
+
+func init() {
+	DefaultDBProvider = &defaultDBProvider{}
+}
+
+// Apply builds a BaseDB from a DBConfig
+func (d *defaultDBProvider) Apply(config config.DBConfig) (*BaseDB, error) {
+	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=%d",
+		config.User, config.Password, config.Host, config.Port, *config.MaxAllowedPacket)
+
+	var maxIdleConns int
+	rawCfg := config.RawDBCfg
+	if rawCfg != nil {
+		if rawCfg.ReadTimeout != "" {
+			dsn += fmt.Sprintf("&readTimeout=%s", rawCfg.ReadTimeout)
+		}
+		if rawCfg.WriteTimeout != "" {
+			dsn += fmt.Sprintf("&writeTimeout=%s", rawCfg.WriteTimeout)
+		}
+		maxIdleConns = rawCfg.MaxIdleConns
+	}
+	db, err := sql.Open("mysql", dsn)
+	if err != nil {
+		return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
+	}
+	db.SetMaxIdleConns(maxIdleConns)
+
+	return NewBaseDB(db), nil
+}
+
+// BaseDB wraps *sql.DB and controls its BaseConns
+type BaseDB struct {
+	DB *sql.DB
+
+	mu sync.Mutex // protects following fields
+	// hold all db connections generated from this BaseDB
+	conns map[*BaseConn]struct{}
+
+	Retry retry.Strategy
+}
+
+// NewBaseDB returns a *BaseDB object
+func NewBaseDB(db *sql.DB) *BaseDB {
+	conns := make(map[*BaseConn]struct{})
+	return &BaseDB{DB: db, conns: conns, Retry: &retry.FiniteRetryStrategy{}}
+}
+
+// GetBaseConn retrieves a *BaseConn that has its own retry strategy
+func (d *BaseDB) GetBaseConn(ctx context.Context) (*BaseConn, error) {
+	conn, err := d.DB.Conn(ctx)
+	if err != nil {
+		return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
+	}
+	err = conn.PingContext(ctx)
+	if err != nil {
+		return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
+	}
+	baseConn := NewBaseConn(conn, d.Retry)
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	d.conns[baseConn] = struct{}{}
+	return baseConn, nil
+}
+
+// CloseBaseConn releases the BaseConn from the BaseDB, and closes the BaseConn
+func (d *BaseDB) CloseBaseConn(conn *BaseConn) error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	delete(d.conns, conn)
+	return conn.close()
+}
+
+// Close releases the *BaseDB resources
+func (d *BaseDB) Close() error {
+	if d == nil || d.DB == nil {
+		return nil
+	}
+	var err error
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	for conn := range d.conns {
+		terr := conn.close()
+		if err == nil {
+			err = terr
+		}
+	}
+	terr := d.DB.Close()
+	if err == nil {
+		return terr
+	}
+	return err
+}
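The full lifecycle in one place: Apply builds the pool, GetBaseConn pins and pings one *sql.Conn, CloseBaseConn returns it, and Close tears down every tracked conn plus the pool. A minimal sketch (cfg is any valid config.DBConfig):

```go
package example

import (
	"context"

	"github.com/pingcap/dm/dm/config"
	"github.com/pingcap/dm/pkg/conn"
)

func pingOnce(cfg config.DBConfig) error {
	baseDB, err := conn.DefaultDBProvider.Apply(cfg)
	if err != nil {
		return err
	}
	defer baseDB.Close() // also closes any conns still tracked

	baseConn, err := baseDB.GetBaseConn(context.Background()) // pings internally
	if err != nil {
		return err
	}
	return baseDB.CloseBaseConn(baseConn)
}
```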
diff --git a/pkg/conn/basedb_test.go b/pkg/conn/basedb_test.go
new file mode 100644
index 0000000000..1674609efa
--- /dev/null
+++ b/pkg/conn/basedb_test.go
@@ -0,0 +1,59 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package conn
+
+import (
+	"github.com/DATA-DOG/go-sqlmock"
+	. "github.com/pingcap/check"
+
+	tcontext "github.com/pingcap/dm/pkg/context"
+)
+
+var _ = Suite(&testBaseDBSuite{})
+
+type testBaseDBSuite struct {
+}
+
+func (t *testBaseDBSuite) TestGetBaseConn(c *C) {
+	db, mock, err := sqlmock.New()
+	c.Assert(err, IsNil)
+
+	baseDB := NewBaseDB(db)
+
+	tctx := tcontext.Background()
+
+	dbConn, err := baseDB.GetBaseConn(tctx.Context())
+	c.Assert(dbConn, NotNil)
+	c.Assert(err, IsNil)
+
+	mock.ExpectQuery("select 1").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("1"))
+	rows, err := dbConn.QuerySQL(tctx, "select 1")
+	c.Assert(err, IsNil)
+	ids := make([]int, 0, 1)
+	for rows.Next() {
+		var id int
+		err = rows.Scan(&id)
+		c.Assert(err, IsNil)
+		ids = append(ids, id)
+	}
+	c.Assert(ids, HasLen, 1)
+	c.Assert(ids[0], Equals, 1)
+
+	mock.ExpectBegin()
+	mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(1, 1))
+	mock.ExpectCommit()
+	affected, err := dbConn.ExecuteSQL(tctx, []string{"create database test"})
+	c.Assert(err, IsNil)
+	c.Assert(affected, Equals, 1)
+}
diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go
index ce8464652b..e0348968a4 100644
--- a/pkg/retry/errors.go
+++ b/pkg/retry/errors.go
@@ -14,6 +14,7 @@
 package retry

 import (
+	"database/sql/driver"
 	"strings"

 	"github.com/go-sql-driver/mysql"
@@ -62,6 +63,16 @@ func IsRetryableError(err error) bool {
 	return false
 }

+// IsConnectionError tells whether this error should trigger a reconnect to the database
+func IsConnectionError(err error) bool {
+	err = errors.Cause(err)
+	switch err {
+	case driver.ErrBadConn:
+		return true
+	}
+	return false
+}
+
 // IsRetryableErrorFastFailFilter tells whether this error should retry,
 // filtering some incompatible DDL error to achieve fast fail.
 func IsRetryableErrorFastFailFilter(err error) bool {
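The units check IsConnectionError before IsRetryableError, and the order matters: driver.ErrBadConn must trigger a reconnect, not a blind re-execute on the same dead session. A sketch of the combined decision (resetFn stands in for DBConn.resetConn):

```go
package example

import "github.com/pingcap/dm/pkg/retry"

// shouldRetry mirrors the decision order of the units' IsRetryableFn:
// connection errors first (reset, then retry), generic retryable
// errors second, everything else fails fast.
func shouldRetry(err error, resetFn func() error) bool {
	if retry.IsConnectionError(err) {
		return resetFn() == nil // only retry on a fresh connection
	}
	return retry.IsRetryableError(err)
}
```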
"github.com/pingcap/check" ) @@ -72,13 +72,20 @@ func (t *testStrategySuite) TestFiniteRetryStrategy(c *C) { return IsRetryableError(err) } operateFn = func(*tcontext.Context) (interface{}, error) { - mysqlErr := mysql.ErrInvalidConn + mysqlErr := driver.ErrBadConn return nil, terror.ErrDBInvalidConn.Delegate(mysqlErr, "test invalid connection") } _, opCount, err = strategy.Apply(ctx, params, operateFn) c.Assert(opCount, Equals, 0) c.Assert(terror.ErrDBInvalidConn.Equal(err), IsTrue) + params.IsRetryableFn = func(int, error) bool { + return IsConnectionError(err) + } + _, opCount, err = strategy.Apply(ctx, params, operateFn) + c.Assert(opCount, Equals, 3) + c.Assert(terror.ErrDBInvalidConn.Equal(err), IsTrue) + retValue := "success" operateFn = func(*tcontext.Context) (interface{}, error) { return retValue, nil diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 1d19c48c5d..c4a1d7c0f0 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -19,16 +19,17 @@ import ( "sync" "time" - "github.com/pingcap/failpoint" - tmysql "github.com/pingcap/parser/mysql" - "github.com/siddontang/go-mysql/mysql" - "go.uber.org/zap" - "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" + + "github.com/pingcap/failpoint" + tmysql "github.com/pingcap/parser/mysql" + "github.com/siddontang/go-mysql/mysql" + "go.uber.org/zap" ) /* @@ -122,7 +123,7 @@ func (b *binlogPoint) String() string { // because, when restarting to continue the sync, all sharding DDLs must try-sync again type CheckPoint interface { // Init initializes the CheckPoint - Init(conn *Conn) error + Init() error // Close closes the CheckPoint Close() @@ -181,12 +182,14 @@ type CheckPoint interface { // RemoteCheckPoint implements CheckPoint // which using target database to store info // NOTE: now we sync from relay log, so not add GTID support yet +// it's not thread-safe type RemoteCheckPoint struct { sync.RWMutex cfg *config.SubTaskConfig - db *Conn + db *conn.BaseDB + dbConn *DBConn schema string // schema name, set through task config table string // table name, now it's task name id string // checkpoint ID, now it is `source-id` @@ -226,28 +229,27 @@ func NewRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s } // Init implements CheckPoint.Init -func (cp *RemoteCheckPoint) Init(conn *Conn) error { - if conn != nil { - cp.db = conn - } else { - db, err := createConn(cp.cfg, cp.cfg.To, maxCheckPointTimeout) - if err != nil { - return err - } - cp.db = db +func (cp *RemoteCheckPoint) Init() error { + checkPointDB := cp.cfg.To + checkPointDB.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxCheckPointTimeout) + db, dbConns, err := createConns(cp.tctx, cp.cfg, checkPointDB, 1) + if err != nil { + return err } + cp.db = db + cp.dbConn = dbConns[0] return cp.prepare() } // Close implements CheckPoint.Close func (cp *RemoteCheckPoint) Close() { - closeConns(cp.tctx, cp.db) + closeBaseDB(cp.tctx, cp.db) } // ResetConn implements CheckPoint.ResetConn func (cp *RemoteCheckPoint) ResetConn() error { - return cp.db.ResetConn(cp.tctx) + return cp.dbConn.resetConn(cp.tctx) } // Clear implements CheckPoint.Clear @@ -258,7 +260,7 @@ func (cp *RemoteCheckPoint) Clear() error { // delete all checkpoints sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s'", cp.schema, cp.table, cp.id) args := make([]interface{}, 0) - _, err := 
 // Clear implements CheckPoint.Clear
@@ -258,7 +260,7 @@ func (cp *RemoteCheckPoint) Clear() error {
 	// delete all checkpoints
 	sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s'", cp.schema, cp.table, cp.id)
 	args := make([]interface{}, 0)
-	_, err := cp.db.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...)
+	_, err := cp.dbConn.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...)
 	if err != nil {
 		return err
 	}
@@ -317,7 +319,7 @@ func (cp *RemoteCheckPoint) DeleteTablePoint(sourceSchema, sourceTable string) e
 	// delete checkpoint
 	sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s' AND `cp_schema` = '%s' AND `cp_table` = '%s'", cp.schema, cp.table, cp.id, sourceSchema, sourceTable)
 	args := make([]interface{}, 0)
-	_, err := cp.db.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...)
+	_, err := cp.dbConn.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...)
 	if err != nil {
 		return err
 	}
@@ -403,7 +405,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string, extraSQLs
 		args = append(args, extraArgs[i])
 	}

-	_, err := cp.db.executeSQL(cp.tctx, sqls, args...)
+	_, err := cp.dbConn.executeSQL(cp.tctx, sqls, args...)
 	if err != nil {
 		return err
 	}
@@ -466,7 +468,7 @@ func (cp *RemoteCheckPoint) prepare() error {
 func (cp *RemoteCheckPoint) createSchema() error {
 	sql2 := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", cp.schema)
 	args := make([]interface{}, 0)
-	_, err := cp.db.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...)
+	_, err := cp.dbConn.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...)
 	cp.tctx.L().Info("create checkpoint schema", zap.String("statement", sql2))
 	return err
 }
@@ -485,7 +487,7 @@ func (cp *RemoteCheckPoint) createTable() error {
 		UNIQUE KEY uk_id_schema_table (id, cp_schema, cp_table)
 	)`, tableName)
 	args := make([]interface{}, 0)
-	_, err := cp.db.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...)
+	_, err := cp.dbConn.executeSQL(cp.tctx, []string{sql2}, [][]interface{}{args}...)
 	cp.tctx.L().Info("create checkpoint table", zap.String("statement", sql2))
 	return err
 }
@@ -493,7 +495,12 @@ func (cp *RemoteCheckPoint) createTable() error {
 // Load implements CheckPoint.Load
 func (cp *RemoteCheckPoint) Load() error {
 	query := fmt.Sprintf("SELECT `cp_schema`, `cp_table`, `binlog_name`, `binlog_pos`, `is_global` FROM `%s`.`%s` WHERE `id`='%s'", cp.schema, cp.table, cp.id)
-	rows, err := cp.db.querySQL(cp.tctx, query)
+	rows, err := cp.dbConn.querySQL(cp.tctx, query)
+	defer func() {
+		if rows != nil {
+			rows.Close()
+		}
+	}()

 	failpoint.Inject("LoadCheckpointFailed", func(val failpoint.Value) {
 		err = tmysql.NewErr(uint16(val.(int)))
@@ -503,7 +510,6 @@ func (cp *RemoteCheckPoint) Load() error {
 	if err != nil {
 		return terror.WithScope(err, terror.ScopeDownstream)
 	}
-	defer rows.Close()

 	// checkpoints in DB have higher priority
 	// if don't want to use checkpoint in DB, set `remove-previous-checkpoint` to `true`
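Init above copies cfg.To and attaches SetReadTimeout(maxCheckPointTimeout); defaultDBProvider.Apply then folds that into the go-sql-driver DSN, so a stuck checkpoint query times out instead of hanging. The resulting DSN shape (credentials and host are placeholders):

```go
package example

// dsnWithReadTimeout shows the DSN produced by defaultDBProvider.Apply
// when RawDBCfg.ReadTimeout is "1m"; readTimeout is a go-sql-driver
// parameter applied to each read on the wire.
func dsnWithReadTimeout() string {
	return "root:pass@tcp(127.0.0.1:4000)/?charset=utf8mb4" +
		"&interpolateParams=true&maxAllowedPacket=67108864" +
		"&readTimeout=1m"
}
```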
diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go
index 3463968706..58228dae20 100644
--- a/syncer/checkpoint_test.go
+++ b/syncer/checkpoint_test.go
@@ -21,7 +21,7 @@ import (
 	"strings"

 	"github.com/pingcap/dm/dm/config"
-	"github.com/pingcap/dm/pkg/baseconn"
+	"github.com/pingcap/dm/pkg/conn"
 	tcontext "github.com/pingcap/dm/pkg/context"
 	"github.com/pingcap/dm/pkg/retry"

@@ -90,9 +90,12 @@ func (s *testCheckpointSuite) TestCheckPoint(c *C) {
 	mock.ExpectExec(clearCheckPointSQL).WillReturnResult(sqlmock.NewResult(0, 1))
 	mock.ExpectCommit()

-	// pass sqlmock baseConn directly
-	conn := &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
-	err = cp.Init(conn)
+	dbConn, err := db.Conn(tcontext.Background().Context())
+	c.Assert(err, IsNil)
+	conn := &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
+
+	cp.(*RemoteCheckPoint).dbConn = conn
+	err = cp.(*RemoteCheckPoint).prepare()
 	c.Assert(err, IsNil)

 	cp.Clear()
diff --git a/syncer/db.go b/syncer/db.go
index 61d2973d58..85ca95034c 100644
--- a/syncer/db.go
+++ b/syncer/db.go
@@ -20,7 +20,7 @@ import (
 	"time"

 	"github.com/pingcap/dm/dm/config"
-	"github.com/pingcap/dm/pkg/baseconn"
+	"github.com/pingcap/dm/pkg/conn"
 	tcontext "github.com/pingcap/dm/pkg/context"
 	"github.com/pingcap/dm/pkg/gtid"
 	"github.com/pingcap/dm/pkg/log"
@@ -71,22 +71,34 @@ type binlogSize struct {
 	size int64
 }

-// Conn represents a live DB connection
-type Conn struct {
-	cfg      *config.SubTaskConfig
-	baseConn *baseconn.BaseConn
+func createBaseDB(dbCfg config.DBConfig) (*conn.BaseDB, error) {
+	db, err := conn.DefaultDBProvider.Apply(dbCfg)
+	if err != nil {
+		return nil, terror.WithScope(err, terror.ScopeDownstream)
+	}
+	return db, nil
 }

-// ResetConn reset baseConn.*DB's connection pool
-func (conn *Conn) ResetConn(tctx *tcontext.Context) error {
-	if conn.baseConn == nil {
-		return terror.ErrDBUnExpect.Generate("database base connection not valid")
+// closeBaseDB closes the baseDB to release all connections generated by it, and the baseDB itself
+func closeBaseDB(tctx *tcontext.Context, baseDB *conn.BaseDB) {
+	if baseDB != nil {
+		err := baseDB.Close()
+		if err != nil {
+			tctx.L().Error("fail to close baseDB", log.ShortError(err))
+		}
 	}
-	return conn.baseConn.ResetConn(tctx)
 }

-func (conn *Conn) getMasterStatus(flavor string) (mysql.Position, gtid.Set, error) {
-	pos, gtidSet, err := utils.GetMasterStatus(conn.baseConn.DB, flavor)
+// UpStreamConn connects to the upstream DB.
+// Normally, we need to get some upstream information through some helper functions.
+// These helper functions are all simple query functions, so we use a pool of connections here;
+// maybe change to one connection some day
+type UpStreamConn struct {
+	BaseDB *conn.BaseDB
+}
+
+func (conn *UpStreamConn) getMasterStatus(flavor string) (mysql.Position, gtid.Set, error) {
+	pos, gtidSet, err := utils.GetMasterStatus(conn.BaseDB.DB, flavor)

 	failpoint.Inject("GetMasterStatusFailed", func(val failpoint.Value) {
 		err = tmysql.NewErr(uint16(val.(int)))
@@ -96,27 +108,61 @@ func (conn *Conn) getMasterStatus(flavor string) (mysql.Position, gtid.Set, erro
 	return pos, gtidSet, err
 }

-func (conn *Conn) getServerUUID(flavor string) (string, error) {
-	return utils.GetServerUUID(conn.baseConn.DB, flavor)
+func (conn *UpStreamConn) getServerUUID(flavor string) (string, error) {
+	return utils.GetServerUUID(conn.BaseDB.DB, flavor)
 }

-func (conn *Conn) getParser(ansiQuotesMode bool) (*parser.Parser, error) {
-	return utils.GetParser(conn.baseConn.DB, ansiQuotesMode)
+func (conn *UpStreamConn) getParser(ansiQuotesMode bool) (*parser.Parser, error) {
+	return utils.GetParser(conn.BaseDB.DB, ansiQuotesMode)
 }

-func (conn *Conn) killConn(connID uint32) error {
-	return utils.KillConn(conn.baseConn.DB, connID)
+func (conn *UpStreamConn) killConn(connID uint32) error {
+	return utils.KillConn(conn.BaseDB.DB, connID)
 }

-func (conn *Conn) fetchAllDoTables(bw *filter.Filter) (map[string][]string, error) {
-	return utils.FetchAllDoTables(conn.baseConn.DB, bw)
+func (conn *UpStreamConn) fetchAllDoTables(bw *filter.Filter) (map[string][]string, error) {
+	return utils.FetchAllDoTables(conn.BaseDB.DB, bw)
 }

-func (conn *Conn) countBinaryLogsSize(pos mysql.Position) (int64, error) {
-	return countBinaryLogsSize(pos, conn.baseConn.DB)
+func (conn *UpStreamConn) countBinaryLogsSize(pos mysql.Position) (int64, error) {
+	return countBinaryLogsSize(pos, conn.BaseDB.DB)
 }

+func createUpStreamConn(dbCfg config.DBConfig) (*UpStreamConn, error) {
+	baseDB, err := createBaseDB(dbCfg)
+	if err != nil {
+		return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeUpstream)
+	}
+	return &UpStreamConn{BaseDB: baseDB}, nil
+}
+
+func closeUpstreamConn(tctx *tcontext.Context, conn *UpStreamConn) {
+	if conn != nil {
+		closeBaseDB(tctx, conn.BaseDB)
+	}
+}
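UpStreamConn deliberately keeps the whole *sql.DB pool rather than a pinned conn, since its helpers are one-shot queries against the upstream. A sketch of a typical call site, written as if inside package syncer ("mysql" is the flavor literal, and the wrapper function is illustrative):

```go
// upstreamPos sketches how syncer code consults the upstream helpers.
func upstreamPos(cfg config.DBConfig) (mysql.Position, error) {
	upConn, err := createUpStreamConn(cfg) // pooled *sql.DB inside
	if err != nil {
		return mysql.Position{}, err
	}
	pos, _, err := upConn.getMasterStatus("mysql")
	return pos, err
}
```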
+// DBConn represents a live DB connection
+// it's not thread-safe
+type DBConn struct {
+	cfg      *config.SubTaskConfig
+	baseConn *conn.BaseConn
+
+	// generate a new BaseConn and close the old one
+	resetBaseConnFn func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error)
+}
+
+// resetConn resets one worker connection from the specified *BaseDB
+func (conn *DBConn) resetConn(tctx *tcontext.Context) error {
+	baseConn, err := conn.resetBaseConnFn(tctx, conn.baseConn)
+	if err != nil {
+		return err
+	}
+	conn.baseConn = baseConn
+	return nil
+}
+
+func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
 	if conn == nil || conn.baseConn == nil {
 		return nil, terror.ErrDBUnExpect.Generate("database base connection not valid")
 	}
@@ -125,6 +171,18 @@ func (conn *Conn) querySQL(tctx *tcontext.Context, query string, args ...interfa
 		FirstRetryDuration: retryTimeout,
 		BackoffStrategy:    retry.Stable,
 		IsRetryableFn: func(retryTime int, err error) bool {
+			if retry.IsConnectionError(err) {
+				err = conn.resetConn(tctx)
+				if err != nil {
+					tctx.L().Error("reset connection failed", zap.Int("retry", retryTime),
+						zap.String("query", utils.TruncateInterface(query, -1)),
+						zap.String("arguments", utils.TruncateInterface(args, -1)),
+						log.ShortError(err))
+					return false
+				}
+				sqlRetriesTotal.WithLabelValues("query", conn.cfg.Name).Add(1)
+				return true
+			}
 			if retry.IsRetryableError(err) {
 				tctx.L().Warn("query statement", zap.Int("retry", retryTime),
 					zap.String("query", utils.TruncateString(query, -1)),
@@ -154,7 +212,7 @@ func (conn *Conn) querySQL(tctx *tcontext.Context, query string, args ...interfa
 	return ret.(*sql.Rows), nil
 }

-func (conn *Conn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(error) bool, queries []string, args ...[]interface{}) (int, error) {
+func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(error) bool, queries []string, args ...[]interface{}) (int, error) {
 	if len(queries) == 0 {
 		return 0, nil
 	}
@@ -168,6 +226,18 @@ func (conn *Conn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(
 		FirstRetryDuration: retryTimeout,
 		BackoffStrategy:    retry.Stable,
 		IsRetryableFn: func(retryTime int, err error) bool {
+			if retry.IsConnectionError(err) {
+				err = conn.resetConn(tctx)
+				if err != nil {
+					tctx.L().Error("reset connection failed", zap.Int("retry", retryTime),
+						zap.String("queries", utils.TruncateInterface(queries, -1)),
+						zap.String("arguments", utils.TruncateInterface(args, -1)),
+						log.ShortError(err))
+					return false
+				}
+				sqlRetriesTotal.WithLabelValues("stmt_exec", conn.cfg.Name).Add(1)
+				return true
+			}
 			if retry.IsRetryableErrorFastFailFilter(err) {
 				tctx.L().Warn("execute statements", zap.Int("retry", retryTime),
 					zap.String("queries", utils.TruncateInterface(queries, -1)),
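executeSQLWithIgnore lets a caller swallow one class of errors while still retrying the rest; the loader's restore path applies the same idea with isErrDBExists/isErrTableExists. A hedged sketch, as if inside package syncer (applySchema and ignoreExists are hypothetical, e.g. a filter matching "already exists" errors):

```go
func applySchema(tctx *tcontext.Context, dbConn *DBConn, ignoreExists func(error) bool) error {
	_, err := dbConn.executeSQLWithIgnore(tctx, ignoreExists,
		[]string{"CREATE SCHEMA IF NOT EXISTS `dm_meta`"})
	return err
}
```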
@@ -205,57 +275,35 @@ func (conn *Conn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(
 	return ret.(int), nil
 }

-func (conn *Conn) executeSQL(tctx *tcontext.Context, queries []string, args ...[]interface{}) (int, error) {
+func (conn *DBConn) executeSQL(tctx *tcontext.Context, queries []string, args ...[]interface{}) (int, error) {
 	return conn.executeSQLWithIgnore(tctx, nil, queries, args...)
 }

-func createBaseConn(dbCfg config.DBConfig, timeout string, rawDBCfg *baseconn.RawDBConfig) (*baseconn.BaseConn, error) {
-	dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&readTimeout=%s&maxAllowedPacket=%d",
-		dbCfg.User, dbCfg.Password, dbCfg.Host, dbCfg.Port, timeout, *dbCfg.MaxAllowedPacket)
-	baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, rawDBCfg)
-	if err != nil {
-		return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
-	}
-	return baseConn, nil
-}
-
-func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string) (*Conn, error) {
-	baseConn, err := createBaseConn(dbCfg, timeout, baseconn.DefaultRawDBConfig())
-	if err != nil {
-		return nil, err
-	}
-	return &Conn{baseConn: baseConn, cfg: cfg}, nil
-}
-
-func createConns(cfg *config.SubTaskConfig, dbCfg config.DBConfig, count int, timeout string) ([]*Conn, error) {
-	dbs := make([]*Conn, 0, count)
-
-	rawDBCfg := &baseconn.RawDBConfig{
-		MaxIdleConns: cfg.SyncerConfig.WorkerCount,
-	}
-	baseConn, err := createBaseConn(dbCfg, timeout, rawDBCfg)
+func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, dbCfg config.DBConfig, count int) (*conn.BaseDB, []*DBConn, error) {
+	conns := make([]*DBConn, 0, count)
+	baseDB, err := createBaseDB(dbCfg)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	for i := 0; i < count; i++ {
-		// TODO use *sql.Conn instead of *sql.DB
-		// share db by all conns
-		bc := &baseconn.BaseConn{baseConn.DB, baseConn.DSN, baseConn.RetryStrategy, rawDBCfg}
-		dbs = append(dbs, &Conn{baseConn: bc, cfg: cfg})
-	}
-	return dbs, nil
-}
-
-func closeConns(tctx *tcontext.Context, conns ...*Conn) {
-	for _, conn := range conns {
-		err := conn.baseConn.Close()
+		baseConn, err := baseDB.GetBaseConn(tctx.Context())
 		if err != nil {
-			tctx.L().Error("fail to close baseConn connection", log.ShortError(err))
+			closeBaseDB(tctx, baseDB)
+			return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
+		}
+		resetBaseConnFn := func(tctx *tcontext.Context, baseConn *conn.BaseConn) (*conn.BaseConn, error) {
+			err := baseDB.CloseBaseConn(baseConn)
+			if err != nil {
+				tctx.L().Warn("failed to close baseConn in reset")
+			}
+			return baseDB.GetBaseConn(tctx.Context())
 		}
+		conns = append(conns, &DBConn{baseConn: baseConn, cfg: cfg, resetBaseConnFn: resetBaseConnFn})
 	}
+	return baseDB, conns, nil
 }

-func getTableIndex(tctx *tcontext.Context, conn *Conn, table *table) error {
+func getTableIndex(tctx *tcontext.Context, conn *DBConn, table *table) error {
 	if table.schema == "" || table.name == "" {
 		return terror.ErrDBUnExpect.Generate("schema/table is empty")
 	}
@@ -312,7 +360,7 @@ func getTableIndex(tctx *tcontext.Context, conn *Conn, table *table) error {
 	return nil
 }

-func getTableColumns(tctx *tcontext.Context, conn *Conn, table *table) error {
+func getTableColumns(tctx *tcontext.Context, conn *DBConn, table *table) error {
 	if table.schema == "" || table.name == "" {
 		return terror.ErrDBUnExpect.Generate("schema/table is empty")
 	}
diff --git a/syncer/db_test.go b/syncer/db_test.go
index c11ce86e8b..6e6ab7e042 100644
--- a/syncer/db_test.go
+++ b/syncer/db_test.go
@@ -29,7 +29,7 @@ import (
 	"go.uber.org/zap"
"github.com/pingcap/dm/dm/config" - "github.com/pingcap/dm/pkg/baseconn" + "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/retry" @@ -145,9 +145,12 @@ func (s *testDBSuite) TestBinaryLogs(c *C) { func (s *testSyncerSuite) TestExecuteSQLSWithIgnore(c *C) { db, mock, err := sqlmock.New() - conn := &Conn{ - baseConn: &baseconn.BaseConn{ - DB: db, + c.Assert(err, IsNil) + dbConn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + conn := &DBConn{ + baseConn: &conn.BaseConn{ + DBConn: dbConn, RetryStrategy: &retry.FiniteRetryStrategy{}, }, cfg: &config.SubTaskConfig{ @@ -238,7 +241,7 @@ func (s *testDBSuite) TestTimezone(c *C) { s.resetBinlogSyncer(c) // we should not use `sql.DB.Exec` to do query which depends on session variables - // because `sql.DB.Exec` will choose a underlying Conn for every query from the connection pool + // because `sql.DB.Exec` will choose a underlying DBConn for every query from the connection pool // and different Conn using different session // ref: `sql.DB.Conn` // and `set @@global` is also not reasonable, because it can not affect sessions already exist diff --git a/syncer/ghost.go b/syncer/ghost.go index fbe0a87e89..9dcd26d4ce 100644 --- a/syncer/ghost.go +++ b/syncer/ghost.go @@ -175,3 +175,8 @@ func (g *Ghost) Clear() error { func (g *Ghost) Close() { g.storge.Close() } + +// ResetConn implements interface +func (g *Ghost) ResetConn() error { + return g.storge.ResetConn() +} diff --git a/syncer/online_ddl.go b/syncer/online_ddl.go index c117ff9612..c5f93c4bf2 100644 --- a/syncer/online_ddl.go +++ b/syncer/online_ddl.go @@ -20,12 +20,13 @@ import ( "strings" "sync" - "github.com/pingcap/parser/ast" - "github.com/pingcap/tidb-tools/pkg/filter" - "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/terror" + + "github.com/pingcap/parser/ast" + "github.com/pingcap/tidb-tools/pkg/filter" ) var ( @@ -50,6 +51,8 @@ type OnlinePlugin interface { TableType(table string) TableType // RealName returns real table name that removed ghost suffix and handled by table router RealName(schema, table string) (string, string) + // ResetConn reset db connection + ResetConn() error // Clear clears all online information Clear() error // Close closes online ddl plugin @@ -79,7 +82,8 @@ type OnlineDDLStorage struct { cfg *config.SubTaskConfig - db *Conn + db *conn.BaseDB + dbConn *DBConn schema string // schema name, set through task config table string // table name, now it's task name id string // now it is `server-id` used as MySQL slave @@ -106,11 +110,14 @@ func NewOnlineDDLStorage(newtctx *tcontext.Context, cfg *config.SubTaskConfig) * // Init initials online handler func (s *OnlineDDLStorage) Init() error { - db, err := createConn(s.cfg, s.cfg.To, maxCheckPointTimeout) + onlineDB := s.cfg.To + onlineDB.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxCheckPointTimeout) + db, dbConns, err := createConns(s.tctx, s.cfg, onlineDB, 1) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } s.db = db + s.dbConn = dbConns[0] err = s.prepare() if err != nil { @@ -126,7 +133,7 @@ func (s *OnlineDDLStorage) Load() error { defer s.Unlock() query := fmt.Sprintf("SELECT `ghost_schema`, `ghost_table`, `ddls` FROM `%s`.`%s` WHERE `id`='%s'", s.schema, s.table, s.id) - rows, err := s.db.querySQL(s.tctx, query) + rows, err := s.dbConn.querySQL(s.tctx, query) if err != nil { return 
@@ -204,7 +211,7 @@ func (s *OnlineDDLStorage) Save(ghostSchema, ghostTable, realSchema, realTable,
 	}

 	query := fmt.Sprintf("REPLACE INTO `%s`.`%s`(`id`,`ghost_schema`, `ghost_table`, `ddls`) VALUES ('%s', '%s', '%s', '%s')", s.schema, s.table, s.id, ghostSchema, ghostTable, escapeSingleQuote(string(ddlsBytes)))
-	_, err = s.db.executeSQL(s.tctx, []string{query})
+	_, err = s.dbConn.executeSQL(s.tctx, []string{query})
 	return terror.WithScope(err, terror.ScopeDownstream)
 }

@@ -220,7 +227,7 @@ func (s *OnlineDDLStorage) Delete(ghostSchema, ghostTable string) error {
 	// delete all checkpoints
 	sql := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s' and `ghost_schema` = '%s' and `ghost_table` = '%s'", s.schema, s.table, s.id, ghostSchema, ghostTable)
-	_, err := s.db.executeSQL(s.tctx, []string{sql})
+	_, err := s.dbConn.executeSQL(s.tctx, []string{sql})
 	if err != nil {
 		return terror.WithScope(err, terror.ScopeDownstream)
 	}
@@ -236,7 +243,7 @@ func (s *OnlineDDLStorage) Clear() error {

 	// delete all checkpoints
 	sql := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s'", s.schema, s.table, s.id)
-	_, err := s.db.executeSQL(s.tctx, []string{sql})
+	_, err := s.dbConn.executeSQL(s.tctx, []string{sql})
 	if err != nil {
 		return terror.WithScope(err, terror.ScopeDownstream)
 	}
@@ -245,12 +252,17 @@ func (s *OnlineDDLStorage) Clear() error {
 	return nil
 }

+// ResetConn implements OnlinePlugin.ResetConn
+func (s *OnlineDDLStorage) ResetConn() error {
+	return s.dbConn.resetConn(s.tctx)
+}
+
 // Close closes database connection
 func (s *OnlineDDLStorage) Close() {
 	s.Lock()
 	defer s.Unlock()

-	closeConns(s.tctx, s.db)
+	closeBaseDB(s.tctx, s.db)
 }

 func (s *OnlineDDLStorage) prepare() error {
@@ -266,7 +278,7 @@ func (s *OnlineDDLStorage) prepare() error {

 func (s *OnlineDDLStorage) createSchema() error {
 	sql := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", s.schema)
-	_, err := s.db.executeSQL(s.tctx, []string{sql})
+	_, err := s.dbConn.executeSQL(s.tctx, []string{sql})
 	return terror.WithScope(err, terror.ScopeDownstream)
 }

@@ -280,7 +292,7 @@ func (s *OnlineDDLStorage) createTable() error {
 		update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
 		UNIQUE KEY uk_id_schema_table (id, ghost_schema, ghost_table)
 	)`, tableName)
-	_, err := s.db.executeSQL(s.tctx, []string{sql})
+	_, err := s.dbConn.executeSQL(s.tctx, []string{sql})
 	return terror.WithScope(err, terror.ScopeDownstream)
 }
diff --git a/syncer/pt_osc.go b/syncer/pt_osc.go
index 3837e5da20..ba681aa1fc 100644
--- a/syncer/pt_osc.go
+++ b/syncer/pt_osc.go
@@ -175,3 +175,8 @@ func (p *PT) Clear() error {
 func (p *PT) Close() {
 	p.storge.Close()
 }
+
+// ResetConn implements interface
+func (p *PT) ResetConn() error {
+	return p.storge.ResetConn()
+}
diff --git a/syncer/sharding_group.go b/syncer/sharding_group.go
index cf224a873c..0f4f1281a7 100644
--- a/syncer/sharding_group.go
+++ b/syncer/sharding_group.go
@@ -75,14 +75,15 @@ import (
 	"strings"
 	"sync"

-	"github.com/siddontang/go-mysql/mysql"
-	"go.uber.org/zap"
-
 	"github.com/pingcap/dm/dm/config"
 	"github.com/pingcap/dm/dm/pb"
+	"github.com/pingcap/dm/pkg/conn"
 	tcontext "github.com/pingcap/dm/pkg/context"
 	"github.com/pingcap/dm/pkg/terror"
 	shardmeta "github.com/pingcap/dm/syncer/sharding-meta"
+
+	"github.com/siddontang/go-mysql/mysql"
+	"go.uber.org/zap"
 )

 // ShardingGroup represents a sharding DDL sync group
@@ -401,7 +402,9 @@ type ShardingGroupKeeper struct {

 	shardMetaSchema string
 	shardMetaTable  string
-	db              *Conn
+
+	db     *conn.BaseDB
+	dbConn *DBConn

 	tctx *tcontext.Context
 }
@@ -451,19 +454,17 @@ func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceI
 }

 // Init does initialization staff
-func (k *ShardingGroupKeeper) Init(conn *Conn) error {
+func (k *ShardingGroupKeeper) Init() error {
 	k.clear()
-	if conn != nil {
-		k.db = conn
-	} else {
-		db, err := createConn(k.cfg, k.cfg.To, maxDDLConnectionTimeout)
-		if err != nil {
-			return err
-		}
-		k.db = db
+	sgkDB := k.cfg.To
+	sgkDB.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxCheckPointTimeout)
+	db, dbConns, err := createConns(k.tctx, k.cfg, sgkDB, 1)
+	if err != nil {
+		return err
 	}
-	err := k.prepare()
-	return err
+	k.db = db
+	k.dbConn = dbConns[0]
+	return k.prepare()
 }

 // clear clears all sharding groups
@@ -697,12 +698,12 @@ func (k *ShardingGroupKeeper) prepare() error {

 // Close closes sharding group keeper
 func (k *ShardingGroupKeeper) Close() {
-	closeConns(k.tctx, k.db)
+	closeBaseDB(k.tctx, k.db)
 }

 func (k *ShardingGroupKeeper) createSchema() error {
 	stmt := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", k.shardMetaSchema)
-	_, err := k.db.executeSQL(k.tctx, []string{stmt})
+	_, err := k.dbConn.executeSQL(k.tctx, []string{stmt})
 	k.tctx.L().Info("execute sql", zap.String("statement", stmt))
 	return terror.WithScope(err, terror.ScopeDownstream)
 }
@@ -720,7 +721,7 @@ func (k *ShardingGroupKeeper) createTable() error {
 		update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
 		UNIQUE KEY uk_source_id_table_id_source (source_id, target_table_id, source_table_id)
 	)`, tableName)
-	_, err := k.db.executeSQL(k.tctx, []string{stmt})
+	_, err := k.dbConn.executeSQL(k.tctx, []string{stmt})
 	k.tctx.L().Info("execute sql", zap.String("statement", stmt))
 	return terror.WithScope(err, terror.ScopeDownstream)
 }
@@ -728,7 +729,7 @@ func (k *ShardingGroupKeeper) createTable() error {
 // LoadShardMeta implements CheckPoint.LoadShardMeta
 func (k *ShardingGroupKeeper) LoadShardMeta() (map[string]*shardmeta.ShardingMeta, error) {
 	query := fmt.Sprintf("SELECT `target_table_id`, `source_table_id`, `active_index`, `is_global`, `data` FROM `%s`.`%s` WHERE `source_id`='%s'", k.shardMetaSchema, k.shardMetaTable, k.cfg.SourceID)
-	rows, err := k.db.querySQL(k.tctx, query)
+	rows, err := k.dbConn.querySQL(k.tctx, query)
 	if err != nil {
 		return nil, terror.WithScope(err, terror.ScopeDownstream)
 	}
diff --git a/syncer/syncer.go b/syncer/syncer.go
index 02a6cfcbe2..1ac8438a31 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -39,6 +39,7 @@ import (
 	"github.com/pingcap/dm/dm/pb"
 	"github.com/pingcap/dm/dm/unit"
 	"github.com/pingcap/dm/pkg/binlog"
+	"github.com/pingcap/dm/pkg/conn"
 	tcontext "github.com/pingcap/dm/pkg/context"
 	fr "github.com/pingcap/dm/pkg/func-rollback"
 	"github.com/pingcap/dm/pkg/gtid"
@@ -145,9 +146,12 @@ type Syncer struct {
 	cacheColumns map[string][]string // table columns cache: `target-schema`.`target-table` -> column names list
 	genColsCache *GenColCache

-	fromDB *Conn
-	toDBs  []*Conn
-	ddlDB  *Conn
+	fromDB *UpStreamConn
+
+	toDB      *conn.BaseDB
+	toDBConns []*DBConn
+	ddlDB     *conn.BaseDB
+	ddlDBConn *DBConn

 	jobs       []chan *job
 	jobsClosed sync2.AtomicBool
@@ -338,14 +342,19 @@ func (s *Syncer) Init() (err error) {
 	}

 	if s.cfg.IsSharding {
-		err = s.initShardingGroups(nil)
+		err = s.sgk.Init()
+		if err != nil {
+			return err
+		}
+
+		err = s.initShardingGroups()
 		if err != nil {
 			return err
 		}
 		rollbackHolder.Add(fr.FuncRollback{Name: "close-sharding-group-keeper", Fn: s.sgk.Close})
 	}

-	err =
diff --git a/syncer/syncer.go b/syncer/syncer.go
index 02a6cfcbe2..1ac8438a31 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -39,6 +39,7 @@ import (
 	"github.com/pingcap/dm/dm/pb"
 	"github.com/pingcap/dm/dm/unit"
 	"github.com/pingcap/dm/pkg/binlog"
+	"github.com/pingcap/dm/pkg/conn"
 	tcontext "github.com/pingcap/dm/pkg/context"
 	fr "github.com/pingcap/dm/pkg/func-rollback"
 	"github.com/pingcap/dm/pkg/gtid"
@@ -145,9 +146,12 @@ type Syncer struct {
 	cacheColumns map[string][]string // table columns cache: `target-schema`.`target-table` -> column names list
 	genColsCache *GenColCache
 
-	fromDB *Conn
-	toDBs  []*Conn
-	ddlDB  *Conn
+	fromDB *UpStreamConn
+
+	toDB      *conn.BaseDB
+	toDBConns []*DBConn
+	ddlDB     *conn.BaseDB
+	ddlDBConn *DBConn
 
 	jobs       []chan *job
 	jobsClosed sync2.AtomicBool
@@ -338,14 +342,19 @@ func (s *Syncer) Init() (err error) {
 	}
 
 	if s.cfg.IsSharding {
-		err = s.initShardingGroups(nil)
+		err = s.sgk.Init()
+		if err != nil {
+			return err
+		}
+
+		err = s.initShardingGroups()
 		if err != nil {
 			return err
 		}
 		rollbackHolder.Add(fr.FuncRollback{Name: "close-sharding-group-keeper", Fn: s.sgk.Close})
 	}
 
-	err = s.checkpoint.Init(nil)
+	err = s.checkpoint.Init()
 	if err != nil {
 		return err
 	}
@@ -400,19 +409,13 @@ func (s *Syncer) Init() (err error) {
 
 // initShardingGroups initializes sharding groups according to source MySQL, filter rules and router rules
 // NOTE: now we don't support modify router rules after task has started
-func (s *Syncer) initShardingGroups(conn *Conn) error {
+func (s *Syncer) initShardingGroups() error {
 	// fetch tables from source and filter them
 	sourceTables, err := s.fromDB.fetchAllDoTables(s.bwList)
 	if err != nil {
 		return err
 	}
 
-	// clear old sharding group and initials some needed data
-	err = s.sgk.Init(conn)
-	if err != nil {
-		return err
-	}
-
 	// convert according to router rules
 	// target-schema -> target-table -> source-IDs
 	mapper := make(map[string]map[string][]string, len(sourceTables))
@@ -505,19 +508,28 @@ func (s *Syncer) reset() {
 func (s *Syncer) resetDBs() error {
 	var err error
 
-	// toDBs share the same `*sql.DB` in underlying `*baseconn.BaseConn`, currently the `BaseConn.ResetConn`
-	// can only reset the `*sql.DB` and point to the new `*sql.DB`, it is hard to reset all the `*sql.DB` by
-	// calling `BaseConn.ResetConn` once. On the other side if we simply change the underlying value of a
-	// `*sql.DB` by `*conn.DB = *db`, there exists some data race and invalid memory address in db driver.
-	// So we use the close and recreate way here.
-	closeConns(s.tctx, s.toDBs...)
-	s.toDBs, err = createConns(s.cfg, s.cfg.To, s.cfg.WorkerCount, maxDMLConnectionTimeout)
-	if err != nil {
-		return terror.WithScope(err, terror.ScopeDownstream)
+	for i := 0; i < len(s.toDBConns); i++ {
+		err = s.toDBConns[i].resetConn(s.tctx)
+		if err != nil {
+			return terror.WithScope(err, terror.ScopeDownstream)
+		}
 	}
-	s.tctx.L().Info("createDBs", zap.String("toDBs baseConn", fmt.Sprintf("%p", s.toDBs[0].baseConn.DB)))
 
-	err = s.ddlDB.ResetConn(s.tctx)
+	if s.onlineDDL != nil {
+		err = s.onlineDDL.ResetConn()
+		if err != nil {
+			return terror.WithScope(err, terror.ScopeDownstream)
+		}
+	}
+
+	if s.sgk != nil {
+		err = s.sgk.dbConn.resetConn(s.tctx)
+		if err != nil {
+			return terror.WithScope(err, terror.ScopeDownstream)
+		}
+	}
+
+	err = s.ddlDBConn.resetConn(s.tctx)
 	if err != nil {
 		return terror.WithScope(err, terror.ScopeDownstream)
 	}
@@ -635,7 +647,7 @@ func (s *Syncer) clearAllTables() {
 	s.genColsCache.reset()
 }
 
-func (s *Syncer) getTableFromDB(db *Conn, schema string, name string) (*table, error) {
+func (s *Syncer) getTableFromDB(db *DBConn, schema string, name string) (*table, error) {
 	table := &table{}
 	table.schema = schema
 	table.name = name
@@ -666,8 +678,7 @@ func (s *Syncer) getTable(schema string, table string) (*table, []string, error)
 		return value, s.cacheColumns[key], nil
 	}
 
-	db := s.toDBs[len(s.toDBs)-1]
-	t, err := s.getTableFromDB(db, schema, table)
+	t, err := s.getTableFromDB(s.ddlDBConn, schema, table)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -855,7 +866,7 @@ func (s *Syncer) flushCheckPoints() error {
 }
 
 // DDL synced one by one, so we only need to process one DDL at a time
-func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *Conn, ddlJobChan chan *job) {
+func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn, ddlJobChan chan *job) {
 	defer s.wg.Done()
 
 	var err error
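The rewritten `resetDBs` above replaces the old close-and-recreate dance: every component that owns a `DBConn` is reset in place and the underlying `*sql.DB` pool survives. The loop-with-early-return shape can be sketched as follows; the `resetter` interface is hypothetical (in the PR each call site is spelled out because the owners have different types):

```go
package main

import "fmt"

type resetter interface{ reset() error }

type namedConn struct{ name string }

func (c *namedConn) reset() error {
	fmt.Println("reset", c.name)
	return nil
}

// resetAll resets each connection owner and stops at the first failure.
func resetAll(conns ...resetter) error {
	for _, c := range conns {
		if err := c.reset(); err != nil {
			return err // mirrors the terror.WithScope early returns
		}
	}
	return nil
}

func main() {
	workers := []resetter{&namedConn{"toDBConns[0]"}, &namedConn{"toDBConns[1]"}}
	all := append(workers, &namedConn{"onlineDDL"}, &namedConn{"sgk.dbConn"}, &namedConn{"ddlDBConn"})
	if err := resetAll(all...); err != nil {
		panic(err)
	}
}
```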
@@ -910,7 +921,7 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *Conn, dd
 	}
 }
 
-func (s *Syncer) sync(ctx *tcontext.Context, queueBucket string, db *Conn, jobChan chan *job) {
+func (s *Syncer) sync(ctx *tcontext.Context, queueBucket string, db *DBConn, jobChan chan *job) {
 	defer s.wg.Done()
 
 	idx := 0
@@ -1058,7 +1069,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 		go func(i int, n string) {
 			ctx2, cancel := context.WithCancel(ctx)
 			ctctx := s.tctx.WithContext(ctx2)
-			s.sync(ctctx, n, s.toDBs[i], s.jobs[i])
+			s.sync(ctctx, n, s.toDBConns[i], s.jobs[i])
 			cancel()
 		}(i, name)
 	}
@@ -1068,7 +1079,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 	go func() {
 		ctx2, cancel := context.WithCancel(ctx)
 		ctctx := s.tctx.WithContext(ctx2)
-		s.syncDDL(ctctx, adminQueueName, s.ddlDB, s.jobs[s.cfg.WorkerCount])
+		s.syncDDL(ctctx, adminQueueName, s.ddlDBConn, s.jobs[s.cfg.WorkerCount])
 		cancel()
 	}()
 
@@ -2024,33 +2035,44 @@ func (s *Syncer) printStatus(ctx context.Context) {
 func (s *Syncer) createDBs() error {
 	var err error
-	s.fromDB, err = createConn(s.cfg, s.cfg.From, maxDMLConnectionTimeout)
+	dbCfg := s.cfg.From
+	dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxDMLConnectionTimeout)
+	s.fromDB, err = createUpStreamConn(dbCfg)
 	if err != nil {
-		return terror.WithScope(err, terror.ScopeUpstream)
+		return err
 	}
 
-	s.toDBs = make([]*Conn, 0, s.cfg.WorkerCount)
-	s.toDBs, err = createConns(s.cfg, s.cfg.To, s.cfg.WorkerCount, maxDMLConnectionTimeout)
+	dbCfg = s.cfg.To
+	dbCfg.RawDBCfg = config.DefaultRawDBConfig().
+		SetReadTimeout(maxDMLConnectionTimeout).
+		SetMaxIdleConns(s.cfg.WorkerCount)
+
+	s.toDB, s.toDBConns, err = createConns(s.tctx, s.cfg, dbCfg, s.cfg.WorkerCount)
 	if err != nil {
-		closeConns(s.tctx, s.fromDB) // release resources acquired before return with error
-		return terror.WithScope(err, terror.ScopeDownstream)
+		closeUpstreamConn(s.tctx, s.fromDB) // release resources acquired before return with error
+		return err
 	}
 	// baseConn for ddl
-	s.ddlDB, err = createConn(s.cfg, s.cfg.To, maxDDLConnectionTimeout)
+	dbCfg = s.cfg.To
+	dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxDDLConnectionTimeout)
+
+	var ddlDBConns []*DBConn
+	s.ddlDB, ddlDBConns, err = createConns(s.tctx, s.cfg, dbCfg, 1)
 	if err != nil {
-		closeConns(s.tctx, s.fromDB)
-		closeConns(s.tctx, s.toDBs...)
-		return terror.WithScope(err, terror.ScopeDownstream)
+		closeUpstreamConn(s.tctx, s.fromDB)
+		closeBaseDB(s.tctx, s.toDB)
+		return err
 	}
+	s.ddlDBConn = ddlDBConns[0]
 
 	return nil
 }
 
-// closeConns closes all opened DBs, rollback for createConns
+// closeDBs closes all opened DBs, rollback for createDBs
 func (s *Syncer) closeDBs() {
-	closeConns(s.tctx, s.fromDB)
-	closeConns(s.tctx, s.toDBs...)
-	closeConns(s.tctx, s.ddlDB)
+	closeUpstreamConn(s.tctx, s.fromDB)
+	closeBaseDB(s.tctx, s.toDB)
+	closeBaseDB(s.tctx, s.ddlDB)
 }
 
 // record skip ddl/dml sqls' position
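`createDBs` above now provisions three separately tuned pools: the upstream connection with a DML read timeout, a DML pool whose idle budget matches `WorkerCount` so each worker can hold on to a session, and a single-connection DDL pool with a longer timeout. A sketch of the knobs involved, assuming the `go-sql-driver/mysql` DSN (its `readTimeout` parameter); `openPool` is illustrative, not DM's `createConns`:

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
)

func openPool(user, host string, port int, readTimeout string, maxIdle int) (*sql.DB, error) {
	dsn := fmt.Sprintf("%s@tcp(%s:%d)/?charset=utf8&readTimeout=%s", user, host, port, readTimeout)
	db, err := sql.Open("mysql", dsn) // lazy: no connection is made yet
	if err != nil {
		return nil, err
	}
	// e.g. WorkerCount for the DML pool, so each worker keeps an idle session
	db.SetMaxIdleConns(maxIdle)
	return db, nil
}

func main() {
	dml, err := openPool("root", "127.0.0.1", 4000, "1m", 4) // hypothetical endpoint
	if err != nil {
		panic(err)
	}
	defer dml.Close()
}
```

Keeping the idle budget at the worker count avoids the churn of reopening TCP connections between jobs, which is presumably why it is tied to `s.cfg.WorkerCount` here.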
@@ -2317,7 +2339,12 @@ func (s *Syncer) Update(cfg *config.SubTaskConfig) error {
 	if s.cfg.IsSharding {
 		// re-init sharding group
-		err = s.initShardingGroups(nil)
+		err = s.sgk.Init()
+		if err != nil {
+			return err
+		}
+
+		err = s.initShardingGroups()
 		if err != nil {
 			return err
 		}
@@ -2396,12 +2423,13 @@ func (s *Syncer) ExecuteDDL(ctx context.Context, execReq *pb.ExecDDLRequest) (<-
 func (s *Syncer) UpdateFromConfig(cfg *config.SubTaskConfig) error {
 	s.Lock()
 	defer s.Unlock()
-	s.fromDB.baseConn.Close()
+	s.fromDB.BaseDB.Close()
 
 	s.cfg.From = cfg.From
 
 	var err error
-	s.fromDB, err = createConn(s.cfg, s.cfg.From, maxDMLConnectionTimeout)
+	s.cfg.From.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxDMLConnectionTimeout)
+	s.fromDB, err = createUpStreamConn(s.cfg.From)
 	if err != nil {
 		s.tctx.L().Error("fail to create baseConn connection", log.ShortError(err))
 		return err
diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go
index 67ef083a5d..2f0210abb1 100644
--- a/syncer/syncer_test.go
+++ b/syncer/syncer_test.go
@@ -25,8 +25,8 @@ import (
 	"github.com/pingcap/dm/dm/config"
 	"github.com/pingcap/dm/dm/pb"
-	"github.com/pingcap/dm/pkg/baseconn"
 	"github.com/pingcap/dm/pkg/binlog/event"
+	"github.com/pingcap/dm/pkg/conn"
 	"github.com/pingcap/dm/pkg/gtid"
 	"github.com/pingcap/dm/pkg/log"
 	parserpkg "github.com/pingcap/dm/pkg/parser"
@@ -934,9 +934,12 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) {
 	}
 
 	syncer := NewSyncer(s.cfg)
-	// use upstream db as mock downstream
-	syncer.fromDB = &Conn{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
-	syncer.toDBs = []*Conn{{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}}
+	// use a pinned connection to the upstream db as the mock downstream
+	dbConn, err := db.Conn(context.Background())
+	c.Assert(err, IsNil)
+	syncer.fromDB = &UpStreamConn{BaseDB: conn.NewBaseDB(db)}
+	syncer.ddlDBConn = &DBConn{baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
+	syncer.toDBConns = []*DBConn{{baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}}
 	syncer.reset()
 
 	streamer, err := syncer.streamerProducer.generateStreamer(pos)
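The test changes above hinge on `db.Conn(ctx)`: a `*sql.DB` routes each statement to an arbitrary pooled connection, while a `*sql.Conn` pins one session, which is what `conn.NewBaseConn` now wraps. A self-contained example of the same pattern against `go-sqlmock`:

```go
package main

import (
	"context"

	sqlmock "github.com/DATA-DOG/go-sqlmock"
)

func main() {
	db, mock, err := sqlmock.New()
	if err != nil {
		panic(err)
	}
	defer db.Close()

	mock.ExpectQuery("SELECT 1").
		WillReturnRows(sqlmock.NewRows([]string{"1"}).AddRow(1))

	ctx := context.Background()
	conn, err := db.Conn(ctx) // pin one session for all statements
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	rows, err := conn.QueryContext(ctx, "SELECT 1")
	if err != nil {
		panic(err)
	}
	rows.Close()
}
```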
@@ -1215,15 +1218,30 @@ func (s *testSyncerSuite) TestSharding(c *C) {
 		// make syncer write to mock baseConn
 		syncer := NewSyncer(s.cfg)
 
-		// fromDB mocks upstream db, db mocks downstream db
-		syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{fromDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
-		syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}}
-		syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
+		ctx := context.Background()
+		// fromDB mocks the upstream db, dbConn mocks the downstream db
+		syncer.fromDB = &UpStreamConn{BaseDB: conn.NewBaseDB(fromDB)}
+		dbConn, err := db.Conn(ctx)
+		c.Assert(err, IsNil)
+		syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}}
+		syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
 
 		// mock syncer.Init() function, because we need to pass mock dbs to different members' init
 		syncer.genRouter()
-		syncer.initShardingGroups(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{shardGroupDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}})
-		syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}})
+		shardGroupDBConn, err := shardGroupDB.Conn(ctx)
+		c.Assert(err, IsNil)
+		checkPointDBConn, err := checkPointDB.Conn(ctx)
+		c.Assert(err, IsNil)
+
+		// mock syncer.sgk.Init() function
+		syncer.sgk.dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(shardGroupDBConn, &retry.FiniteRetryStrategy{})}
+		syncer.sgk.prepare()
+		syncer.initShardingGroups()
+
+		// mock syncer.checkpoint.Init() function
+		syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})}
+		syncer.checkpoint.(*RemoteCheckPoint).prepare()
+
 		syncer.reset()
 		events := append(createEvents, s.generateEvents(_case.testEvents, c)...)
 		syncer.streamerProducer = &MockStreamProducer{events}
@@ -1281,7 +1299,6 @@ func (s *testSyncerSuite) TestSharding(c *C) {
 		resultCh := make(chan pb.ProcessResult)
 
 		s.mockCheckPointFlush(checkPointMock)
-		checkPointMock.ExpectClose()
 
 		go syncer.Process(ctx, resultCh)
 
@@ -1332,7 +1349,10 @@ func (s *testSyncerSuite) TestRun(c *C) {
 	db, mock, err := sqlmock.New()
 	c.Assert(err, IsNil)
+	dbConn, err := db.Conn(context.Background())
+	c.Assert(err, IsNil)
 	checkPointDB, checkPointMock, err := sqlmock.New()
+	c.Assert(err, IsNil)
+	checkPointDBConn, err := checkPointDB.Conn(context.Background())
 	c.Assert(err, IsNil)
 
 	testJobs.jobs = testJobs.jobs[:0]
@@ -1362,10 +1382,10 @@ func (s *testSyncerSuite) TestRun(c *C) {
 	s.cfg.DisableCausality = false
 
 	syncer := NewSyncer(s.cfg)
-	syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
-	syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}},
-		{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}}
-	syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
+	syncer.fromDB = &UpStreamConn{BaseDB: conn.NewBaseDB(db)}
+	syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})},
+		{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}}
+	syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
 
 	c.Assert(syncer.Type(), Equals, pb.UnitType_Sync)
 
 	syncer.columnMapping, err = cm.NewMapping(s.cfg.CaseSensitive, s.cfg.ColumnMappingRules)
@@ -1379,7 +1399,10 @@ func (s *testSyncerSuite) TestRun(c *C) {
 	checkPointMock.ExpectExec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
 	checkPointMock.ExpectCommit()
-	syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}})
+	// mock syncer.checkpoint.Init() function
+	syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})}
+	syncer.checkpoint.(*RemoteCheckPoint).prepare()
+
 	syncer.reset()
 
 	events1 := mockBinlogEvents{
 		mockBinlogEvent{typ: DBCreate, args: []interface{}{"test_1"}},