Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-worker: refine conn #266

Merged
merged 61 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
f8ece5b
define conn
3pointer Aug 20, 2019
ba9c588
change upstream DB to BaseDB
3pointer Sep 16, 2019
25b7016
update raw db config
3pointer Sep 16, 2019
7ce2dac
fix test
3pointer Sep 16, 2019
ecf7bf5
remove useless code
3pointer Sep 16, 2019
f579445
add resetConnFn instead of baseDB in syncer.WorkerConn to reset
3pointer Sep 16, 2019
2f90c68
Update loader/checkpoint.go
3pointer Sep 17, 2019
9585091
Update loader/checkpoint.go
3pointer Sep 17, 2019
d8e92cc
address comments
3pointer Sep 17, 2019
418ccf8
Merge branch 'refine_conn' of https://github.com/3pointer/dm into ref…
3pointer Sep 17, 2019
a710aff
address comment
3pointer Sep 17, 2019
ed5c1a9
address comments
3pointer Sep 18, 2019
da36007
address comments
3pointer Sep 18, 2019
9133d17
Merge branch 'master' into refine_conn
3pointer Sep 18, 2019
8035a0c
address comment
3pointer Sep 18, 2019
8266bf0
address comment
3pointer Sep 23, 2019
9e031c4
baseDB close conns generated from itself
3pointer Sep 23, 2019
cd8c5f6
resolve conflict
3pointer Sep 23, 2019
6e65f70
address comment
3pointer Sep 25, 2019
a8342a1
use createConns instead of createConn
3pointer Sep 25, 2019
25cbcdc
address comments
3pointer Sep 25, 2019
f80c5bc
address comments
3pointer Sep 25, 2019
67cf1e2
address comments
3pointer Sep 25, 2019
2ccdefe
Merge branch 'master' into refine_conn
3pointer Sep 25, 2019
d3dd6c1
address comments
3pointer Sep 26, 2019
306aa19
remove loader DBConn close
3pointer Sep 26, 2019
faa4876
Merge branch 'refine_conn' of https://github.com/3pointer/dm into ref…
3pointer Sep 26, 2019
1168803
Merge branch 'master' into refine_conn
3pointer Sep 26, 2019
553578e
address comment
3pointer Sep 26, 2019
fc6929e
Merge branch 'refine_conn' of https://github.com/3pointer/dm into ref…
3pointer Sep 26, 2019
fd606de
address comments
3pointer Sep 29, 2019
a835b47
address comment
3pointer Sep 29, 2019
62e64b2
Update loader/db.go
3pointer Oct 8, 2019
7d0a41c
Update pkg/conn/basedb.go
3pointer Oct 8, 2019
7816f31
Update pkg/conn/basedb.go
3pointer Oct 8, 2019
fd6caa4
Update pkg/conn/basedb.go
3pointer Oct 8, 2019
2446fb4
loader unit add resetConn function
3pointer Oct 8, 2019
457b856
address comment
3pointer Oct 8, 2019
dec0aeb
add comments for BaseConn
3pointer Oct 8, 2019
5953333
update comments
3pointer Oct 8, 2019
21d8523
address comments
3pointer Oct 9, 2019
17c0594
address comments
3pointer Oct 10, 2019
7e6abba
address comment
3pointer Oct 11, 2019
a7f91d8
refine reset in baseConn
3pointer Oct 11, 2019
8b2be19
address comments
3pointer Oct 11, 2019
8a724da
Merge branch 'master' into refine_conn
csuzhangxc Oct 12, 2019
eac3868
use reset closure to reset BaseConn
3pointer Oct 12, 2019
032912b
address comment
3pointer Oct 12, 2019
1b9e351
address comments
3pointer Oct 14, 2019
92459f4
fix test
3pointer Oct 14, 2019
ad11983
fix test
3pointer Oct 14, 2019
35376e7
address comment
3pointer Oct 15, 2019
26190cc
address comment
3pointer Oct 15, 2019
b8f4a92
fix test
3pointer Oct 15, 2019
b8afd4d
return commit err index
3pointer Oct 16, 2019
ae9080c
resolve conflict
3pointer Oct 16, 2019
1104960
Merge branch 'master' into refine_conn
3pointer Oct 17, 2019
0a62846
fix test
3pointer Oct 17, 2019
6407d25
resolve conflict
3pointer Oct 17, 2019
8ac2bf1
address comments
3pointer Oct 18, 2019
aaa8665
address comment
3pointer Oct 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,41 @@ var (
defaultMaxAllowedPacket = 64 * 1024 * 1024 // 64MiB, equal to TiDB's default
)

// RawDBConfig contains some low level database config
type RawDBConfig struct {
MaxIdleConns int
ReadTimeout string
WriteTimeout string
}

// DefaultRawDBConfig returns a default raw database config
func DefaultRawDBConfig(readTimeout string) *RawDBConfig {
return &RawDBConfig{
ReadTimeout: readTimeout,
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}
}

// AddWriteTimeout adds writeTimeout for raw database
func (c *RawDBConfig) AddWriteTimeout(writeTimeout string) *RawDBConfig {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
c.WriteTimeout = writeTimeout
return c
}

// AddMaxIdleConns set maxIdleConns for raw database
func (c *RawDBConfig) AddMaxIdleConns(conns int) *RawDBConfig {
c.MaxIdleConns = conns
return c
}

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host" yaml:"host"`
Port int `toml:"port" json:"port" yaml:"port"`
User string `toml:"user" json:"user" yaml:"user"`
Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"`

RawDBCfg *RawDBConfig `toml:"-" json:"-" yaml:"-"`
}

func (db *DBConfig) String() string {
Expand Down
17 changes: 14 additions & 3 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

"go.uber.org/zap"
Expand Down Expand Up @@ -63,7 +65,8 @@ type CheckPoint interface {

// RemoteCheckPoint implements CheckPoint by saving status in remote database system, mostly in TiDB.
type RemoteCheckPoint struct {
conn *Conn // NOTE: use dbutil in tidb-tools later
db *conn.BaseDB
conn *WorkerConn // NOTE: use dbutil in tidb-tools later
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
id string
schema string
table string
Expand All @@ -73,14 +76,15 @@ type RemoteCheckPoint struct {
}

func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) {
conn, err := createConn(cfg)
db, conn, err := createConn(tctx.Context(), cfg)
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

newtctx := tctx.WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint")))

cp := &RemoteCheckPoint{
db: db,
conn: conn,
id: id,
restoringFiles: make(map[string]map[string]FilePosSet),
Expand Down Expand Up @@ -293,7 +297,14 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error {

// Close implements CheckPoint.Close
func (cp *RemoteCheckPoint) Close() {
closeConn(cp.conn)
err := cp.conn.Close()
if err != nil {
cp.tctx.L().Error("close checkpoint connection error", log.ShortError(err))
3pointer marked this conversation as resolved.
Show resolved Hide resolved
}
err = cp.db.Close()
if err != nil {
cp.tctx.L().Error("close checkpoint db error", log.ShortError(err))
3pointer marked this conversation as resolved.
Show resolved Hide resolved
}
}

// GenSQL implements CheckPoint.GenSQL
Expand Down
7 changes: 5 additions & 2 deletions loader/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,12 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(count, Equals, len(cases))

// update checkpoints
conn, err := createConn(t.cfg)
db, conn, err := createConn(tctx.Context(), t.cfg)
c.Assert(err, IsNil)
defer closeConn(conn)
defer func() {
conn.Close()
db.Close()
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
}()
for _, cs := range cases {
sql2 := cp.GenSQL(cs.filename, cs.endPos)
err = conn.executeSQL(tctx, []string{sql2})
Expand Down
59 changes: 40 additions & 19 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
package loader

import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -27,21 +27,21 @@ import (
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/baseconn"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// Conn represents a live DB connection
type Conn struct {
// WorkerConn represents a live DB connection
type WorkerConn struct {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
cfg *config.SubTaskConfig
baseConn *baseconn.BaseConn
baseConn *conn.BaseConn
}

func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
func (conn *WorkerConn) querySQL(ctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
if conn == nil || conn.baseConn == nil {
return nil, terror.ErrDBUnExpect.Generate("database connection not valid")
}
Expand Down Expand Up @@ -90,7 +90,7 @@ func (conn *Conn) querySQL(ctx *tcontext.Context, query string, args ...interfac
return ret.(*sql.Rows), nil
}

func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]interface{}) error {
func (conn *WorkerConn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]interface{}) error {
if len(queries) == 0 {
return nil
}
Expand Down Expand Up @@ -149,23 +149,44 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
return err
}

func createConn(cfg *config.SubTaskConfig) (*Conn, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d",
cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket)
baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig())
if err != nil {
return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
// Close release db connection resource, return it to BaseDB.db connection pool
func (conn *WorkerConn) Close() error {
if conn == nil || conn.baseConn == nil {
return nil
}

return &Conn{baseConn: baseConn, cfg: cfg}, nil
return conn.baseConn.Close()
}

func closeConn(conn *Conn) error {
if conn.baseConn == nil {
return nil
func createConn(ctx context.Context, cfg *config.SubTaskConfig) (*conn.BaseDB, *WorkerConn, error) {
baseDB, err := conn.DefaultDBProvider.Apply(cfg.To)
if err != nil {
return nil, nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
}
baseConn, err := baseDB.GetBaseConn(ctx)
if err != nil {
return nil, nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
}
return baseDB, &WorkerConn{baseConn: baseConn, cfg: cfg}, nil
}

return terror.DBErrorAdapt(conn.baseConn.Close(), terror.ErrDBDriverError)
func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount int) (*conn.BaseDB, []*WorkerConn, error) {
baseDB, err := conn.DefaultDBProvider.Apply(cfg.To)
if err != nil {
return nil, nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
}
conns := make([]*WorkerConn, 0, workerCount)
for i := 0; i < workerCount; i++ {
baseConn, err := baseDB.GetBaseConn(tctx.Context())
if err != nil {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
err := baseDB.Close()
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
tctx.L().Error("failed to close baseDB")
}
return nil, nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
3pointer marked this conversation as resolved.
Show resolved Hide resolved
}
conns = append(conns, &WorkerConn{baseConn: baseConn, cfg: cfg})
}
return baseDB, conns, nil
}

func isErrDBExists(err error) bool {
Expand Down
38 changes: 24 additions & 14 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
fr "github.com/pingcap/dm/pkg/func-rollback"
"github.com/pingcap/dm/pkg/log"
Expand Down Expand Up @@ -55,7 +56,6 @@ type DataFiles []string

// Tables2DataFiles represent all data files of a table collection as a map
type Tables2DataFiles map[string]DataFiles

type dataJob struct {
sql string
schema string
Expand All @@ -77,7 +77,7 @@ type Worker struct {
id int
cfg *config.SubTaskConfig
checkPoint CheckPoint
conn *Conn
conn *WorkerConn
wg sync.WaitGroup
jobQueue chan *dataJob
loader *Loader
Expand All @@ -89,18 +89,13 @@ type Worker struct {

// NewWorker returns a Worker.
func NewWorker(loader *Loader, id int) (worker *Worker, err error) {
conn, err := createConn(loader.cfg)
if err != nil {
return nil, err
}

ctctx := loader.tctx.WithLogger(loader.tctx.L().WithFields(zap.Int("worker ID", id)))

return &Worker{
id: id,
cfg: loader.cfg,
checkPoint: loader.checkPoint,
conn: conn,
conn: loader.toDBConns[id],
jobQueue: make(chan *dataJob, jobCount),
loader: loader,
tctx: ctctx,
Expand All @@ -115,7 +110,7 @@ func (w *Worker) Close() {

close(w.jobQueue)
w.wg.Wait()
closeConn(w.conn)
w.conn.Close()
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}

func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *sync.WaitGroup, runFatalChan chan *pb.ProcessError) {
Expand Down Expand Up @@ -342,6 +337,9 @@ type Loader struct {
pool []*Worker
closed sync2.AtomicBool

toDB *conn.BaseDB
toDBConns []*WorkerConn

totalDataSize sync2.AtomicInt64
totalFileCount sync2.AtomicInt64 // schema + table + data
finishedDataSize sync2.AtomicInt64
Expand Down Expand Up @@ -409,6 +407,11 @@ func (l *Loader) Init() (err error) {
}
}

l.toDB, l.toDBConns, err = createConns(l.tctx, l.cfg, l.cfg.PoolSize)
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -556,6 +559,10 @@ func (l *Loader) Close() {
}

l.stopLoad()
err := l.toDB.Close()
if err != nil {
l.tctx.L().Error("close toDB error", log.ShortError(err))
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}
l.checkPoint.Close()
l.closed.Set(true)
}
Expand Down Expand Up @@ -855,7 +862,7 @@ func (l *Loader) prepare() error {
}

// restoreSchema creates schema
func (l *Loader) restoreSchema(conn *Conn, sqlFile, schema string) error {
func (l *Loader) restoreSchema(conn *WorkerConn, sqlFile, schema string) error {
err := l.restoreStructure(conn, sqlFile, schema, "")
if err != nil {
if isErrDBExists(err) {
Expand All @@ -868,7 +875,7 @@ func (l *Loader) restoreSchema(conn *Conn, sqlFile, schema string) error {
}

// restoreTable creates table
func (l *Loader) restoreTable(conn *Conn, sqlFile, schema, table string) error {
func (l *Loader) restoreTable(conn *WorkerConn, sqlFile, schema, table string) error {
err := l.restoreStructure(conn, sqlFile, schema, table)
if err != nil {
if isErrTableExists(err) {
Expand All @@ -881,7 +888,7 @@ func (l *Loader) restoreTable(conn *Conn, sqlFile, schema, table string) error {
}

// restoreStruture creates schema or table
func (l *Loader) restoreStructure(conn *Conn, sqlFile string, schema string, table string) error {
func (l *Loader) restoreStructure(conn *WorkerConn, sqlFile string, schema string, table string) error {
f, err := os.Open(sqlFile)
if err != nil {
return terror.ErrLoadUnitReadSchemaFile.Delegate(err)
Expand Down Expand Up @@ -968,11 +975,14 @@ func fetchMatchedLiteral(ctx *tcontext.Context, router *router.Table, schema, ta
func (l *Loader) restoreData(ctx context.Context) error {
begin := time.Now()

conn, err := createConn(l.cfg)
db, conn, err := createConn(ctx, l.cfg)
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
defer conn.baseConn.Close()
defer func() {
conn.baseConn.Close()
db.Close()
}()

dispatchMap := make(map[string]*fileJob)

Expand Down
Loading