Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

performance: use OPTIMISTIC transaction as the default #1079

Merged
merged 28 commits into from
Sep 27, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f8e909a
use OPTIMISTIC transaction as the default
GMHDBJD Sep 23, 2020
d24bec9
Merge branch 'master' into defautlOptimistic
lance6716 Sep 23, 2020
f1eab4a
get variables if user doesn't specify
GMHDBJD Sep 23, 2020
1305528
fix ut
GMHDBJD Sep 23, 2020
0cb219b
Merge branch 'master' into defautlOptimistic
lance6716 Sep 23, 2020
1c25c2f
address comment
GMHDBJD Sep 23, 2020
715406d
Merge branch 'master' into defautlOptimistic
lance6716 Sep 23, 2020
9eb6e91
Merge branch 'master' into defautlOptimistic
csuzhangxc Sep 24, 2020
a50cd9f
apply default session config base on tidb's version
GMHDBJD Sep 24, 2020
6dc5e40
Merge branch 'master' into defautlOptimistic
GMHDBJD Sep 24, 2020
6a2961e
decrypt password
GMHDBJD Sep 24, 2020
fd24e53
fix it
GMHDBJD Sep 25, 2020
328817b
remove line
GMHDBJD Sep 25, 2020
57c8036
Merge branch 'master' into defautlOptimistic
GMHDBJD Sep 25, 2020
81e1294
temp commit
GMHDBJD Sep 25, 2020
6e1899d
close db and fix test
GMHDBJD Sep 25, 2020
0dcceb3
add ut
GMHDBJD Sep 25, 2020
a07c915
add it
GMHDBJD Sep 25, 2020
6fb9d64
Merge branch 'master' into defautlOptimistic
GMHDBJD Sep 25, 2020
614def3
debug ci
GMHDBJD Sep 25, 2020
512048a
support beta version
GMHDBJD Sep 25, 2020
9c4115b
Revert "debug ci"
GMHDBJD Sep 25, 2020
a345baa
Merge branch 'master' into defautlOptimistic
GMHDBJD Sep 25, 2020
ae64af7
revert clearEecdEnv
GMHDBJD Sep 25, 2020
f93a284
add beta version ut
GMHDBJD Sep 25, 2020
fd2bb13
Merge branch 'master' into defautlOptimistic
GMHDBJD Sep 25, 2020
a3bdfb4
minor fix
GMHDBJD Sep 27, 2020
e3ca26c
address commment
GMHDBJD Sep 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ const (

// shard DDL mode.
const (
ShardPessimistic = "pessimistic"
ShardOptimistic = "optimistic"
ShardPessimistic = "pessimistic"
ShardOptimistic = "optimistic"
tidbTxnMode = "tidb_txn_mode"
tidbTxnOptimistic = "optimistic"
)

// default config item values
Expand All @@ -66,6 +68,11 @@ var (
defaultBatch = 100
defaultQueueSize = 1024 // do not give too large default value to avoid OOM
defaultCheckpointFlushInterval = 30 // in seconds

// TargetDBConfig
defaultSessionCfg = map[string]string{
tidbTxnMode: tidbTxnOptimistic,
Copy link
Member

@csuzhangxc csuzhangxc Sep 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does a lower version of TiDB without optimistic TXN works well (or without interrupt)?

}
)

// Meta represents binlog's meta pos
Expand Down Expand Up @@ -411,6 +418,7 @@ func (c *TaskConfig) adjust() error {
if c.TargetDB == nil {
return terror.ErrConfigNeedTargetDB.Generate()
}
adjustTargetDB(c.TargetDB)

if len(c.MySQLInstances) == 0 {
return terror.ErrConfigMySQLInstsAtLeastOne.Generate()
Expand Down Expand Up @@ -707,3 +715,14 @@ func checkDuplicateString(ruleNames []string) []string {
}
return dupeArray
}

func adjustTargetDB(dbConfig *DBConfig) {
if dbConfig.Session == nil {
dbConfig.Session = make(map[string]string, len(defaultSessionCfg))
}
for k, v := range defaultSessionCfg {
if _, ok := dbConfig.Session[k]; !ok {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
dbConfig.Session[k] = v
}
}
}
31 changes: 27 additions & 4 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,10 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
heartbeatRI = 21
timezone = "Asia/Shanghai"
maxAllowedPacket = 10244201
session = map[string]string{
fromSession = map[string]string{
"sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES",
}
toSession = map[string]string{
"sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES",
}
security = Security{
Expand Down Expand Up @@ -380,7 +383,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
User: "user_from_1",
Password: "123",
MaxAllowedPacket: &maxAllowedPacket,
Session: session,
Session: fromSession,
Security: &security,
RawDBCfg: &rawDBCfg,
}
Expand All @@ -390,7 +393,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
User: "user_from_2",
Password: "abc",
MaxAllowedPacket: &maxAllowedPacket,
Session: session,
Session: fromSession,
Security: &security,
RawDBCfg: &rawDBCfg,
}
Expand Down Expand Up @@ -421,7 +424,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
User: "user_to",
Password: "abc",
MaxAllowedPacket: &maxAllowedPacket,
Session: session,
Session: toSession,
Security: &security,
RawDBCfg: &rawDBCfg,
},
Expand Down Expand Up @@ -576,6 +579,8 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
stCfgs[0].EnableANSIQuotes = stCfg1.EnableANSIQuotes
stCfgs[1].EnableANSIQuotes = stCfg2.EnableANSIQuotes
c.Assert(stCfgs[0].String(), Equals, stCfg1.String())
// subtask session cfg also changed because we ref DBConfig when merge from subtask config
stCfg2.To.Session[tidbTxnMode] = tidbTxnOptimistic
c.Assert(stCfgs[1].String(), Equals, stCfg2.String())
}

Expand Down Expand Up @@ -645,3 +650,21 @@ func (t *testConfig) TestMySQLInstance(c *C) {
c.Assert(m.VerifyAndAdjust(), IsNil)

}

func (t *testConfig) TestAdjustSessionCfg(c *C) {
sessionCfg := map[string]string{tidbTxnMode: tidbTxnOptimistic}
dbCfg := &DBConfig{}
adjustTargetDB(dbCfg)
c.Assert(dbCfg.Session, DeepEquals, sessionCfg)

sessionCfg["sql_mode"] = "ANSI_QUOTES"
dbCfg.Session["sql_mode"] = "ANSI_QUOTES"
adjustTargetDB(dbCfg)
c.Assert(dbCfg.Session, DeepEquals, sessionCfg)

tidbTxnPessimistic := "pessimistic"
sessionCfg[tidbTxnMode] = tidbTxnPessimistic
dbCfg.Session[tidbTxnMode] = tidbTxnPessimistic
adjustTargetDB(dbCfg)
c.Assert(dbCfg.Session, DeepEquals, sessionCfg)
}
9 changes: 6 additions & 3 deletions pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ func NewTracker(sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker

if len(sessionCfg) == 0 {
sessionCfg = make(map[string]string)
var ignoredColumn interface{}
for _, k := range sessionVars {
rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("show variables like '%s'", k))
}
// get variables if user doesn't specify
for _, k := range sessionVars {
if _, ok := sessionCfg[k]; !ok {
var ignoredColumn interface{}
rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k))
if err2 != nil {
return nil, err2
}
Expand Down
5 changes: 4 additions & 1 deletion syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ func (s *testCheckpointSuite) SetUpSuite(c *C) {
log.SetLevel(zapcore.ErrorLevel)
var (
err error
defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"}
defaultTestSessionCfg = map[string]string{
"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION",
"tidb_skip_utf8_check": "0",
}
)

s.tracker, err = schema.NewTracker(defaultTestSessionCfg, nil)
Expand Down
5 changes: 4 additions & 1 deletion syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ import (
var _ = Suite(&testSyncerSuite{})

var (
defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"}
defaultTestSessionCfg = map[string]string{
"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION",
"tidb_skip_utf8_check": "0",
}
)

func TestSuite(t *testing.T) {
Expand Down