Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
optimist: only update table info if no conflict (#1496)
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored Mar 11, 2021
1 parent 5141e05 commit 301dd99
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 25 deletions.
8 changes: 4 additions & 4 deletions dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ func NewRootCmd() *cobra.Command {
Simply type ` + cmd.Name() + ` help [path to command] for full details.`,

Run: func(c *cobra.Command, args []string) {
cmd, _, e := c.Root().Find(args)
if cmd == nil || e != nil {
cmd2, _, e := c.Root().Find(args)
if cmd2 == nil || e != nil {
c.Printf("Unknown help topic %#q\n", args)
_ = c.Root().Usage()
} else {
cmd.InitDefaultHelpFlag() // make possible 'help' flag to be shown
_ = cmd.Help()
cmd2.InitDefaultHelpFlag() // make possible 'help' flag to be shown
_ = cmd2.Help()
}
},
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/shardddl/optimism/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,23 @@ func (l *Lock) TrySync(callerSource, callerSchema, callerTable string,
prevTable := l.tables[callerSource][callerSchema][callerTable]
oldJoined := l.joined

// update table info and joined info base on the last new table info
lastTableInfo := schemacmp.Encode(newTIs[len(newTIs)-1])
log.L().Info("update table info", zap.String("lock", l.ID), zap.String("source", callerSource), zap.String("schema", callerSchema), zap.String("table", callerTable),
zap.Stringer("from", prevTable), zap.Stringer("to", lastTableInfo), zap.Strings("ddls", ddls))
l.tables[callerSource][callerSchema][callerTable] = lastTableInfo

lastJoined, err := joinTable(lastTableInfo)
if err != nil {
return emptyDDLs, err
}
// update the current joined table info, it should be logged in `if cmp != 0` block below.
l.joined = lastJoined

defer func() {
// only update table info and joined info if no error
if err == nil {
// update table info and joined info base on the last new table info
log.L().Info("update table info", zap.String("lock", l.ID), zap.String("source", callerSource), zap.String("schema", callerSchema), zap.String("table", callerTable),
zap.Stringer("from", l.tables[callerSource][callerSchema][callerTable]), zap.Stringer("to", lastTableInfo), zap.Strings("ddls", ddls))
l.tables[callerSource][callerSchema][callerTable] = lastTableInfo
// update the current joined table info, it should be logged in `if cmp != 0` block below.
l.joined = lastJoined
}
}()

newDDLs = []string{}
nextTable := prevTable
Expand Down
42 changes: 28 additions & 14 deletions pkg/shardddl/optimism/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,8 +874,9 @@ func (t *testLock) TestLockTrySyncConflictNonIntrusive(c *C) {
c.Assert(DDLs, DeepEquals, []string{})
c.Assert(l.versions, DeepEquals, vers)
cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined())
c.Assert(err, ErrorMatches, ".*at tuple index.*")
c.Assert(cmp, Equals, 0)
// join table isn't updated
c.Assert(err, IsNil)
c.Assert(cmp, Equals, -1)
ready = l.Ready()
c.Assert(ready[source][db][tbls[1]], IsFalse)

Expand All @@ -885,15 +886,27 @@ func (t *testLock) TestLockTrySyncConflictNonIntrusive(c *C) {
c.Assert(err, IsNil)
c.Assert(DDLs, DeepEquals, DDLs3)
c.Assert(l.versions, DeepEquals, vers)
ready = l.Ready()
c.Assert(ready[source][db][tbls[0]], IsFalse)
c.Assert(ready[source][db][tbls[1]], IsTrue) // the second table become synced now.
ready = l.Ready() // all table ready
c.Assert(ready[source][db][tbls[0]], IsTrue)
c.Assert(ready[source][db][tbls[1]], IsTrue)
cmp, err = l.tables[source][db][tbls[0]].Compare(l.Joined())
c.Assert(err, IsNil)
c.Assert(cmp, Equals, -1)
c.Assert(cmp, Equals, 0)
cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined())
c.Assert(err, IsNil)
c.Assert(cmp, Equals, 0)

// TrySync for the second table, succeed now
vers[source][db][tbls[1]]++
DDLs, err = l.TrySync(source, db, tbls[1], DDLs2, []*model.TableInfo{ti2_1, ti2}, tts, vers[source][db][tbls[1]])
c.Assert(err, IsNil)
c.Assert(DDLs, DeepEquals, DDLs2)
c.Assert(l.versions, DeepEquals, vers)
cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined())
c.Assert(err, IsNil)
c.Assert(cmp, Equals, 0)
ready = l.Ready()
c.Assert(ready[source][db][tbls[1]], IsTrue)

// TrySync for the first table.
vers[source][db][tbls[0]]++
Expand Down Expand Up @@ -977,8 +990,9 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) {
c.Assert(DDLs, DeepEquals, []string{})
c.Assert(l.versions, DeepEquals, vers)
cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined())
c.Assert(err, ErrorMatches, ".*at tuple index.*")
c.Assert(cmp, Equals, 0)
// join table isn't updated
c.Assert(err, IsNil)
c.Assert(cmp, Equals, -1)
ready = l.Ready()
c.Assert(ready[source][db][tbls[1]], IsFalse)

Expand All @@ -989,8 +1003,8 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) {
c.Assert(DDLs, DeepEquals, []string{})
c.Assert(l.versions, DeepEquals, vers)
cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined())
c.Assert(err, ErrorMatches, ".*at tuple index.*")
c.Assert(cmp, Equals, 0)
c.Assert(err, IsNil)
c.Assert(cmp, Equals, -1)

// TrySync for the second table to drop the non-conflict column, the conflict should still exist.
vers[source][db][tbls[1]]++
Expand All @@ -999,8 +1013,8 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) {
c.Assert(DDLs, DeepEquals, []string{})
c.Assert(l.versions, DeepEquals, vers)
cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined())
c.Assert(err, ErrorMatches, ".*at tuple index.*")
c.Assert(cmp, Equals, 0)
c.Assert(err, IsNil)
c.Assert(cmp, Equals, -1)
ready = l.Ready()
c.Assert(ready[source][db][tbls[1]], IsFalse)

Expand Down Expand Up @@ -1051,8 +1065,8 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) {
c.Assert(terror.ErrShardDDLOptimismTrySyncFail.Equal(err), IsTrue)
c.Assert(DDLs, DeepEquals, []string{})
cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined())
c.Assert(err, ErrorMatches, ".*at tuple index.*")
c.Assert(cmp, Equals, 0)
c.Assert(err, IsNil)
c.Assert(cmp, Equals, -1)
c.Assert(l.versions, DeepEquals, vers)
ready = l.Ready()
c.Assert(ready[source][db][tbls[1]], IsFalse)
Expand Down
55 changes: 55 additions & 0 deletions tests/handle_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,60 @@ function DM_SKIP_INCOMPATIBLE_DDL() {
run_case SKIP_INCOMPATIBLE_DDL "single-source-no-sharding" "init_table 11" "clean_table" ""
}

function DM_REPLACE_DEFAULT_VALUE_CASE() {
run_sql_source1 "insert into ${db}.${tb1} values(1);"
run_sql_source2 "insert into ${db}.${tb1} values(2);"
run_sql_source2 "insert into ${db}.${tb2} values(3);"

run_sql_source1 "alter table ${db}.${tb1} add new_col1 int default 1;"
run_sql_source1 "insert into ${db}.${tb1} values(4,4);"
run_sql_source2 "insert into ${db}.${tb1} values(5);"
run_sql_source2 "insert into ${db}.${tb2} values(6);"
run_sql_source2 "alter table ${db}.${tb1} add new_col1 int default 2;"
run_sql_source1 "insert into ${db}.${tb1} values(7,7);"
run_sql_source2 "insert into ${db}.${tb1} values(8,8);"
run_sql_source2 "insert into ${db}.${tb2} values(9);"
run_sql_source2 "alter table ${db}.${tb2} add new_col1 int default 3;"
run_sql_source1 "insert into ${db}.${tb1} values(10,10);"
run_sql_source2 "insert into ${db}.${tb1} values(11,11);"
run_sql_source2 "insert into ${db}.${tb2} values(12,12);"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"because schema conflict detected" 1

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"handle-error test -s mysql-replica-02 replace alter table ${db}.${tb1} add new_col1 int default 1;" \
"\"result\": true" 2

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"because schema conflict detected" 1

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"handle-error test -s mysql-replica-02 replace alter table ${db}.${tb2} add new_col1 int default 1;" \
"\"result\": true" 2

run_sql_source1 "alter table ${db}.${tb1} add new_col2 int;"
run_sql_source2 "alter table ${db}.${tb1} add new_col2 int;"
run_sql_source2 "alter table ${db}.${tb2} add new_col2 int;"
run_sql_source1 "insert into ${db}.${tb1} values(13,13,13);"
run_sql_source2 "insert into ${db}.${tb1} values(14,14,14);"
run_sql_source2 "insert into ${db}.${tb2} values(15,15,15);"

# WARN: some data different
# all the value before alter table in TiDB will be 1, while upstream table is 1, 2 or 3
run_sql_tidb_with_retry "select count(1) from ${db}.${tb}" "count(1): 15"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"result\": true" 3
}

function DM_REPLACE_DEFAULT_VALUE() {
run_case REPLACE_DEFAULT_VALUE "double-source-optimistic" "init_table 11 21 22" "clean_table" ""
}

function run() {
init_cluster
init_database
Expand All @@ -1421,6 +1475,7 @@ function run() {
DM_REPLACE_ERROR_MULTIPLE
DM_EXEC_ERROR_SKIP
DM_SKIP_INCOMPATIBLE_DDL
DM_REPLACE_DEFAULT_VALUE

implement=(4202 4204 4206 4207 4209 4211 4213 4215 4216 4219 4220 4185 4201 4189 4210 4193 4230 4177 4231)
for i in ${implement[@]}; do
Expand Down

0 comments on commit 301dd99

Please sign in to comment.