Skip to content

Commit

Permalink
dm: use basedb/baseconn in DM & make sure all db connection error con…
Browse files Browse the repository at this point in the history
…tains correct scope (#7741)

ref #4287
  • Loading branch information
D3Hunter authored Dec 5, 2022
1 parent bfba7a5 commit 5859eb6
Show file tree
Hide file tree
Showing 122 changed files with 1,295 additions and 1,202 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
dmutils "github.com/pingcap/tiflow/dm/pkg/utils"
dmutils "github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
Expand Down
5 changes: 3 additions & 2 deletions cmd/dm-syncer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/util/filter"
router "github.com/pingcap/tidb/util/table-router"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/config/dbconfig"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/pkg/version"
)
Expand Down Expand Up @@ -255,8 +256,8 @@ type syncerConfig struct {

RouteRules []*RouteRule `toml:"route-rules" json:"route-rules"`

From config.DBConfig `toml:"from" json:"from"`
To config.DBConfig `toml:"to" json:"to"`
From dbconfig.DBConfig `toml:"from" json:"from"`
To dbconfig.DBConfig `toml:"to" json:"to"`

EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"`
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
Expand Down
4 changes: 2 additions & 2 deletions dm/chaos/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"context"
"path/filepath"

config2 "github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/config/dbconfig"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/utils"
"golang.org/x/sync/errgroup"
Expand All @@ -33,7 +33,7 @@ var (

// runCases runs test cases.
func runCases(ctx context.Context, cli pb.MasterClient, confDir string,
targetCfg config2.DBConfig, sourcesCfg ...config2.DBConfig,
targetCfg dbconfig.DBConfig, sourcesCfg ...dbconfig.DBConfig,
) error {
eg, ctx2 := errgroup.WithContext(ctx)
for i := range filenames {
Expand Down
10 changes: 5 additions & 5 deletions dm/chaos/cases/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"flag"
"time"

config2 "github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/config/dbconfig"
)

// config is used to run chaos tests.
Expand All @@ -31,10 +31,10 @@ type config struct {
MasterCount int `toml:"master-count" yaml:"master-count" json:"master-count"`
WorkerCount int `toml:"worker-count" yaml:"worker-count" json:"worker-count"`

Source1 config2.DBConfig `toml:"source-1" yaml:"source-1" json:"source-1"`
Source2 config2.DBConfig `toml:"source-2" yaml:"source-2" json:"source-2"`
Source3 config2.DBConfig `toml:"source-3" yaml:"source-3" json:"source-3"`
Target config2.DBConfig `toml:"target" yaml:"target" json:"target"`
Source1 dbconfig.DBConfig `toml:"source-1" yaml:"source-1" json:"source-1"`
Source2 dbconfig.DBConfig `toml:"source-2" yaml:"source-2" json:"source-2"`
Source3 dbconfig.DBConfig `toml:"source-3" yaml:"source-3" json:"source-3"`
Target dbconfig.DBConfig `toml:"target" yaml:"target" json:"target"`
}

// newConfig creates a config for this chaos testing suite.
Expand Down
6 changes: 1 addition & 5 deletions dm/chaos/cases/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/retry"
"go.uber.org/zap"
)

// dbConn holds a connection to a database and supports to reset the connection.
Expand All @@ -48,10 +47,7 @@ func createDBConn(ctx context.Context, db *conn.BaseDB, currDB string) (*dbConn,
baseConn: c,
currDB: currDB,
resetFunc: func(ctx context.Context, baseConn *conn.BaseConn) (*conn.BaseConn, error) {
err2 := db.CloseBaseConn(baseConn)
if err2 != nil {
log.L().Warn("fail to close connection", zap.Error(err2))
}
db.ForceCloseConnWithoutErr(baseConn)
return db.GetBaseConn(ctx)
},
}, nil
Expand Down
8 changes: 4 additions & 4 deletions dm/chaos/cases/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package main
import (
"context"

config2 "github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/config/dbconfig"
"github.com/pingcap/tiflow/dm/pkg/conn"
)

Expand All @@ -26,8 +26,8 @@ var mustExecSQLs = []string{
}

// setInstancesState sets the state (like global sql_mode) for upstream and downstream DB instances.
func setInstancesState(ctx context.Context, targetCfg *config2.DBConfig, sourcesCfg ...*config2.DBConfig) error {
targetDB, err := conn.DefaultDBProvider.Apply(targetCfg)
func setInstancesState(ctx context.Context, targetCfg *dbconfig.DBConfig, sourcesCfg ...*dbconfig.DBConfig) error {
targetDB, err := conn.GetDownstreamDB(targetCfg)
if err != nil {
return err
}
Expand All @@ -39,7 +39,7 @@ func setInstancesState(ctx context.Context, targetCfg *config2.DBConfig, sources
}

for _, cfg := range sourcesCfg {
sourceDB, err2 := conn.DefaultDBProvider.Apply(cfg)
sourceDB, err2 := conn.GetUpstreamDB(cfg)
if err2 != nil {
return err2
}
Expand Down
24 changes: 12 additions & 12 deletions dm/chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/dbutil"
config2 "github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/config/dbconfig"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -65,7 +65,7 @@ type task struct {

// newTask creates a new task instance.
func newTask(ctx context.Context, cli pb.MasterClient, taskFile string, schema string,
targetCfg config2.DBConfig, sourcesCfg ...config2.DBConfig,
targetCfg dbconfig.DBConfig, sourcesCfg ...dbconfig.DBConfig,
) (*task, error) {
var taskCfg config2.TaskConfig
err := taskCfg.DecodeFile(taskFile)
Expand All @@ -86,31 +86,31 @@ func newTask(ctx context.Context, cli pb.MasterClient, taskFile string, schema s
}

cfg := sourcesCfg[i]
db, err2 := conn.DefaultDBProvider.Apply(&cfg)
db, err2 := conn.GetUpstreamDB(&cfg)
if err2 != nil {
return nil, err2
}
conn, err2 := createDBConn(ctx, db, schema)
dbConnection, err2 := createDBConn(ctx, db, schema)
if err2 != nil {
return nil, err2
}
if taskCfg.CaseSensitive {
lcSetting, err2 := utils.FetchLowerCaseTableNamesSetting(ctx, conn.baseConn.DBConn)
lcSetting, err2 := conn.FetchLowerCaseTableNamesSetting(ctx, dbConnection.baseConn)
if err2 != nil {
return nil, err2
}
if lcSetting == utils.LCTableNamesMixed {
if lcSetting == conn.LCTableNamesMixed {
msg := "can not set `case-sensitive = true` when upstream `lower_case_table_names = 2`"
log.L().Error(msg, zap.Any("instance", cfg))
return nil, errors.New(msg)
}
}
sourceDBs = append(sourceDBs, db)
sourceConns = append(sourceConns, conn)
sourceConns = append(sourceConns, dbConnection)
res = append(res, singleResult{})
}

targetDB, err := conn.DefaultDBProvider.Apply(&targetCfg)
targetDB, err := conn.GetDownstreamDB(&targetCfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -436,12 +436,12 @@ func (t *task) genIncrData(pCtx context.Context) (err error) {
// NOTE: no re-order inject even for optimistic shard DDL now.

var eg errgroup.Group
for i, conn := range t.sourceConns {
conn2 := conn
for i, c := range t.sourceConns {
conn2 := c
i2 := i
eg.Go(func() error {
if err2 := conn2.execDDLs(t.ctx, query); err2 != nil {
if utils.IsMySQLError(err2, mysql.ErrDupFieldName) {
if conn.IsMySQLError(err2, mysql.ErrDupFieldName) {
t.logger.Warn("ignore duplicate field name for ddl", log.ShortError(err))
return nil
}
Expand Down Expand Up @@ -501,7 +501,7 @@ func (t *task) diffIncrData(ctx context.Context) (err error) {
}

func (t *task) updateSchema() error {
ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultDBTimeout)
ctx, cancel := context.WithTimeout(context.Background(), conn.DefaultDBTimeout)
defer cancel()

for i, db := range t.sourceDBs {
Expand Down
25 changes: 12 additions & 13 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package checker
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
Expand All @@ -36,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/filter"
regexprrouter "github.com/pingcap/tidb/util/regexpr-router"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/config/dbconfig"
"github.com/pingcap/tiflow/dm/loader"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/binlog"
Expand All @@ -47,7 +47,6 @@ import (
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
"github.com/pingcap/tiflow/dm/unit"
"go.uber.org/atomic"
Expand Down Expand Up @@ -179,9 +178,9 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
eg.Go(func() error {
for _, sourceTables := range tableMapPerUpstream[i] {
for _, sourceTable := range sourceTables {
size, err2 := utils.FetchTableEstimatedBytes(
size, err2 := conn.FetchTableEstimatedBytes(
ctx,
c.instances[i].sourceDB.DB,
c.instances[i].sourceDB,
sourceTable.Schema,
sourceTable.Name,
)
Expand Down Expand Up @@ -288,7 +287,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
))
}
// sourceID -> DB
upstreamDBs := make(map[string]*sql.DB)
upstreamDBs := make(map[string]*conn.BaseDB)
for i, instance := range c.instances {
sourceID := instance.cfg.SourceID
// init online ddl for checker
Expand All @@ -303,7 +302,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
c.checkList = append(c.checkList, checker.NewMySQLVersionChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}

upstreamDBs[sourceID] = instance.sourceDB.DB
upstreamDBs[sourceID] = instance.sourceDB
if instance.cfg.Mode != config.ModeIncrement {
// increment mode needn't check dump privilege
if _, ok := c.checkingItems[config.DumpPrivilegeChecking]; ok {
Expand Down Expand Up @@ -351,7 +350,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
if _, ok := c.checkingItems[config.TableSchemaChecking]; ok {
c.checkList = append(c.checkList, checker.NewTablesChecker(
upstreamDBs,
c.instances[0].targetDB.DB,
c.instances[0].targetDB,
info.sourceID2TableMap,
info.targetTable2ExtendedColumns,
dumpThreads,
Expand Down Expand Up @@ -515,8 +514,8 @@ func (c *Checker) fetchSourceTargetDB(
Password: instance.cfg.From.Password,
}
dbCfg := instance.cfg.From
dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(readTimeout)
instance.sourceDB, err = conn.DefaultDBProvider.Apply(&dbCfg)
dbCfg.RawDBCfg = dbconfig.DefaultRawDBConfig().SetReadTimeout(readTimeout)
instance.sourceDB, err = conn.GetUpstreamDB(&dbCfg)
if err != nil {
return nil, nil, terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.From.User, instance.cfg.From.Host, instance.cfg.From.Port), terror.ScopeUpstream)
}
Expand All @@ -527,12 +526,12 @@ func (c *Checker) fetchSourceTargetDB(
Password: instance.cfg.To.Password,
}
dbCfg = instance.cfg.To
dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(readTimeout)
instance.targetDB, err = conn.DefaultDBProvider.Apply(&dbCfg)
dbCfg.RawDBCfg = dbconfig.DefaultRawDBConfig().SetReadTimeout(readTimeout)
instance.targetDB, err = conn.GetDownstreamDB(&dbCfg)
if err != nil {
return nil, nil, terror.WithScope(terror.ErrTaskCheckFailedOpenDB.Delegate(err, instance.cfg.To.User, instance.cfg.To.Host, instance.cfg.To.Port), terror.ScopeDownstream)
}
return utils.FetchTargetDoTables(ctx, instance.cfg.SourceID, instance.sourceDB.DB, instance.baList, r)
return conn.FetchTargetDoTables(ctx, instance.cfg.SourceID, instance.sourceDB, instance.baList, r)
}

func (c *Checker) displayCheckingItems() string {
Expand Down Expand Up @@ -736,7 +735,7 @@ func (c *Checker) IsFreshTask() (bool, error) {
c.tctx.Logger.Info("exec query", zap.String("sql", sql))
rows, err := instance.targetDB.DB.QueryContext(c.tctx.Ctx, sql)
if err != nil {
if utils.IsMySQLError(err, mysql.ErrNoSuchTable) {
if conn.IsMySQLError(err, mysql.ErrNoSuchTable) {
continue
}
return false, err
Expand Down
Loading

0 comments on commit 5859eb6

Please sign in to comment.