Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

shardddl: fix resume conflict ddl. #739

Merged
merged 4 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,21 +259,25 @@ func (tsc *realTaskStatusChecker) run() {
// isResumableError checks the error message and returns whether we need to
// resume the task and retry
func isResumableError(err *pb.ProcessError) bool {
if err.Error == nil {
return true
}

switch err.Type {
case pb.ErrorType_ExecSQL:
// not elegant code, because TiDB doesn't expose some error
for _, msg := range retry.UnsupportedDDLMsgs {
if err.Error != nil && strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
}
for _, msg := range retry.UnsupportedDMLMsgs {
if err.Error != nil && strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
}
case pb.ErrorType_UnknownError:
if err.Error != nil && err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
if err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
for _, msg := range retry.ParseRelayLogErrMsgs {
if strings.Contains(strings.ToLower(err.Error.Message), strings.ToLower(msg)) {
return false
Expand All @@ -282,6 +286,10 @@ func isResumableError(err *pb.ProcessError) bool {
}
}

if _, ok := retry.UnresumableErrCodes[err.Error.ErrCode]; ok {
return false
}

return true
}

Expand Down
3 changes: 3 additions & 0 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,9 @@ func (w *Worker) restoreSubTask() error {
return terror.Annotatef(err, "decode subtask config %s error in restoreSubTask", task.Task)
}

// when restarting DM-worker, some config items may need to be updated for the previous saved task config.
w.copyConfigFromWorker(taskCfg)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved

cfgDecrypted, err := taskCfg.DecryptPassword()
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package retry

import (
"database/sql/driver"

"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/errors"
)

Expand All @@ -41,6 +43,11 @@ var (
"binlog checksum mismatch, data may be corrupted",
"get event err EOF",
}

// UnresumableErrCodes is a set of unresumeable err codes.
UnresumableErrCodes = map[int32]struct{}{
int32(terror.ErrSyncUnitDDLWrongSequence.Code()): {},
}
)

// IsConnectionError tells whether this error should reconnect to Database
Expand Down
5 changes: 3 additions & 2 deletions syncer/sharding-meta/shardmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func (meta *ShardingMeta) ActiveIdx() int {
return meta.activeIdx
}

func (meta *ShardingMeta) reinitialize() {
// Reinitialize reinitialize the shardingmeta
func (meta *ShardingMeta) Reinitialize() {
meta.activeIdx = 0
meta.global = &ShardingSequence{make([]*DDLItem, 0)}
meta.sources = make(map[string]*ShardingSequence)
Expand Down Expand Up @@ -220,7 +221,7 @@ func (meta *ShardingMeta) InSequenceSharding() bool {
func (meta *ShardingMeta) ResolveShardingDDL() bool {
meta.activeIdx++
if meta.activeIdx == len(meta.global.Items) {
meta.reinitialize()
meta.Reinitialize()
return true
}
return false
Expand Down
3 changes: 3 additions & 0 deletions syncer/sharding_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,9 @@ func (k *ShardingGroupKeeper) ResetGroups() {
defer k.RUnlock()
for _, group := range k.groups {
group.Reset()
// reset ShardingMeta when start or resume task
// it will be reconstructed by consuming binlog event
group.meta.Reinitialize()
}
}

Expand Down
5 changes: 5 additions & 0 deletions tests/sequence_sharding/data/db1.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use sharding_seq;
alter table t1 add column new_col1 int;
alter table t1 add column new_col2 int;
alter table t2 add column new_col1 int;
alter table t2 add column new_col3 int;
7 changes: 7 additions & 0 deletions tests/sequence_sharding/data/db2.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use sharding_seq;
alter table t2 add column new_col1 int;
alter table t2 add column new_col2 int;
alter table t3 add column new_col1 int;
alter table t3 add column new_col2 int;
alter table t4 add column new_col1 int;
alter table t4 add column new_col3 int;
40 changes: 40 additions & 0 deletions tests/sequence_sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,46 @@ function run() {
"query-status sequence_sharding" \
"\"stage\": \"Running\"" 4 \
"\"unit\": \"Sync\"" 2


run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

sleep 2

# the first ddl success while the second is conflict
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status sequence_sharding" \
"detect inconsistent DDL sequence" 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task sequence_sharding" \
"\"result\": true" 3

sleep 2

# still conflict
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status sequence_sharding" \
"detect inconsistent DDL sequence" 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task sequence_sharding" \
"\"result\": true" 3

# now upstream schema is conflict, ignore it and restart task
cp $cur/conf/dm-task.yaml $WORK_DIR/task.yaml
echo "ignore-checking-items: [\"all\"]" >> $WORK_DIR/task.yaml
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $WORK_DIR/task.yaml" \
"\"result\": true" 3

sleep 2

# still conflict
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status sequence_sharding" \
"detect inconsistent DDL sequence" 2
}

cleanup_data sharding_target2
Expand Down
27 changes: 27 additions & 0 deletions tests/sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,33 @@ function run() {
new_checksum=$(checksum)
echo "checksum before drop/truncate: $old_checksum, checksum after drop/truncate: $new_checksum"
[ "$old_checksum" == "$new_checksum" ]

# test conflict ddl in single worker
run_sql "alter table sharding1.t1 add column new_col1 int;" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "alter table sharding1.t2 add column new_col2 int;" $MYSQL_PORT1 $MYSQL_PASSWORD1

sleep 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"detect inconsistent DDL sequence" 1


run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test -w 127.0.0.1:8262"\
"\"result\": true" 2

sleep 2

# still conflict
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"detect inconsistent DDL sequence" 1

# stop twice, just used to test stop by the way
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test"\
"\"result\": true" 3
}

cleanup_data db_target
Expand Down