diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index 61417d029f..3f8e03558b 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -241,24 +241,31 @@ func (tsc *realTaskStatusChecker) run() { // isResumableError checks the error message and returns whether we need to // resume the task and retry func isResumableError(err *pb.ProcessError) bool { + if err.Error == nil { + return true + } + // not elegant code, because TiDB doesn't expose some error for _, msg := range retry.UnsupportedDDLMsgs { - if err.Error != nil && strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) { + if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) { return false } } for _, msg := range retry.UnsupportedDMLMsgs { - if err.Error != nil && strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) { + if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) { return false } } - if err.Error != nil && err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) { + if err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) { for _, msg := range retry.ParseRelayLogErrMsgs { if strings.Contains(strings.ToLower(err.Error.Message), strings.ToLower(msg)) { return false } } } + if _, ok := retry.UnresumableErrCodes[err.Error.ErrCode]; ok { + return false + } return true } diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go index a04fb730a3..b2450979c4 100644 --- a/pkg/retry/errors.go +++ b/pkg/retry/errors.go @@ -15,6 +15,8 @@ package retry import ( "database/sql/driver" + + "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/errors" ) @@ -41,6 +43,11 @@ var ( "binlog checksum mismatch, data may be corrupted", "get event err EOF", } + + // UnresumableErrCodes is a set of unresumeable err codes. + UnresumableErrCodes = map[int32]struct{}{ + int32(terror.ErrSyncerShardDDLConflict.Code()): {}, + } ) // IsConnectionError tells whether this error should reconnect to Database diff --git a/syncer/sharding-meta/shardmeta.go b/syncer/sharding-meta/shardmeta.go index e639398eaf..dfcfe1110f 100644 --- a/syncer/sharding-meta/shardmeta.go +++ b/syncer/sharding-meta/shardmeta.go @@ -147,7 +147,8 @@ func (meta *ShardingMeta) ActiveIdx() int { return meta.activeIdx } -func (meta *ShardingMeta) reinitialize() { +// Reinitialize reinitialize the shardingmeta +func (meta *ShardingMeta) Reinitialize() { meta.activeIdx = 0 meta.global = &ShardingSequence{make([]*DDLItem, 0)} meta.sources = make(map[string]*ShardingSequence) @@ -247,7 +248,7 @@ func (meta *ShardingMeta) InSequenceSharding() bool { func (meta *ShardingMeta) ResolveShardingDDL() bool { meta.activeIdx++ if meta.activeIdx == len(meta.global.Items) { - meta.reinitialize() + meta.Reinitialize() return true } return false diff --git a/syncer/sharding_group.go b/syncer/sharding_group.go index 200a0336fa..7f7b33b6d9 100644 --- a/syncer/sharding_group.go +++ b/syncer/sharding_group.go @@ -480,6 +480,9 @@ func (k *ShardingGroupKeeper) ResetGroups() { defer k.RUnlock() for _, group := range k.groups { group.Reset() + // reset ShardingMeta when start or resume task + // it will be reconstructed by consuming binlog event + group.meta.Reinitialize() } } diff --git a/tests/sequence_sharding/data/db1.increment2.sql b/tests/sequence_sharding/data/db1.increment2.sql new file mode 100644 index 0000000000..9d66b64a8c --- /dev/null +++ b/tests/sequence_sharding/data/db1.increment2.sql @@ -0,0 +1,5 @@ +use sharding_seq; +alter table t1 add column new_col1 int; +alter table t1 add column new_col2 int; +alter table t2 add column new_col1 int; +alter table t2 add column new_col3 int; diff --git a/tests/sequence_sharding/data/db2.increment2.sql b/tests/sequence_sharding/data/db2.increment2.sql new file mode 100644 index 0000000000..f366aecab3 --- /dev/null +++ b/tests/sequence_sharding/data/db2.increment2.sql @@ -0,0 +1,7 @@ +use sharding_seq; +alter table t2 add column new_col1 int; +alter table t2 add column new_col2 int; +alter table t3 add column new_col1 int; +alter table t3 add column new_col2 int; +alter table t4 add column new_col1 int; +alter table t4 add column new_col3 int; diff --git a/tests/sequence_sharding/run.sh b/tests/sequence_sharding/run.sh index bb4ef780a2..a93b678e37 100755 --- a/tests/sequence_sharding/run.sh +++ b/tests/sequence_sharding/run.sh @@ -36,6 +36,39 @@ function run() { sleep 3 # use sync_diff_inspector to check data now! check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + + # the first ddl success while the second is conflict + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status sequence_sharding" \ + "detect inconsistent DDL sequence" 2 + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "resume-task sequence_sharding" \ + "\"result\": true" 3 + + # still conflict + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status sequence_sharding" \ + "detect inconsistent DDL sequence" 2 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task sequence_sharding" \ + "\"result\": true" 3 + + # now upstream schema is conflict, ignore it and restart task + cp $cur/conf/dm-task.yaml $WORK_DIR/task.yaml + echo "ignore-checking-items: [\"all\"]" >> $WORK_DIR/task.yaml + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/task.yaml" \ + "\"result\": true" 3 + + # still conflict + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status sequence_sharding" \ + "detect inconsistent DDL sequence" 2 } cleanup_data sharding_target2 diff --git a/tests/sharding/run.sh b/tests/sharding/run.sh index a4a8e76a0f..4519ce431b 100755 --- a/tests/sharding/run.sh +++ b/tests/sharding/run.sh @@ -115,6 +115,23 @@ function run() { echo "checksum before drop/truncate: $old_checksum, checksum after drop/truncate: $new_checksum" [ "$old_checksum" == "$new_checksum" ] + # test conflict ddl in single worker + run_sql "alter table sharding1.t1 add column new_col1 int;" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "alter table sharding1.t2 add column new_col2 int;" $MYSQL_PORT1 $MYSQL_PASSWORD1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "detect inconsistent DDL sequence" 1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "resume-task test"\ + "\"result\": true" 3 + + # still conflict + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "detect inconsistent DDL sequence" 1 + # stop twice, just used to test stop by the way run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-task test"\