diff --git a/dm/ctl/ctl.go b/dm/ctl/ctl.go index 10a856e617..b965cf1bc7 100644 --- a/dm/ctl/ctl.go +++ b/dm/ctl/ctl.go @@ -83,13 +83,13 @@ func NewRootCmd() *cobra.Command { Simply type ` + cmd.Name() + ` help [path to command] for full details.`, Run: func(c *cobra.Command, args []string) { - cmd, _, e := c.Root().Find(args) - if cmd == nil || e != nil { + cmd2, _, e := c.Root().Find(args) + if cmd2 == nil || e != nil { c.Printf("Unknown help topic %#q\n", args) _ = c.Root().Usage() } else { - cmd.InitDefaultHelpFlag() // make possible 'help' flag to be shown - _ = cmd.Help() + cmd2.InitDefaultHelpFlag() // make possible 'help' flag to be shown + _ = cmd2.Help() } }, } diff --git a/pkg/shardddl/optimism/lock.go b/pkg/shardddl/optimism/lock.go index 9da2a80c17..e537591f04 100644 --- a/pkg/shardddl/optimism/lock.go +++ b/pkg/shardddl/optimism/lock.go @@ -156,18 +156,23 @@ func (l *Lock) TrySync(callerSource, callerSchema, callerTable string, prevTable := l.tables[callerSource][callerSchema][callerTable] oldJoined := l.joined - // update table info and joined info base on the last new table info lastTableInfo := schemacmp.Encode(newTIs[len(newTIs)-1]) - log.L().Info("update table info", zap.String("lock", l.ID), zap.String("source", callerSource), zap.String("schema", callerSchema), zap.String("table", callerTable), - zap.Stringer("from", prevTable), zap.Stringer("to", lastTableInfo), zap.Strings("ddls", ddls)) - l.tables[callerSource][callerSchema][callerTable] = lastTableInfo - lastJoined, err := joinTable(lastTableInfo) if err != nil { return emptyDDLs, err } - // update the current joined table info, it should be logged in `if cmp != 0` block below. - l.joined = lastJoined + + defer func() { + // only update table info and joined info if no error + if err == nil { + // update table info and joined info base on the last new table info + log.L().Info("update table info", zap.String("lock", l.ID), zap.String("source", callerSource), zap.String("schema", callerSchema), zap.String("table", callerTable), + zap.Stringer("from", l.tables[callerSource][callerSchema][callerTable]), zap.Stringer("to", lastTableInfo), zap.Strings("ddls", ddls)) + l.tables[callerSource][callerSchema][callerTable] = lastTableInfo + // update the current joined table info, it should be logged in `if cmp != 0` block below. + l.joined = lastJoined + } + }() newDDLs = []string{} nextTable := prevTable diff --git a/pkg/shardddl/optimism/lock_test.go b/pkg/shardddl/optimism/lock_test.go index bf62f6a14f..624c9cc9ad 100644 --- a/pkg/shardddl/optimism/lock_test.go +++ b/pkg/shardddl/optimism/lock_test.go @@ -874,8 +874,9 @@ func (t *testLock) TestLockTrySyncConflictNonIntrusive(c *C) { c.Assert(DDLs, DeepEquals, []string{}) c.Assert(l.versions, DeepEquals, vers) cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined()) - c.Assert(err, ErrorMatches, ".*at tuple index.*") - c.Assert(cmp, Equals, 0) + // join table isn't updated + c.Assert(err, IsNil) + c.Assert(cmp, Equals, -1) ready = l.Ready() c.Assert(ready[source][db][tbls[1]], IsFalse) @@ -885,15 +886,27 @@ func (t *testLock) TestLockTrySyncConflictNonIntrusive(c *C) { c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs3) c.Assert(l.versions, DeepEquals, vers) - ready = l.Ready() - c.Assert(ready[source][db][tbls[0]], IsFalse) - c.Assert(ready[source][db][tbls[1]], IsTrue) // the second table become synced now. + ready = l.Ready() // all table ready + c.Assert(ready[source][db][tbls[0]], IsTrue) + c.Assert(ready[source][db][tbls[1]], IsTrue) cmp, err = l.tables[source][db][tbls[0]].Compare(l.Joined()) c.Assert(err, IsNil) - c.Assert(cmp, Equals, -1) + c.Assert(cmp, Equals, 0) + cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined()) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, 0) + + // TrySync for the second table, succeed now + vers[source][db][tbls[1]]++ + DDLs, err = l.TrySync(source, db, tbls[1], DDLs2, []*model.TableInfo{ti2_1, ti2}, tts, vers[source][db][tbls[1]]) + c.Assert(err, IsNil) + c.Assert(DDLs, DeepEquals, DDLs2) + c.Assert(l.versions, DeepEquals, vers) cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined()) c.Assert(err, IsNil) c.Assert(cmp, Equals, 0) + ready = l.Ready() + c.Assert(ready[source][db][tbls[1]], IsTrue) // TrySync for the first table. vers[source][db][tbls[0]]++ @@ -977,8 +990,9 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { c.Assert(DDLs, DeepEquals, []string{}) c.Assert(l.versions, DeepEquals, vers) cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined()) - c.Assert(err, ErrorMatches, ".*at tuple index.*") - c.Assert(cmp, Equals, 0) + // join table isn't updated + c.Assert(err, IsNil) + c.Assert(cmp, Equals, -1) ready = l.Ready() c.Assert(ready[source][db][tbls[1]], IsFalse) @@ -989,8 +1003,8 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { c.Assert(DDLs, DeepEquals, []string{}) c.Assert(l.versions, DeepEquals, vers) cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined()) - c.Assert(err, ErrorMatches, ".*at tuple index.*") - c.Assert(cmp, Equals, 0) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, -1) // TrySync for the second table to drop the non-conflict column, the conflict should still exist. vers[source][db][tbls[1]]++ @@ -999,8 +1013,8 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { c.Assert(DDLs, DeepEquals, []string{}) c.Assert(l.versions, DeepEquals, vers) cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined()) - c.Assert(err, ErrorMatches, ".*at tuple index.*") - c.Assert(cmp, Equals, 0) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, -1) ready = l.Ready() c.Assert(ready[source][db][tbls[1]], IsFalse) @@ -1051,8 +1065,8 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { c.Assert(terror.ErrShardDDLOptimismTrySyncFail.Equal(err), IsTrue) c.Assert(DDLs, DeepEquals, []string{}) cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined()) - c.Assert(err, ErrorMatches, ".*at tuple index.*") - c.Assert(cmp, Equals, 0) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, -1) c.Assert(l.versions, DeepEquals, vers) ready = l.Ready() c.Assert(ready[source][db][tbls[1]], IsFalse) diff --git a/tests/handle_error/run.sh b/tests/handle_error/run.sh index 27bf002a46..88e83d357e 100644 --- a/tests/handle_error/run.sh +++ b/tests/handle_error/run.sh @@ -1411,6 +1411,60 @@ function DM_SKIP_INCOMPATIBLE_DDL() { run_case SKIP_INCOMPATIBLE_DDL "single-source-no-sharding" "init_table 11" "clean_table" "" } +function DM_REPLACE_DEFAULT_VALUE_CASE() { + run_sql_source1 "insert into ${db}.${tb1} values(1);" + run_sql_source2 "insert into ${db}.${tb1} values(2);" + run_sql_source2 "insert into ${db}.${tb2} values(3);" + + run_sql_source1 "alter table ${db}.${tb1} add new_col1 int default 1;" + run_sql_source1 "insert into ${db}.${tb1} values(4,4);" + run_sql_source2 "insert into ${db}.${tb1} values(5);" + run_sql_source2 "insert into ${db}.${tb2} values(6);" + run_sql_source2 "alter table ${db}.${tb1} add new_col1 int default 2;" + run_sql_source1 "insert into ${db}.${tb1} values(7,7);" + run_sql_source2 "insert into ${db}.${tb1} values(8,8);" + run_sql_source2 "insert into ${db}.${tb2} values(9);" + run_sql_source2 "alter table ${db}.${tb2} add new_col1 int default 3;" + run_sql_source1 "insert into ${db}.${tb1} values(10,10);" + run_sql_source2 "insert into ${db}.${tb1} values(11,11);" + run_sql_source2 "insert into ${db}.${tb2} values(12,12);" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "because schema conflict detected" 1 + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "handle-error test -s mysql-replica-02 replace alter table ${db}.${tb1} add new_col1 int default 1;" \ + "\"result\": true" 2 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "because schema conflict detected" 1 + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "handle-error test -s mysql-replica-02 replace alter table ${db}.${tb2} add new_col1 int default 1;" \ + "\"result\": true" 2 + + run_sql_source1 "alter table ${db}.${tb1} add new_col2 int;" + run_sql_source2 "alter table ${db}.${tb1} add new_col2 int;" + run_sql_source2 "alter table ${db}.${tb2} add new_col2 int;" + run_sql_source1 "insert into ${db}.${tb1} values(13,13,13);" + run_sql_source2 "insert into ${db}.${tb1} values(14,14,14);" + run_sql_source2 "insert into ${db}.${tb2} values(15,15,15);" + + # WARN: some data different + # all the value before alter table in TiDB will be 1, while upstream table is 1, 2 or 3 + run_sql_tidb_with_retry "select count(1) from ${db}.${tb}" "count(1): 15" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"result\": true" 3 +} + +function DM_REPLACE_DEFAULT_VALUE() { + run_case REPLACE_DEFAULT_VALUE "double-source-optimistic" "init_table 11 21 22" "clean_table" "" +} + function run() { init_cluster init_database @@ -1421,6 +1475,7 @@ function run() { DM_REPLACE_ERROR_MULTIPLE DM_EXEC_ERROR_SKIP DM_SKIP_INCOMPATIBLE_DDL + DM_REPLACE_DEFAULT_VALUE implement=(4202 4204 4206 4207 4209 4211 4213 4215 4216 4219 4220 4185 4201 4189 4210 4193 4230 4177 4231) for i in ${implement[@]}; do