diff --git a/relay/relay.go b/relay/relay.go index bc22f85a4a..df6d54fdc5 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/parser" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go/sync2" @@ -49,7 +50,6 @@ import ( "github.com/pingcap/dm/relay/retry" "github.com/pingcap/dm/relay/transformer" "github.com/pingcap/dm/relay/writer" - toolutils "github.com/pingcap/tidb-tools/pkg/utils" ) var ( @@ -70,6 +70,8 @@ const ( // NewRelay creates an instance of Relay. var NewRelay = NewRealRelay +var _ Process = &Relay{} + // Process defines mysql-like relay log process unit type Process interface { // Init initial relat log unit @@ -204,6 +206,11 @@ func (r *Relay) process(ctx context.Context) error { return err } + failpoint.Inject("NewUpstreamServer", func(_ failpoint.Value) { + // test a bug which caused by upstream switching + isNew = true + }) + if isNew { // re-setup meta for new server or new source err = r.reSetupMeta(ctx) diff --git a/tests/all_mode/conf/source1.yaml b/tests/all_mode/conf/source1.yaml index 49b830cced..664e2509c5 100644 --- a/tests/all_mode/conf/source1.yaml +++ b/tests/all_mode/conf/source1.yaml @@ -2,7 +2,7 @@ source-id: mysql-replica-01 flavor: '' -enable-gtid: false +enable-gtid: true enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' diff --git a/tests/all_mode/data/db1.increment0.sql b/tests/all_mode/data/db1.increment0.sql new file mode 100644 index 0000000000..99e094c474 --- /dev/null +++ b/tests/all_mode/data/db1.increment0.sql @@ -0,0 +1,2 @@ +use all_mode; +insert into t1 (id, name) values (100, 'Eddard Stark'); diff --git a/tests/all_mode/data/db2.increment0.sql b/tests/all_mode/data/db2.increment0.sql new file mode 100644 index 0000000000..8ac48bc7a9 --- /dev/null +++ b/tests/all_mode/data/db2.increment0.sql @@ -0,0 +1 @@ +delete from all_mode.t2 where id = 1; diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 535c6ecdde..2e67cffc9d 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -68,14 +68,15 @@ function test_query_timeout(){ check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT # operate mysql config to worker cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 # start DM task only @@ -177,7 +178,11 @@ function run() { test_stop_task_before_checkpoint - export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")" + inject_points=( + "github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")" + "github.com/pingcap/dm/relay/NewUpstreamServer=return(true)" + ) + export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows affected' @@ -250,24 +255,35 @@ function run() { pkill -hup tidb-server 2>/dev/null || true wait_process_exit tidb-server - # test pause and resume relay + # dm-worker execute sql failed, and will try auto resume task + run_sql_file $cur/data/db2.increment0.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + sleep 2 + check_log_contains $WORK_DIR/worker2/log/dm-worker.log "dispatch auto resume task" + + # restart tidb, and task will recover success + run_tidb_server 4000 $TIDB_PASSWORD + sleep 2 + + # test after pause and resume relay, relay could continue from syncer's checkpoint + run_sql_source1 "flush logs" + run_sql_file $cur/data/db1.increment0.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "pause-relay -s mysql-replica-01" \ "\"result\": true" 2 + # we used failpoint to imitate an upstream switching, which purged whole relay dir run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "resume-relay -s mysql-replica-01" \ "\"result\": true" 2 - sleep 2 - # dm-worker execute sql failed, and will try auto resume task - check_log_contains $WORK_DIR/worker1/log/dm-worker.log "dispatch auto resume task" - check_log_contains $WORK_DIR/worker2/log/dm-worker.log "dispatch auto resume task" - - # restart tidb, and task will recover success - run_tidb_server 4000 $TIDB_PASSWORD - sleep 2 + # relay should continue pulling from syncer's checkpoint, so only pull the latest binlog + server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index) + relay_log_num=`ls $WORK_DIR/worker1/relay_log/$server_uuid | grep -v 'relay.meta' | wc -l` + echo "relay logs `ls $WORK_DIR/worker1/relay_log/$server_uuid`" + [ $relay_log_num -eq 1 ] # use sync_diff_inspector to check data now! check_sync_diff $WORK_DIR $cur/conf/diff_config.toml