Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

shardddl: fix resume conflict ddl. #722

Merged
merged 15 commits into from
Jun 15, 2020
13 changes: 10 additions & 3 deletions dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,24 +241,31 @@ func (tsc *realTaskStatusChecker) run() {
// isResumableError checks the error message and returns whether we need to
// resume the task and retry
func isResumableError(err *pb.ProcessError) bool {
if err.Error == nil {
return true
}

// not elegant code, because TiDB doesn't expose some error
for _, msg := range retry.UnsupportedDDLMsgs {
if err.Error != nil && strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
}
for _, msg := range retry.UnsupportedDMLMsgs {
if err.Error != nil && strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
}
if err.Error != nil && err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
if err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
for _, msg := range retry.ParseRelayLogErrMsgs {
if strings.Contains(strings.ToLower(err.Error.Message), strings.ToLower(msg)) {
return false
}
}
}
if _, ok := retry.UnresumableErrCodes[err.Error.ErrCode]; ok {
return false
}

return true
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package retry

import (
"database/sql/driver"

"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/errors"
)

Expand All @@ -41,6 +43,11 @@ var (
"binlog checksum mismatch, data may be corrupted",
"get event err EOF",
}

// UnresumableErrCodes is a set of unresumeable err codes.
UnresumableErrCodes = map[int32]struct{}{
int32(terror.ErrSyncerShardDDLConflict.Code()): {},
}
)

// IsConnectionError tells whether this error should reconnect to Database
Expand Down
5 changes: 3 additions & 2 deletions syncer/sharding-meta/shardmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func (meta *ShardingMeta) ActiveIdx() int {
return meta.activeIdx
}

func (meta *ShardingMeta) reinitialize() {
// Reinitialize reinitialize the shardingmeta
func (meta *ShardingMeta) Reinitialize() {
meta.activeIdx = 0
meta.global = &ShardingSequence{make([]*DDLItem, 0)}
meta.sources = make(map[string]*ShardingSequence)
Expand Down Expand Up @@ -247,7 +248,7 @@ func (meta *ShardingMeta) InSequenceSharding() bool {
func (meta *ShardingMeta) ResolveShardingDDL() bool {
meta.activeIdx++
if meta.activeIdx == len(meta.global.Items) {
meta.reinitialize()
meta.Reinitialize()
return true
}
return false
Expand Down
1 change: 1 addition & 0 deletions syncer/sharding_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ func (k *ShardingGroupKeeper) ResetGroups() {
defer k.RUnlock()
for _, group := range k.groups {
group.Reset()
group.meta.Reinitialize()
}
}

Expand Down
5 changes: 5 additions & 0 deletions tests/sequence_sharding/data/db1.increment2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use sharding_seq;
alter table t1 add column new_col1 int;
alter table t1 add column new_col2 int;
alter table t2 add column new_col1 int;
alter table t2 add column new_col3 int;
7 changes: 7 additions & 0 deletions tests/sequence_sharding/data/db2.increment2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use sharding_seq;
alter table t2 add column new_col1 int;
alter table t2 add column new_col2 int;
alter table t3 add column new_col1 int;
alter table t3 add column new_col2 int;
alter table t4 add column new_col1 int;
alter table t4 add column new_col3 int;
32 changes: 32 additions & 0 deletions tests/sequence_sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,38 @@ function run() {
sleep 3
# use sync_diff_inspector to check data now!
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

# the first ddl success while the second is conflict
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status sequence_sharding" \
"detect inconsistent DDL sequence" 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task sequence_sharding" \
"\"result\": true" 3

# still conflict
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status sequence_sharding" \
"detect inconsistent DDL sequence" 2

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task sequence_sharding" \
"\"result\": true" 3

cp $cur/conf/dm-task.yaml $WORK_DIR/task.yaml
echo "ignore-checking-items: [\"all\"]" >> $WORK_DIR/task.yaml
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need to ignore all checks? how about adding a comment

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment in 644a504

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $WORK_DIR/task.yaml" \
"\"result\": true" 3

# still conflict
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status sequence_sharding" \
"detect inconsistent DDL sequence" 2
}

cleanup_data sharding_target2
Expand Down
17 changes: 17 additions & 0 deletions tests/sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,23 @@ function run() {
echo "checksum before drop/truncate: $old_checksum, checksum after drop/truncate: $new_checksum"
[ "$old_checksum" == "$new_checksum" ]

# test conflict ddl in single worker
run_sql "alter table sharding1.t1 add column new_col1 int;" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "alter table sharding1.t2 add column new_col2 int;" $MYSQL_PORT1 $MYSQL_PASSWORD1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"detect inconsistent DDL sequence" 1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test"\
"\"result\": true" 3

# still conflict
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"detect inconsistent DDL sequence" 1

# stop twice, just used to test stop by the way
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test"\
Expand Down