diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go
index 8589e17cd4..d5fa5f0ae1 100644
--- a/dm/worker/task_checker.go
+++ b/dm/worker/task_checker.go
@@ -259,21 +259,25 @@ func (tsc *realTaskStatusChecker) run() {
 // isResumableError checks the error message and returns whether we need to
 // resume the task and retry
 func isResumableError(err *pb.ProcessError) bool {
+    if err.Error == nil {
+        return true
+    }
+
     switch err.Type {
     case pb.ErrorType_ExecSQL:
         // not elegant code, because TiDB doesn't expose some error
         for _, msg := range retry.UnsupportedDDLMsgs {
-            if err.Error != nil && strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
+            if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
                 return false
             }
         }
         for _, msg := range retry.UnsupportedDMLMsgs {
-            if err.Error != nil && strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
+            if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
                 return false
             }
         }
     case pb.ErrorType_UnknownError:
-        if err.Error != nil && err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
+        if err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
             for _, msg := range retry.ParseRelayLogErrMsgs {
                 if strings.Contains(strings.ToLower(err.Error.Message), strings.ToLower(msg)) {
                     return false
@@ -282,6 +286,10 @@ func isResumableError(err *pb.ProcessError) bool {
             }
         }
     }
+
+    if _, ok := retry.UnresumableErrCodes[err.Error.ErrCode]; ok {
+        return false
+    }
+
     return true
 }
 
diff --git a/dm/worker/worker.go b/dm/worker/worker.go
index 1a70b6efb6..5525d86798 100644
--- a/dm/worker/worker.go
+++ b/dm/worker/worker.go
@@ -741,6 +741,9 @@ func (w *Worker) restoreSubTask() error {
             return terror.Annotatef(err, "decode subtask config %s error in restoreSubTask", task.Task)
         }
 
+        // when restarting DM-worker, some config items of the previously saved task config may need to be updated.
+        w.copyConfigFromWorker(taskCfg)
+
         cfgDecrypted, err := taskCfg.DecryptPassword()
         if err != nil {
             return err
diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go
index a04fb730a3..2a2e0bc713 100644
--- a/pkg/retry/errors.go
+++ b/pkg/retry/errors.go
@@ -15,6 +15,8 @@ package retry
 
 import (
     "database/sql/driver"
+
+    "github.com/pingcap/dm/pkg/terror"
 
     "github.com/pingcap/errors"
 )
@@ -41,6 +43,11 @@ var (
         "binlog checksum mismatch, data may be corrupted",
         "get event err EOF",
     }
+
+    // UnresumableErrCodes is a set of unresumable error codes.
+    UnresumableErrCodes = map[int32]struct{}{
+        int32(terror.ErrSyncUnitDDLWrongSequence.Code()): {},
+    }
 )
 
 // IsConnectionError tells whether this error should reconnect to Database
diff --git a/syncer/sharding-meta/shardmeta.go b/syncer/sharding-meta/shardmeta.go
index ea8395a730..536a3b5f5c 100644
--- a/syncer/sharding-meta/shardmeta.go
+++ b/syncer/sharding-meta/shardmeta.go
@@ -120,7 +120,8 @@ func (meta *ShardingMeta) ActiveIdx() int {
     return meta.activeIdx
 }
 
-func (meta *ShardingMeta) reinitialize() {
+// Reinitialize reinitializes the sharding meta
+func (meta *ShardingMeta) Reinitialize() {
     meta.activeIdx = 0
     meta.global = &ShardingSequence{make([]*DDLItem, 0)}
     meta.sources = make(map[string]*ShardingSequence)
@@ -220,7 +221,7 @@ func (meta *ShardingMeta) InSequenceSharding() bool {
 func (meta *ShardingMeta) ResolveShardingDDL() bool {
     meta.activeIdx++
     if meta.activeIdx == len(meta.global.Items) {
-        meta.reinitialize()
+        meta.Reinitialize()
         return true
     }
     return false
diff --git a/syncer/sharding_group.go b/syncer/sharding_group.go
index 0f4f1281a7..2a4e1d3e78 100644
--- a/syncer/sharding_group.go
+++ b/syncer/sharding_group.go
@@ -480,6 +480,9 @@ func (k *ShardingGroupKeeper) ResetGroups() {
     defer k.RUnlock()
     for _, group := range k.groups {
         group.Reset()
+        // reset ShardingMeta when starting or resuming the task,
+        // it will be reconstructed by consuming binlog events
+        group.meta.Reinitialize()
     }
 }
 
diff --git a/tests/sequence_sharding/data/db1.increment3.sql b/tests/sequence_sharding/data/db1.increment3.sql
new file mode 100644
index 0000000000..9d66b64a8c
--- /dev/null
+++ b/tests/sequence_sharding/data/db1.increment3.sql
@@ -0,0 +1,5 @@
+use sharding_seq;
+alter table t1 add column new_col1 int;
+alter table t1 add column new_col2 int;
+alter table t2 add column new_col1 int;
+alter table t2 add column new_col3 int;
diff --git a/tests/sequence_sharding/data/db2.increment3.sql b/tests/sequence_sharding/data/db2.increment3.sql
new file mode 100644
index 0000000000..f366aecab3
--- /dev/null
+++ b/tests/sequence_sharding/data/db2.increment3.sql
@@ -0,0 +1,7 @@
+use sharding_seq;
+alter table t2 add column new_col1 int;
+alter table t2 add column new_col2 int;
+alter table t3 add column new_col1 int;
+alter table t3 add column new_col2 int;
+alter table t4 add column new_col1 int;
+alter table t4 add column new_col3 int;
diff --git a/tests/sequence_sharding/run.sh b/tests/sequence_sharding/run.sh
index 6fb5982b26..cf87cedc5c 100755
--- a/tests/sequence_sharding/run.sh
+++ b/tests/sequence_sharding/run.sh
@@ -62,6 +62,46 @@ function run() {
         "query-status sequence_sharding" \
         "\"stage\": \"Running\"" 4 \
         "\"unit\": \"Sync\"" 2
+
+
+    run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
+    run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
+
+    sleep 2
+
+    # the first DDL succeeds while the second one conflicts
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status sequence_sharding" \
+        "detect inconsistent DDL sequence" 2
+
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task sequence_sharding" \
+        "\"result\": true" 3
+
+    sleep 2
+
+    # still conflict
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status sequence_sharding" \
+        "detect inconsistent DDL sequence" 2
+
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "stop-task sequence_sharding" \
+        "\"result\": true" 3
+
+    # now the upstream schemas conflict; ignore the check and restart the task
+    cp $cur/conf/dm-task.yaml $WORK_DIR/task.yaml
+    echo "ignore-checking-items: [\"all\"]" >> $WORK_DIR/task.yaml
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "start-task $WORK_DIR/task.yaml" \
+        "\"result\": true" 3
+
+    sleep 2
+
+    # still conflict
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status sequence_sharding" \
+        "detect inconsistent DDL sequence" 2
 }
 
 cleanup_data sharding_target2
diff --git a/tests/sharding/run.sh b/tests/sharding/run.sh
index 2a5ee8a06b..2a8266d604 100755
--- a/tests/sharding/run.sh
+++ b/tests/sharding/run.sh
@@ -64,6 +64,33 @@ function run() {
     new_checksum=$(checksum)
     echo "checksum before drop/truncate: $old_checksum, checksum after drop/truncate: $new_checksum"
     [ "$old_checksum" == "$new_checksum" ]
+
+    # test conflicting DDL in a single worker
+    run_sql "alter table sharding1.t1 add column new_col1 int;" $MYSQL_PORT1 $MYSQL_PASSWORD1
+    run_sql "alter table sharding1.t2 add column new_col2 int;" $MYSQL_PORT1 $MYSQL_PASSWORD1
+
+    sleep 2
+
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status test" \
+        "detect inconsistent DDL sequence" 1
+
+
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test -w 127.0.0.1:8262" \
+        "\"result\": true" 2
+
+    sleep 2
+
+    # still conflict
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "query-status test" \
+        "detect inconsistent DDL sequence" 1
+
+    # stop twice, just to exercise stop-task along the way
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "stop-task test" \
+        "\"result\": true" 3
 }
 
 cleanup_data db_target