Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

performance: use OPTIMISTIC transaction as the default #1079

Merged
merged 28 commits into from
Sep 27, 2020
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f8e909a
use OPTIMISTIC transaction as the default
GMHDBJD Sep 23, 2020
d24bec9
Merge branch 'master' into defautlOptimistic
lance6716 Sep 23, 2020
f1eab4a
get variables if user doesn't specify
GMHDBJD Sep 23, 2020
1305528
fix ut
GMHDBJD Sep 23, 2020
0cb219b
Merge branch 'master' into defautlOptimistic
lance6716 Sep 23, 2020
1c25c2f
address comment
GMHDBJD Sep 23, 2020
715406d
Merge branch 'master' into defautlOptimistic
lance6716 Sep 23, 2020
9eb6e91
Merge branch 'master' into defautlOptimistic
csuzhangxc Sep 24, 2020
a50cd9f
apply default session config base on tidb's version
GMHDBJD Sep 24, 2020
6dc5e40
Merge branch 'master' into defautlOptimistic
GMHDBJD Sep 24, 2020
6a2961e
decrypt password
GMHDBJD Sep 24, 2020
fd24e53
fix it
GMHDBJD Sep 25, 2020
328817b
remove line
GMHDBJD Sep 25, 2020
57c8036
Merge branch 'master' into defautlOptimistic
GMHDBJD Sep 25, 2020
81e1294
temp commit
GMHDBJD Sep 25, 2020
6e1899d
close db and fix test
GMHDBJD Sep 25, 2020
0dcceb3
add ut
GMHDBJD Sep 25, 2020
a07c915
add it
GMHDBJD Sep 25, 2020
6fb9d64
Merge branch 'master' into defautlOptimistic
GMHDBJD Sep 25, 2020
614def3
debug ci
GMHDBJD Sep 25, 2020
512048a
support beta version
GMHDBJD Sep 25, 2020
9c4115b
Revert "debug ci"
GMHDBJD Sep 25, 2020
a345baa
Merge branch 'master' into defautlOptimistic
GMHDBJD Sep 25, 2020
ae64af7
revert clearEecdEnv
GMHDBJD Sep 25, 2020
f93a284
add beta version ut
GMHDBJD Sep 25, 2020
fd2bb13
Merge branch 'master' into defautlOptimistic
GMHDBJD Sep 25, 2020
a3bdfb4
minor fix
GMHDBJD Sep 27, 2020
e3ca26c
address commment
GMHDBJD Sep 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

"github.com/dustin/go-humanize"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
Expand All @@ -41,8 +42,10 @@ const (

// shard DDL mode.
const (
ShardPessimistic = "pessimistic"
ShardOptimistic = "optimistic"
ShardPessimistic = "pessimistic"
ShardOptimistic = "optimistic"
tidbTxnMode = "tidb_txn_mode"
tidbTxnOptimistic = "optimistic"
)

// default config item values
Expand All @@ -66,6 +69,15 @@ var (
defaultBatch = 100
defaultQueueSize = 1024 // do not give too large default value to avoid OOM
defaultCheckpointFlushInterval = 30 // in seconds

// TargetDBConfig
defaultSessionCfg = []struct {
key string
val string
minVersion utils.TiDBVersion
}{
{tidbTxnMode, tidbTxnOptimistic, utils.TiDBVersion{3, 0, 0}},
}
)

// Meta represents binlog's meta pos
Expand Down Expand Up @@ -707,3 +719,18 @@ func checkDuplicateString(ruleNames []string) []string {
}
return dupeArray
}

// AdjustTargetDBSessionCfg adjust session cfg of TiDB
func AdjustTargetDBSessionCfg(dbConfig *DBConfig, version utils.TiDBVersion) {
lowerMap := make(map[string]string, len(dbConfig.Session))
for k, v := range dbConfig.Session {
lowerMap[strings.ToLower(k)] = v
}
// all cfg in defaultSessionCfg should be lower case
for _, cfg := range defaultSessionCfg {
if _, ok := lowerMap[cfg.key]; !ok && version.Ge(cfg.minVersion) {
lowerMap[cfg.key] = cfg.val
}
}
dbConfig.Session = lowerMap
}
46 changes: 42 additions & 4 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"

"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

. "github.com/pingcap/check"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
Expand Down Expand Up @@ -324,7 +325,10 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
heartbeatRI = 21
timezone = "Asia/Shanghai"
maxAllowedPacket = 10244201
session = map[string]string{
fromSession = map[string]string{
"sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES",
}
toSession = map[string]string{
"sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES",
}
security = Security{
Expand Down Expand Up @@ -380,7 +384,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
User: "user_from_1",
Password: "123",
MaxAllowedPacket: &maxAllowedPacket,
Session: session,
Session: fromSession,
Security: &security,
RawDBCfg: &rawDBCfg,
}
Expand All @@ -390,7 +394,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
User: "user_from_2",
Password: "abc",
MaxAllowedPacket: &maxAllowedPacket,
Session: session,
Session: fromSession,
Security: &security,
RawDBCfg: &rawDBCfg,
}
Expand Down Expand Up @@ -421,7 +425,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
User: "user_to",
Password: "abc",
MaxAllowedPacket: &maxAllowedPacket,
Session: session,
Session: toSession,
Security: &security,
RawDBCfg: &rawDBCfg,
},
Expand Down Expand Up @@ -645,3 +649,37 @@ func (t *testConfig) TestMySQLInstance(c *C) {
c.Assert(m.VerifyAndAdjust(), IsNil)

}

func (t *testConfig) TestAdjustTargetDBConfig(c *C) {
testCases := []struct {
dbConfig DBConfig
result DBConfig
version utils.TiDBVersion
}{
{
DBConfig{},
DBConfig{Session: map[string]string{}},
utils.TiDBVersion{0, 0, 0},
},
{
DBConfig{Session: map[string]string{"ANSI_QUOTES": ""}},
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
DBConfig{Session: map[string]string{"ansi_quotes": ""}},
utils.TiDBVersion{2, 0, 7},
},
{
DBConfig{},
DBConfig{Session: map[string]string{tidbTxnMode: tidbTxnOptimistic}},
utils.TiDBVersion{3, 0, 1},
},
{
DBConfig{Session: map[string]string{"ANSI_QUOTES": "", tidbTxnMode: "pessimistic"}},
DBConfig{Session: map[string]string{"ansi_quotes": "", tidbTxnMode: "pessimistic"}},
utils.TiDBVersion{4, 0, 2},
},
}

for _, tc := range testCases {
AdjustTargetDBSessionCfg(&tc.dbConfig, tc.version)
c.Assert(tc.dbConfig, DeepEquals, tc.result)
}
}
35 changes: 35 additions & 0 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,17 +1027,47 @@ func parseAndAdjustSourceConfig(contents []string) ([]*config.SourceConfig, erro
return cfgs, err
}
if err = cfg.Adjust(fromDB.DB); err != nil {
fromDB.Close()
return cfgs, err
}
if _, err = cfg.Yaml(); err != nil {
fromDB.Close()
return cfgs, err
}

fromDB.Close()
cfgs[i] = cfg
}
return cfgs, nil
}

func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about putting this function into task.go as a member of TaskConfig? we may need to use it in other places later.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

circle import if move it to task.go because we import config in pkg.conn. 🤔

cfg := *dbConfig
if len(cfg.Password) > 0 {
cfg.Password = utils.DecryptOrPlaintext(cfg.Password)
}

toDB, err := conn.DefaultDBProvider.Apply(cfg)
if err != nil {
return err
}
defer toDB.Close()

value, err := dbutil.ShowVersion(ctx, toDB.DB)
if err != nil {
return err
}

version, err := utils.ToTiDBVersion(value)
// Do not adjust if not TiDB
if err == nil {
config.AdjustTargetDBSessionCfg(dbConfig, version)
} else {
log.L().Warn("get tidb version", log.ShortError(err))
}
return nil
}

// OperateSource will create or update an upstream source.
func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest) (*pb.OperateSourceResponse, error) {
var (
Expand Down Expand Up @@ -1194,6 +1224,11 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}

err = adjustTargetDB(ctx, cfg.TargetDB)
if err != nil {
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}

sourceCfgs, err := s.getSourceConfigs(cfg.MySQLInstances)
if err != nil {
return nil, nil, err
Expand Down
87 changes: 78 additions & 9 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ func (t *testMaster) TestCheckTask(c *check.C) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", t.workerClients)
mock := t.initVersionDB(c)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err := server.CheckTask(context.Background(), &pb.CheckTaskRequest{
Task: taskConfig,
})
Expand All @@ -373,6 +379,9 @@ func (t *testMaster) TestCheckTask(c *check.C) {
// simulate invalid password returned from scheduler, but config was supported plaintext mysql password, so cfg.SubTaskConfigs will success
ctx, cancel = context.WithCancel(context.Background())
server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "invalid-encrypt-password", t.workerClients)
mock = t.initVersionDB(c)
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err = server.CheckTask(context.Background(), &pb.CheckTaskRequest{
Task: taskConfig,
})
Expand Down Expand Up @@ -406,6 +415,12 @@ func (t *testMaster) TestStartTask(c *check.C) {
}
server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "",
makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req))
mock := t.initVersionDB(c)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err = server.StartTask(context.Background(), req)
c.Assert(err, check.IsNil)
c.Assert(resp.Result, check.IsTrue)
Expand All @@ -420,6 +435,9 @@ func (t *testMaster) TestStartTask(c *check.C) {

// check start-task with an invalid source
invalidSource := "invalid-source"
mock = t.initVersionDB(c)
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err = server.StartTask(context.Background(), &pb.StartTaskRequest{
Task: taskConfig,
Sources: []string{invalidSource},
Expand All @@ -438,6 +456,9 @@ func (t *testMaster) TestStartTask(c *check.C) {
defer func() {
checker.CheckSyncConfigFunc = bakCheckSyncConfigFunc
}()
mock = t.initVersionDB(c)
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err = server.StartTask(context.Background(), &pb.StartTaskRequest{
Task: taskConfig,
Sources: sources,
Expand All @@ -448,13 +469,41 @@ func (t *testMaster) TestStartTask(c *check.C) {
clearSchedulerEnv(c, cancel, &wg)
}

// db use for remove data
// verDB user for show version
type mockDBProvider struct {
db *sql.DB
verDB *sql.DB
db *sql.DB
}

// Apply will build BaseDB with DBConfig
// return db if verDB was closed
func (d *mockDBProvider) Apply(config config.DBConfig) (*conn.BaseDB, error) {
return conn.NewBaseDB(d.db, func() {}), nil
if err := d.verDB.Ping(); err != nil {
return conn.NewBaseDB(d.db, func() {}), nil
}
return conn.NewBaseDB(d.verDB, func() {}), nil
}

func (t *testMaster) initVersionDB(c *check.C) sqlmock.Sqlmock {
db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)
if mdbp, ok := conn.DefaultDBProvider.(*mockDBProvider); ok {
mdbp.verDB = db
} else {
conn.DefaultDBProvider = &mockDBProvider{verDB: db}
}
return mock
}

func (t *testMaster) initMockDB(c *check.C) sqlmock.Sqlmock {
db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)
if mdbp, ok := conn.DefaultDBProvider.(*mockDBProvider); ok {
mdbp.db = db
} else {
conn.DefaultDBProvider = &mockDBProvider{db: db}
}
return mock
}

func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
Expand Down Expand Up @@ -503,12 +552,13 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
c.Assert(server.pessimist.Start(ctx, etcdTestCli), check.IsNil)
c.Assert(server.optimist.Start(ctx, etcdTestCli), check.IsNil)

db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)
conn.DefaultDBProvider = &mockDBProvider{db: db}
verMock := t.initVersionDB(c)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
verMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
mock := t.initMockDB(c)
mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
Expand All @@ -523,6 +573,9 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
defer wg.Done()
time.Sleep(10 * time.Microsecond)
// start another same task at the same time, should get err
verMock2 := t.initVersionDB(c)
verMock2.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp1, err1 := server.StartTask(context.Background(), req)
c.Assert(err1, check.IsNil)
c.Assert(resp1.Result, check.IsFalse)
Expand Down Expand Up @@ -593,9 +646,10 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
err = server.optimist.Start(ctx, etcdTestCli)
c.Assert(err, check.IsNil)

db, mock, err = sqlmock.New()
c.Assert(err, check.IsNil)
conn.DefaultDBProvider = &mockDBProvider{db: db}
verMock = t.initVersionDB(c)
verMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
mock = t.initMockDB(c)
mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
Expand All @@ -610,6 +664,9 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
defer wg.Done()
time.Sleep(10 * time.Microsecond)
// start another same task at the same time, should get err
vermock2 := t.initVersionDB(c)
vermock2.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp1, err1 := server.StartTask(context.Background(), req)
c.Assert(err1, check.IsNil)
c.Assert(resp1.Result, check.IsFalse)
Expand Down Expand Up @@ -692,6 +749,12 @@ func (t *testMaster) TestOperateTask(c *check.C) {
sourceResps := []*pb.CommonWorkerResponse{{Result: true, Source: sources[0]}, {Result: true, Source: sources[1]}}
server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "",
makeWorkerClientsForHandle(ctrl, taskName, sources, workers, startReq, pauseReq, resumeReq, stopReq1, stopReq2))
mock := t.initVersionDB(c)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
stResp, err := server.StartTask(context.Background(), startReq)
c.Assert(err, check.IsNil)
c.Assert(stResp.Result, check.IsTrue)
Expand Down Expand Up @@ -1328,6 +1391,12 @@ func (t *testMaster) TestGetTaskCfg(c *check.C) {
makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req))

// start task
mock := t.initVersionDB(c)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err := server.StartTask(context.Background(), req)
c.Assert(err, check.IsNil)
c.Assert(resp.Result, check.IsTrue)
Expand Down
10 changes: 7 additions & 3 deletions pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ func NewTracker(sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker

if len(sessionCfg) == 0 {
sessionCfg = make(map[string]string)
var ignoredColumn interface{}
for _, k := range sessionVars {
rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("show variables like '%s'", k))
}
// get variables if user doesn't specify
// all cfg in sessionVars should be lower case
for _, k := range sessionVars {
if _, ok := sessionCfg[k]; !ok {
var ignoredColumn interface{}
rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k))
if err2 != nil {
return nil, err2
}
Expand Down
Loading