Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

checker: increase total timeout to 30m; set readTimeout for DB check operation to 30s (#315) #327

Merged
merged 4 commits into from
Oct 21, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/conn"
fr "github.com/pingcap/dm/pkg/func-rollback"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
Expand All @@ -42,13 +43,21 @@ import (
"go.uber.org/zap"
)

const (
// the total time needed to complete the check depends on the number of instances, databases and tables,
// now increase the total timeout to 30min, but set `readTimeout` to 30s for source/target DB.
// if we can not complete the check in 30min, then we must need to refactor the implementation of the function.
checkTimeout = 30 * time.Minute
readTimeout = "30s"
)

type mysqlInstance struct {
cfg *config.SubTaskConfig

sourceDB *sql.DB
sourceDB *conn.BaseDB
sourceDBinfo *dbutil.DBConfig

targetDB *sql.DB
targetDB *conn.BaseDB
targetDBInfo *dbutil.DBConfig
}

Expand Down Expand Up @@ -125,7 +134,9 @@ func (c *Checker) Init() (err error) {
User: instance.cfg.From.User,
Password: instance.cfg.From.Password,
}
instance.sourceDB, err = dbutil.OpenDB(*instance.sourceDBinfo)
dbCfg := instance.cfg.From
dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(readTimeout)
instance.sourceDB, err = conn.DefaultDBProvider.Apply(dbCfg)
if err != nil {
return terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.From.User, instance.cfg.From.Host, instance.cfg.From.Port), terror.ScopeUpstream)
}
Expand All @@ -136,35 +147,37 @@ func (c *Checker) Init() (err error) {
User: instance.cfg.To.User,
Password: instance.cfg.To.Password,
}
instance.targetDB, err = dbutil.OpenDB(*instance.targetDBInfo)
dbCfg = instance.cfg.To
dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(readTimeout)
instance.targetDB, err = conn.DefaultDBProvider.Apply(dbCfg)
if err != nil {
return terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.To.User, instance.cfg.To.Host, instance.cfg.To.Port), terror.ScopeDownstream)
}

if _, ok := c.checkingItems[config.VersionChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLVersionChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLVersionChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogEnableChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogFormatChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogRowImageChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.DumpPrivilegeChecking]; ok {
c.checkList = append(c.checkList, check.NewSourceDumpPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewSourceDumpPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.ReplicationPrivilegeChecking]; ok {
c.checkList = append(c.checkList, check.NewSourceReplicationPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewSourceReplicationPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}

if !checkingShard && !checkSchema {
continue
}

mapping, err := utils.FetchTargetDoTables(instance.sourceDB, bw, r)
mapping, err := utils.FetchTargetDoTables(instance.sourceDB.DB, bw, r)
if err != nil {
return err
}
Expand Down Expand Up @@ -192,10 +205,10 @@ func (c *Checker) Init() (err error) {
shardingCounter[name]++
}
}
dbs[instance.cfg.SourceID] = instance.sourceDB
dbs[instance.cfg.SourceID] = instance.sourceDB.DB

if checkSchema {
c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB, instance.sourceDBinfo, checkTables))
c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB.DB, instance.sourceDBinfo, checkTables))
}
}

Expand Down Expand Up @@ -229,7 +242,7 @@ func (c *Checker) displayCheckingItems() string {

// Process implements Unit interface
func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
cctx, cancel := context.WithTimeout(ctx, time.Minute)
cctx, cancel := context.WithTimeout(ctx, checkTimeout)
defer cancel()

isCanceled := false
Expand Down Expand Up @@ -296,14 +309,14 @@ func (c *Checker) Close() {
func (c *Checker) closeDBs() {
for _, instance := range c.instances {
if instance.sourceDB != nil {
if err := dbutil.CloseDB(instance.sourceDB); err != nil {
if err := instance.sourceDB.Close(); err != nil {
c.logger.Error("close source db", zap.Stringer("db", instance.sourceDBinfo), log.ShortError(err))
}
instance.sourceDB = nil
}

if instance.targetDB != nil {
if err := dbutil.CloseDB(instance.targetDB); err != nil {
if err := instance.targetDB.Close(); err != nil {
c.logger.Error("close target db", zap.Stringer("db", instance.targetDBInfo), log.ShortError(err))
}
instance.targetDB = nil
Expand Down