diff --git a/dm/dm/worker/relay.go b/dm/dm/worker/relay.go index 6fd3655ba7b..b548fe411e9 100644 --- a/dm/dm/worker/relay.go +++ b/dm/dm/worker/relay.go @@ -141,6 +141,7 @@ func (h *realRelayHolder) run() { h.setResult(nil) // clear previous result r := h.relay.Process(h.ctx) + h.cancel() h.setResult(&r) for _, err := range r.Errors { diff --git a/dm/relay/relay.go b/dm/relay/relay.go index 5b30835ad60..55b2be4ed6b 100644 --- a/dm/relay/relay.go +++ b/dm/relay/relay.go @@ -427,10 +427,11 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser } // handleEvents handles binlog events, including: -// 1. read events from upstream -// 2. transform events -// 3. write events into relay log files -// 4. update metadata if needed +// 1. read events from upstream +// 2. transform events +// 3. write events into relay log files +// 4. update metadata if needed +// // the first return value is the index of last read rows event if the transaction is not finished. func (r *Relay) handleEvents( ctx context.Context, @@ -454,6 +455,10 @@ func (r *Relay) handleEvents( // 1. read events from upstream server readTimer := time.Now() rResult, err := reader2.GetEvent(ctx) + + failpoint.Inject("RelayGetEventFailed", func() { + err = errors.New("RelayGetEventFailed") + }) if err != nil { switch errors.Cause(err) { case context.Canceled: diff --git a/dm/tests/new_relay/run.sh b/dm/tests/new_relay/run.sh index f4162ba1db4..5fa0ffa3ce8 100755 --- a/dm/tests/new_relay/run.sh +++ b/dm/tests/new_relay/run.sh @@ -159,6 +159,38 @@ function test_restart_relay_status() { "bound" 2 } +function test_relay_leak() { + cleanup_process + cleanup_data $TEST_NAME + export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return()" + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + sed -i "/check-enable: false/d" $WORK_DIR/source1.yaml + sed -i "/checker:/d" $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "RelayGetEventFailed" 1 + + check_log_contain_with_retry 'dispatch auto resume relay' $WORK_DIR/worker1/log/dm-worker.log + + count=$(curl "http://127.0.0.1:8262/debug/pprof/goroutine?debug=2" 2>/dev/null | grep -c doIntervalOps || true) + if [ $count -gt 1 ]; then + echo "relay goroutine leak detected, count expect 1 but got $count" + exit 1 + fi + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_relay_leak passed" +} + function test_kill_dump_connection() { cleanup_data $TEST_NAME cleanup_process @@ -202,6 +234,10 @@ function run() { test_restart_relay_status test_cant_dail_downstream test_cant_dail_upstream + test_relay_leak + + cleanup_process + cleanup_data $TEST_NAME export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/ReportRelayLogSpaceInBackground=return(1)"