diff --git a/dm/dm/master/scheduler/scheduler.go b/dm/dm/master/scheduler/scheduler.go
index 00fe67fd70b..7b7dcb2557a 100644
--- a/dm/dm/master/scheduler/scheduler.go
+++ b/dm/dm/master/scheduler/scheduler.go
@@ -1780,6 +1780,16 @@ func (s *Scheduler) handleWorkerOnline(ev ha.WorkerEvent, toLock bool) error {
 
 	// 3. change the stage (from Offline) to Free or Relay.
 	lastRelaySource := w.RelaySourceID()
+	if lastRelaySource == "" {
+		// when the worker was removed (for example, it lost keepalive while the master scheduler was booting up),
+		// w.RelaySourceID() is empty, so look up its relay source from the scheduler's relayWorkers instead
+		for source, workerM := range s.relayWorkers {
+			if _, ok2 := workerM[w.BaseInfo().Name]; ok2 {
+				lastRelaySource = source
+				break
+			}
+		}
+	}
 	w.ToFree()
 	// TODO: rename ToFree to Online and move below logic inside it
 	if lastRelaySource != "" {
diff --git a/dm/tests/new_relay/conf/source2.yaml b/dm/tests/new_relay/conf/source2.yaml
new file mode 100644
index 00000000000..6c272e728c1
--- /dev/null
+++ b/dm/tests/new_relay/conf/source2.yaml
@@ -0,0 +1,14 @@
+source-id: mysql-replica-02
+server-id: 123456
+flavor: 'mysql'
+enable-gtid: true
+relay-binlog-name: ''
+relay-binlog-gtid: ''
+enable-relay: false
+from:
+  host: 127.0.0.1
+  user: root
+  password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
+  port: 3307
+checker:
+  check-enable: false
diff --git a/dm/tests/new_relay/run.sh b/dm/tests/new_relay/run.sh
index 92e6b1053b8..42f0fda61ce 100755
--- a/dm/tests/new_relay/run.sh
+++ b/dm/tests/new_relay/run.sh
@@ -43,6 +43,122 @@ function test_cant_dail_upstream() {
 	cleanup_data $TEST_NAME
 }
 
+function test_cant_dail_downstream() {
+	cleanup_data $TEST_NAME
+	cleanup_process
+
+	run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
+	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+
+	cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
+	dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
+
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"start-relay -s $SOURCE_ID1 worker1" \
+		"\"result\": true" 1
+	dmctl_start_task_standalone $cur/conf/dm-task.yaml "--remove-meta"
+
+	kill_dm_worker
+	# kill tidb
+	pkill -hup tidb-server 2>/dev/null || true
+	wait_process_exit tidb-server
+
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+
+	# make sure DM-worker doesn't exit
+	sleep 2
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status -s $SOURCE_ID1" \
+		"\"relayCatchUpMaster\": true" 1 \
+		"dial tcp 127.0.0.1:4000: connect: connection refused" 1
+
+	# restart tidb
+	run_tidb_server 4000 $TIDB_PASSWORD
+	sleep 2
+
+	cleanup_process
+	cleanup_data $TEST_NAME
+}
+
+function test_restart_relay_status() {
+	cleanup_data $TEST_NAME
+	cleanup_process
+
+	run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
+	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+
+	dmctl_operate_source create $cur/conf/source1.yaml $SOURCE_ID1
+
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"start-relay -s $SOURCE_ID1 worker1"
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status -s $SOURCE_ID1" \
+		"\"result\": true" 2 \
+		"\"worker\": \"worker1\"" 1
+
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+	dmctl_operate_source create $cur/conf/source2.yaml $SOURCE_ID2
+
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"start-relay -s $SOURCE_ID2 worker2"
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status -s $SOURCE_ID2" \
+		"\"result\": true" 2 \
+		"\"worker\": \"worker2\"" 1
+
+	run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT
+
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"start-relay -s $SOURCE_ID2 worker3"
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status -s $SOURCE_ID2" \
+		"\"result\": true" 3 \
+		"\"worker\": \"worker2\"" 1 \
+		"\"worker\": \"worker3\"" 1
+
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"list-member -n worker3" \
+		"relay" 1
+
+	kill_dm_worker
+	kill_dm_master
+
+	run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
+	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
+
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT
+
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status -s $SOURCE_ID1" \
+		"\"result\": true" 2 \
+		"\"worker\": \"worker1\"" 1
+
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status -s $SOURCE_ID2" \
+		"\"result\": true" 3 \
+		"\"worker\": \"worker2\"" 1 \
+		"\"worker\": \"worker3\"" 1
+
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"list-member --worker" \
+		"relay" 1 \
+		"bound" 2
+}
+
 function test_kill_dump_connection() {
 	cleanup_data $TEST_NAME
 	cleanup_process
@@ -68,7 +184,7 @@ function test_kill_dump_connection() {
 		"\"worker\": \"worker1\"" 1
 
 	run_sql_source1 "show processlist"
-	# kill dumop connection to test wheather relay will auto reconnect db
+	# kill dump connection to test whether relay will auto reconnect db
 	dump_conn_id=$(cat $TEST_DIR/sql_res.$TEST_NAME.txt | grep Binlog -B 4 | grep Id | cut -d : -f2)
 	run_sql_source1 "kill ${dump_conn_id}"
 
@@ -83,6 +199,8 @@
 }
 
 function run() {
+	test_restart_relay_status
+	test_cant_dail_downstream
 	test_cant_dail_upstream
 	export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/ReportRelayLogSpaceInBackground=return(1)"