Skip to content

Commit

Permalink
DM(syncer): fix lost lost dml under special sharding ddls (pingcap#5006
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Mar 25, 2022
1 parent fb721c1 commit 865301e
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 3 deletions.
6 changes: 5 additions & 1 deletion dm/syncer/sharding_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,11 @@ func (sg *ShardingGroup) CheckSyncing(source string, location binlog.Location) (
if activeDDLItem == nil {
return true
}
return binlog.CompareLocation(activeDDLItem.FirstLocation, location, sg.enableGTID) > 0
// this function only affects dml
// activeDDLItem.FirstLocation is ddl's startLocation
// location is dml's currentLocation
// dml should be synced when the comparation is equal
return binlog.CompareLocation(activeDDLItem.FirstLocation, location, sg.enableGTID) >= 0
}

// UnresolvedGroupInfo returns pb.ShardingGroup if is unresolved, else returns nil.
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/sharding_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (t *testShardingGroupSuite) TestSync(c *C) {

// active DDL is at pos21
beforeActiveDDL = g1.CheckSyncing(source2, pos21)
c.Assert(beforeActiveDDL, IsFalse)
c.Assert(beforeActiveDDL, IsTrue)

info = g1.UnresolvedGroupInfo()
sort.Strings(info.Synced)
Expand Down Expand Up @@ -313,7 +313,7 @@ func (t *testShardingGroupSuite) TestKeeper(c *C) {
c.Assert(k.InSyncing(sourceTbl1, targetTbl, endPos11), IsFalse)
// position at/after active DDL, in syncing
c.Assert(binlog.CompareLocation(pos12, loc, false), Equals, 0)
c.Assert(k.InSyncing(sourceTbl1, targetTbl, pos12), IsTrue)
c.Assert(k.InSyncing(sourceTbl1, targetTbl, pos12), IsFalse)
c.Assert(binlog.CompareLocation(endPos12, loc, false), Equals, 1)
c.Assert(k.InSyncing(sourceTbl1, targetTbl, endPos12), IsTrue)

Expand Down
197 changes: 197 additions & 0 deletions dm/tests/handle_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,201 @@ function DM_REPLACE_ERROR_SHARDING() {
"clean_table" "optimistic"
}

# two source, 4 tables
# source1: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl
# source2: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl
function DM_CROSS_DDL_SHARDING_CASE() {
# 11/21 first ddl
run_sql_source1 "alter table ${db}.${tb1} add column c int;"
run_sql_source2 "alter table ${db}.${tb1} add column c int;"
run_sql_source1 "insert into ${db}.${tb1} values(1,1,1);"
run_sql_source1 "insert into ${db}.${tb1} values(11,11,11);"
run_sql_source2 "insert into ${db}.${tb1} values(2,2,2);"
run_sql_source2 "insert into ${db}.${tb1} values(22,22,22);"

# 11/21 second ddl
run_sql_source1 "alter table ${db}.${tb1} add column d int;"
run_sql_source1 "insert into ${db}.${tb1} values(3,3,3,3);"
run_sql_source2 "alter table ${db}.${tb1} add column d int;"
run_sql_source2 "insert into ${db}.${tb1} values(6,6,6,6);"

# 12/22 first ddl
run_sql_source1 "alter table ${db}.${tb2} add column c int;"
run_sql_source2 "alter table ${db}.${tb2} add column c int;"
run_sql_source1 "insert into ${db}.${tb2} values(4,4,4);"
run_sql_source2 "insert into ${db}.${tb2} values(5,5,5);"

# 12/22 second ddl
run_sql_source1 "alter table ${db}.${tb2} add column d int;"
run_sql_source2 "alter table ${db}.${tb2} add column d int;"
run_sql_source1 "insert into ${db}.${tb2} values(7,7,7,7);"
run_sql_source2 "insert into ${db}.${tb2} values(8,8,8,8);"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 4

run_sql_tidb_with_retry "select count(1) from ${db}.${tb}" "count(1): 10"
}

function DM_CROSS_DDL_SHARDING() {
run_case CROSS_DDL_SHARDING "double-source-pessimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \
"clean_table" "pessimistic"

run_case CROSS_DDL_SHARDING "double-source-optimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \
"clean_table" "optimistic"
}

# replace add column unique twice
# two source, 4 tables
# source1: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl
# source2: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl
function DM_CROSS_DDL_SHARDING_WITH_REPLACE_ERROR_CASE() {
# 11/21 first ddl
run_sql_source1 "alter table ${db}.${tb1} add column c int unique;"
run_sql_source2 "alter table ${db}.${tb1} add column c int unique;"
run_sql_source1 "insert into ${db}.${tb1} values(1,1,1);"
run_sql_source1 "insert into ${db}.${tb1} values(11,11,11);"
run_sql_source2 "insert into ${db}.${tb1} values(2,2,2);"
run_sql_source2 "insert into ${db}.${tb1} values(22,22,22);"

# 11/21 second ddl
run_sql_source1 "alter table ${db}.${tb1} add column d int unique;"
run_sql_source1 "insert into ${db}.${tb1} values(3,3,3,3);"
run_sql_source2 "alter table ${db}.${tb1} add column d int unique;"
run_sql_source2 "insert into ${db}.${tb1} values(6,6,6,6);"

# 12/22 first ddl
run_sql_source1 "alter table ${db}.${tb2} add column c int unique;"
run_sql_source2 "alter table ${db}.${tb2} add column c int unique;"
run_sql_source1 "insert into ${db}.${tb2} values(4,4,4);"
run_sql_source2 "insert into ${db}.${tb2} values(5,5,5);"

# 12/22 second ddl
run_sql_source1 "alter table ${db}.${tb2} add column d int unique;"
run_sql_source2 "alter table ${db}.${tb2} add column d int unique;"
run_sql_source1 "insert into ${db}.${tb2} values(7,7,7,7);"
run_sql_source2 "insert into ${db}.${tb2} values(8,8,8,8);"

# 11/21 first ddl: unsupport error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"unsupported add column .* constraint UNIQUE KEY" 2

# begin to handle error
# split 11/21 first ddl into two ddls
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test alter table ${db}.${tb1} add column c int;alter table ${db}.${tb1} add unique(c)" \
"\"result\": true" 3

if [[ "$1" = "pessimistic" ]]; then
# 11 second ddl bypass, 12 first ddl detect conflict
# 22 first ddl: detect conflict
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"detect inconsistent DDL sequence from source" 2

# split 12,22 first ddl into two ddls
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-01,mysql-replica-02 alter table ${db}.${tb2} add column c int;alter table ${db}.${tb2} add unique(c);" \
"\"result\": true" 3

# 11/21 second ddl: unsupport error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"unsupported add column .* constraint UNIQUE KEY" 2

# split 11/21 second ddl into two ddls
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test alter table ${db}.${tb1} add column d int;alter table ${db}.${tb1} add unique(d);" \
"\"result\": true" 3

# 12/22 second ddl: detect conflict
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"detect inconsistent DDL sequence from source" 2

# split 11/21 second ddl into two ddls one by one
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-01 alter table ${db}.${tb2} add column d int;alter table ${db}.${tb2} add unique(d);" \
"\"result\": true" 2
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-02 alter table ${db}.${tb2} add column d int;alter table ${db}.${tb2} add unique(d);" \
"\"result\": true" 2
else
# 11 second ddl, 22 first ddl: unsupport error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"unsupported add column .* constraint UNIQUE KEY" 2

# replace 11 second ddl
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-01 alter table ${db}.${tb1} add column d int;alter table ${db}.${tb1} add unique(d);" \
"\"result\": true" 2

# replace 21 second ddl
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-02 alter table ${db}.${tb1} add column d int;alter table ${db}.${tb1} add unique(d);" \
"\"result\": true" 2

# 12 first ddl, 21 second ddl: unsupport error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"unsupported add column .* constraint UNIQUE KEY" 2

# replace 12 first ddl
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-01 alter table ${db}.${tb2} add column c int;alter table ${db}.${tb2} add unique(c);" \
"\"result\": true" 2

# replace 22 first ddl
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-02 alter table ${db}.${tb2} add column c int;alter table ${db}.${tb2} add unique(c);" \
"\"result\": true" 2

# 12 first ddl, 22 second ddl: unspport error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"unsupported add column .* constraint UNIQUE KEY" 2

# replace 12/22 second ddl
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test alter table ${db}.${tb2} add column d int;alter table ${db}.${tb1} add unique(d);" \
"\"result\": true" 3

fi

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 4

run_sql_tidb_with_retry "select count(1) from ${db}.${tb}" "count(1): 10"
}

function DM_CROSS_DDL_SHARDING_WITH_REPLACE_ERROR() {
run_case CROSS_DDL_SHARDING_WITH_REPLACE_ERROR "double-source-pessimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \
"clean_table" "pessimistic"

run_case CROSS_DDL_SHARDING_WITH_REPLACE_ERROR "double-source-optimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \
"clean_table" "optimistic"
}

# replace add column unique
# one source, one table, no sharding
function DM_INJECT_DDL_ERROR_CASE() {
Expand Down Expand Up @@ -880,6 +1075,8 @@ function run() {
DM_SKIP_ERROR
DM_SKIP_ERROR_SHARDING
DM_REPLACE_ERROR
DM_CROSS_DDL_SHARDING
DM_CROSS_DDL_SHARDING_WITH_REPLACE_ERROR
DM_REPLACE_ERROR_SHARDING
DM_REPLACE_ERROR_MULTIPLE
DM_EXEC_ERROR_SKIP
Expand Down

0 comments on commit 865301e

Please sign in to comment.