Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

shardddl: fix resume conflict ddl. #739

Merged
merged 4 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions syncer/sharding-meta/shardmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func (meta *ShardingMeta) ActiveIdx() int {
return meta.activeIdx
}

func (meta *ShardingMeta) reinitialize() {
// Reinitialize reinitialize the shardingmeta
func (meta *ShardingMeta) Reinitialize() {
meta.activeIdx = 0
meta.global = &ShardingSequence{make([]*DDLItem, 0)}
meta.sources = make(map[string]*ShardingSequence)
Expand Down Expand Up @@ -220,7 +221,7 @@ func (meta *ShardingMeta) InSequenceSharding() bool {
func (meta *ShardingMeta) ResolveShardingDDL() bool {
meta.activeIdx++
if meta.activeIdx == len(meta.global.Items) {
meta.reinitialize()
meta.Reinitialize()
return true
}
return false
Expand Down
3 changes: 3 additions & 0 deletions syncer/sharding_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,9 @@ func (k *ShardingGroupKeeper) ResetGroups() {
defer k.RUnlock()
for _, group := range k.groups {
group.Reset()
// reset ShardingMeta when start or resume task
// it will be reconstructed by consuming binlog event
group.meta.Reinitialize()
}
}

Expand Down
1 change: 0 additions & 1 deletion tests/sequence_sharding/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you remove this line? for the status checking?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It throws the error `heartbeat config is different from previous used: serverID not equal` when restarting the task. 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe our test case had a problem before? I ran it locally, but got another error

"msg": "[code=38008:class=dm-master:scope=internal:level=high] grpc request error: rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = \"transport: Error while dialing dial tcp 127.0.0.1:8262: connect: connection refused\"\ngithub.com/pingcap/dm/pkg/terror.(*Error).Delegate\n\t/home/zhangxc/gopath/src/github.com/pingcap/dm/pkg/terror/terror.go:267\ngithub.com/pingcap/dm/dm/master/workerrpc.callRPC\n\t/home/zhangxc/gopath/src/github.com/pingcap/dm/dm/master/workerrpc/rawgrpc.go:124\ngithub.com/pingcap/dm/dm/master/workerrpc.(*GRPCClient).SendRequest\n\t/home/zhangxc/gopath/src/github.com/pingcap/dm/dm/master/workerrpc/rawgrpc.go:64\ngithub.com/pingcap/dm/dm/master.(*Server).getStatusFromWorkers.func2\n\t/home/zhangxc/gopath/src/github.com/pingcap/dm/dm/master/server.go:1135\ngithub.com/pingcap/dm/dm/master.(*AgentPool).Emit\n\t/home/zhangxc/gopath/src/github.com/pingcap/dm/dm/master/agent_pool.go:117\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1357"
[2020/06/15 21:27:04.989 +08:00] [INFO] [mode.go:99] ["change count"] [task=sequence_sharding] [unit="binlog replication"] ["previous count"=1] ["new count"=2]
[2020/06/15 21:27:04.989 +08:00] [INFO] [syncer.go:1836] ["save table checkpoint for source"] [task=sequence_sharding] [unit="binlog replication"] [event=query] [source=`sharding_seq`.`t1`] ["start position"="(mysql-bin|000001.000001, 12499)"] ["end position"="(mysql-bin|000001.000001, 12617)"]
[2020/06/15 21:27:04.989 +08:00] [INFO] [syncer.go:1839] ["source shard group is not synced"] [task=sequence_sharding] [unit="binlog replication"] [event=query] [source=`sharding_seq`.`t1`] ["start position"="(mysql-bin|000001.000001, 12499)"] ["end position"="(mysql-bin|000001.000001, 12617)"]
[2020/06/15 21:27:04.990 +08:00] [INFO] [syncer.go:1618] [task=sequence_sharding] [unit="binlog replication"] [event=query] [statement="alter table t2 drop column f"] [schema=sharding_seq] ["last position"="(mysql-bin|000001.000001, 12617)"] [position="(mysql-bin|000001.000001, 12800)"] ["gtid set"=NULL]
[2020/06/15 21:27:04.990 +08:00] [INFO] [syncer.go:1634] ["resolve sql"] [task=sequence_sharding] [unit="binlog replication"] [event=query] ["raw statement"="alter table t2 drop column f"] [statements="[\"ALTER TABLE `sharding_seq`.`t2` DROP COLUMN `f`\"]"] [schema=sharding_seq] ["last position"="(mysql-bin|000001.000001, 12800)"] [position="(mysql-bin|000001.000001, 12800)"] ["gtid set"=NULL]
[2020/06/15 21:27:04.991 +08:00] [INFO] [syncer.go:1721] ["prepare to handle ddls"] [task=sequence_sharding] [unit="binlog replication"] [event=query] [ddls="[\"ALTER TABLE `sharding_target2`.`t_target` DROP COLUMN `f`\"]"] ["raw statement"="alter table t2 drop column f"] [position="(mysql-bin|000001.000001, 12800)"]
[2020/06/15 21:27:04.991 +08:00] [INFO] [syncer.go:2157] ["flush all jobs"] [task=sequence_sharding] [unit="binlog replication"] ["global checkpoint"="(mysql-bin|000001.000001, 12434)(flushed (mysql-bin|000001.000001, 12434))"]
[2020/06/15 21:27:04.992 +08:00] [INFO] [syncer.go:875] ["flush checkpoints except for these tables"] [task=sequence_sharding] [unit="binlog replication"] [tables="[[\"sharding_seq\",\"t1\"],[\"sharding_seq\",\"t2\"]]"]
[2020/06/15 21:27:04.992 +08:00] [INFO] [syncer.go:878] ["prepare flush sqls"] [task=sequence_sharding] [unit="binlog replication"] ["shard meta sqls"="[]"] ["shard meta arguments"="[]"]
[2020/06/15 21:27:04.993 +08:00] [INFO] [syncer.go:887] ["flushed checkpoint"] [task=sequence_sharding] [unit="binlog replication"] [checkpoint="(mysql-bin|000001.000001, 12434)(flushed (mysql-bin|000001.000001, 12434))"]
[2020/06/15 21:27:04.993 +08:00] [INFO] [relay.go:113] ["current earliest active relay log"] [task=sequence_sharding] [unit="binlog replication"] ["active relay log"=661372f8-af0a-11ea-926b-0242ac120003.000001/mysql-bin.000001]
[2020/06/15 21:27:04.993 +08:00] [INFO] [syncer.go:1823] ["try to sync table in shard group"] [task=sequence_sharding] [unit="binlog replication"] [event=query] [source=`sharding_seq`.`t2`] [ddls="[\"ALTER TABLE `sharding_target2`.`t_target` DROP COLUMN `f`\"]"] ["raw statement"="alter table t2 drop column f"] [in-sharding=true] ["start position"="(mysql-bin|000001.000001, 12682)"] [is-synced=true] [unsynced=0]
[2020/06/15 21:27:04.993 +08:00] [INFO] [syncer.go:1836] ["save table checkpoint for source"] [task=sequence_sharding] [unit="binlog replication"] [event=query] [source=`sharding_seq`.`t2`] ["start position"="(mysql-bin|000001.000001, 12682)"] ["end position"="(mysql-bin|000001.000001, 12800)"]
[2020/06/15 21:27:04.993 +08:00] [INFO] [syncer.go:1843] ["source shard group is synced"] [task=sequence_sharding] [unit="binlog replication"] [event=query] [source=`sharding_seq`.`t2`] ["start position"="(mysql-bin|000001.000001, 12682)"] ["end position"="(mysql-bin|000001.000001, 12800)"]
[2020/06/15 21:27:04.993 +08:00] [INFO] [mode.go:99] ["change count"] [task=sequence_sharding] [unit="binlog replication"] ["previous count"=2] ["new count"=1]
[2020/06/15 21:27:06.894 +08:00] [INFO] [server.go:252] [request=QueryStatus] [payload=]
[2020/06/15 21:27:06.961 +08:00] [INFO] [printer.go:54] ["Welcome to dm-worker"] ["Release Version"=None] ["Git Commit Hash"=None] ["Git Branch"=None] ["UTC Build Time"=None] ["Go Version"=None]
[2020/06/15 21:27:06.961 +08:00] [INFO] [main.go:58] ["dm-worker config"="{\"log-level\":\"info\",\"log-file\":\"/tmp/dm_test/sequence_sharding/worker1/log/dm-worker.log\",\"log-rotate\":\"\",\"worker-addr\":\":8262\",\"enable-gtid\":false,\"auto-fix-gtid\":false,\"relay-dir\":\"/tmp/dm_test/sequence_sharding/worker1/relay_log\",\"meta-dir\":\"./dm_worker_meta\",\"server-id\":429595703,\"flavor\":\"mysql\",\"charset\":\"\",\"relay-binlog-name\":\"\",\"relay-binlog-gtid\":\"\",\"source-id\":\"mysql-replica-01\",\"from\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"max-allowed-packet\":67108864,\"session\":null},\"purge\":{\"interval\":3600,\"expires\":0,\"remain-space\":15},\"checker\":{\"check-enable\":true,\"backoff-rollback\":{\"duration\":\"5m0s\"},\"backoff-max\":{\"duration\":\"5m0s\"}},\"tracer\":{\"enable\":false,\"source\":\"mysql-replica-01\",\"tracer-addr\":\"\",\"batch-size\":20,\"checksum\":false},\"config-file\":\"/home/zhangxc/gopath/src/github.com/pingcap/dm/tests/sequence_sharding/conf/dm-worker1.toml\"}"]
[2020/06/15 21:27:06.963 +08:00] [ERROR] [main.go:78] ["fail to start dm-worker"] [error="[code=40048:class=dm-worker:scope=internal:level=high] start server: listen tcp :8262: bind: address already in use"] [errorVerbose="[code=40048:class=dm-worker:scope=internal:level=high] start server: listen tcp :8262: bind: address already in use\ngithub.com/pingcap/dm/pkg/terror.(*Error).Delegate\n\t/home/zhangxc/gopath/src/github.com/pingcap/dm/pkg/terror/terror.go:267\ngithub.com/pingcap/dm/dm/worker.(*Server).Start\n\t/home/zhangxc/gopath/src/github.com/pingcap/dm/dm/worker/server.go:75\ngithub.com/pingcap/dm/cmd/dm-worker.main\n\t/home/zhangxc/gopath/src/github.com/pingcap/dm/cmd/dm-worker/main.go:76\ngithub.com/pingcap/dm/cmd/dm-worker.TestRunMain.func3\n\t/home/zhangxc/gopath/src/github.com/pingcap/dm/cmd/dm-worker/main_test.go:64\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1357"]
[2020/06/15 21:27:06.964 +08:00] [INFO] [main.go:81] ["dm-worker exit"]
[2020/06/15 21:27:10.807 +08:00] [INFO] [server.go:284] [request=FetchDDLInfo]
[2020/06/15 21:27:10.808 +08:00] [INFO] [worker.go:435] ["save DDLInfo into subTasks"] [component="worker controller"]
[2020/06/15 21:27:10.808 +08:00] [INFO] [server.go:292] [request=FetchDDLInfo] ["ddl info"="task:\"sequence_sharding\" schema:\"sharding_target2\" table:\"t_target\" DDLs:\"ALTER TABLE `sharding_target2`.`t_target` DROP COLUMN `f`\" "]
[2020/06/15 21:27:10.812 +08:00] [INFO] [server.go:309] ["receive DDLLockInfo"] [request=FetchDDLInfo] ["ddl lock info"="task:\"sequence_sharding\" ID:\"sequence_sharding-`sharding_target2`.`t_target`\" "]
[2020/06/15 21:27:10.851 +08:00] [INFO] [server.go:324] [request=ExecuteDDL] [payload="task:\"sequence_sharding\" lockID:\"sequence_sharding-`sharding_target2`.`t_target`\" traceGID:\"resolveDDLLock.7\" DDLs:\"ALTER TABLE `sharding_target2`.`t_target` DROP COLUMN `f`\" "]
[2020/06/15 21:27:10.853 +08:00] [INFO] [syncer.go:1923] ["ignore DDL job"] [task=sequence_sharding] [unit="binlog replication"] [event=query] [source=`sharding_seq`.`t2`] [ddls="[\"ALTER TABLE `sharding_target2`.`t_target` DROP COLUMN `f`\"]"] ["raw statement"="alter table t2 drop column f"] ["start position"="(mysql-bin|000001.000001, 12682)"] ["end position"="(mysql-bin|000001.000001, 12800)"] [request="{\"task\":\"sequence_sharding\",\"lockID\":\"sequence_sharding-`sharding_target2`.`t_target`\",\"traceGID\":\"resolveDDLLock.7\",\"DDLs\":[\"ALTER TABLE `sharding_target2`.`t_target` DROP COLUMN `f`\"]}"]
[2020/06/15 21:27:10.854 +08:00] [INFO] [syncer.go:1927] ["start to handle ddls in shard mode"] [task=sequence_sharding] [unit="binlog replication"] [event=query] [ddls="[\"ALTER TABLE `sharding_target2`.`t_target` DROP COLUMN `f`\"]"] ["raw statement"="alter table t2 drop column f"] ["start position"="(mysql-bin|000001.000001, 12682)"] ["end position"="(mysql-bin|000001.000001, 12800)"]
[2020/06/15 21:27:10.855 +08:00] [INFO] [syncer.go:909] ["ignore sharding DDLs"] [task=sequence_sharding] [unit="binlog replication"] [ddls="[\"ALTER TABLE `sharding_target2`.`t_target` DROP COLUMN `f`\"]"]
[2020/06/15 21:27:10.855 +08:00] [INFO] [worker.go:475] ["ExecuteDDL remove cacheDDLInfo"] [component="worker controller"]
[2020/06/15 21:27:10.855 +08:00] [WARN] [syncer.go:799] ["exit triggered"] [task=sequence_sharding] [unit="binlog replication"] [failpoint=ExitAfterDDLBeforeFlush]
[2020/06/15 21:27:10.856 +08:00] [INFO] [main_test.go:56] ["os exits"] ["exit code"=1]

It seems the DM-worker is trying to start before the previous one exited.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the above error may be a problem in my local env, I'll debug it tomorrow.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I bet you have no python in your env. 😂

Copy link
Member

@csuzhangxc csuzhangxc Jun 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right.... I'm on another machine.. we may need to check that Python exists when running test cases.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we start the task, then restart the worker, so the worker's server-id changes. When we then restart the task, the task's heartbeat server-id is updated to the new worker's server-id, so the heartbeat config is different from the previous one.

Copy link
Member

@csuzhangxc csuzhangxc Jun 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's because we saved the previous task's config (and the server_id) in the dm_worker_meta? if so, we may need to fix it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I open an issue #740, and will try to fix it in v1.0.6.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add a commit (e477779) in this PR, @GMHDBJD PTAL

timezone: "Asia/Shanghai"

target-database:
Expand Down
5 changes: 5 additions & 0 deletions tests/sequence_sharding/data/db1.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Incremental DDL workload for the first upstream instance (schema sharding_seq).
-- Applied by tests/sequence_sharding/run.sh; t1 and t2 intentionally diverge in
-- their second added column (new_col2 vs. new_col3), which is presumably what
-- drives the "detect inconsistent DDL sequence" conflict the test asserts.
-- NOTE(review): statement order matters for the sharding DDL sequence — do not reorder.
USE sharding_seq;
ALTER TABLE t1 ADD COLUMN new_col1 INT;
ALTER TABLE t1 ADD COLUMN new_col2 INT;
ALTER TABLE t2 ADD COLUMN new_col1 INT;
ALTER TABLE t2 ADD COLUMN new_col3 INT;
7 changes: 7 additions & 0 deletions tests/sequence_sharding/data/db2.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Incremental DDL workload for the second upstream instance (schema sharding_seq).
-- Applied by tests/sequence_sharding/run.sh; t2/t3 add new_col1+new_col2 while t4
-- adds new_col1+new_col3, so the DDL sequences across shards diverge — presumably
-- producing the "detect inconsistent DDL sequence" conflict the test asserts.
-- NOTE(review): statement order matters for the sharding DDL sequence — do not reorder.
USE sharding_seq;
ALTER TABLE t2 ADD COLUMN new_col1 INT;
ALTER TABLE t2 ADD COLUMN new_col2 INT;
ALTER TABLE t3 ADD COLUMN new_col1 INT;
ALTER TABLE t3 ADD COLUMN new_col2 INT;
ALTER TABLE t4 ADD COLUMN new_col1 INT;
ALTER TABLE t4 ADD COLUMN new_col3 INT;
40 changes: 40 additions & 0 deletions tests/sequence_sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,46 @@ function run() {
"query-status sequence_sharding" \
"\"stage\": \"Running\"" 4 \
"\"unit\": \"Sync\"" 2


run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

sleep 2

# the first ddl success while the second is conflict
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status sequence_sharding" \
"detect inconsistent DDL sequence" 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task sequence_sharding" \
"\"result\": true" 3

sleep 2

# still conflict
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status sequence_sharding" \
"detect inconsistent DDL sequence" 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task sequence_sharding" \
"\"result\": true" 3

# now upstream schema is conflict, ignore it and restart task
cp $cur/conf/dm-task.yaml $WORK_DIR/task.yaml
echo "ignore-checking-items: [\"all\"]" >> $WORK_DIR/task.yaml
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $WORK_DIR/task.yaml" \
"\"result\": true" 3

sleep 2

# still conflict
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status sequence_sharding" \
"detect inconsistent DDL sequence" 2
}

cleanup_data sharding_target2
Expand Down
27 changes: 27 additions & 0 deletions tests/sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,33 @@ function run() {
new_checksum=$(checksum)
echo "checksum before drop/truncate: $old_checksum, checksum after drop/truncate: $new_checksum"
[ "$old_checksum" == "$new_checksum" ]

# test conflict ddl in single worker
run_sql "alter table sharding1.t1 add column new_col1 int;" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "alter table sharding1.t2 add column new_col2 int;" $MYSQL_PORT1 $MYSQL_PASSWORD1

sleep 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"detect inconsistent DDL sequence" 1


run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test -w 127.0.0.1:8262"\
"\"result\": true" 2

sleep 2

# still conflict
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"detect inconsistent DDL sequence" 1

# stop twice, just used to test stop by the way
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test"\
"\"result\": true" 3
}

cleanup_data db_target
Expand Down