Skip to content

Commit

Permalink
relay(dm): cancel when relay meet error to close goroutine (#6803) (#…
Browse files Browse the repository at this point in the history
…6813)

close #6193
  • Loading branch information
ti-chi-bot authored Aug 22, 2022
1 parent ba5ff92 commit 82a688e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 4 deletions.
1 change: 1 addition & 0 deletions dm/dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (h *realRelayHolder) run() {
h.setResult(nil) // clear previous result

r := h.relay.Process(h.ctx)
h.cancel()

h.setResult(&r)
for _, err := range r.Errors {
Expand Down
13 changes: 9 additions & 4 deletions dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,11 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser
}

// handleEvents handles binlog events, including:
// 1. read events from upstream
// 2. transform events
// 3. write events into relay log files
// 4. update metadata if needed
// 1. read events from upstream
// 2. transform events
// 3. write events into relay log files
// 4. update metadata if needed
//
// the first return value is the index of last read rows event if the transaction is not finished.
func (r *Relay) handleEvents(
ctx context.Context,
Expand All @@ -454,6 +455,10 @@ func (r *Relay) handleEvents(
// 1. read events from upstream server
readTimer := time.Now()
rResult, err := reader2.GetEvent(ctx)

failpoint.Inject("RelayGetEventFailed", func() {
err = errors.New("RelayGetEventFailed")
})
if err != nil {
switch errors.Cause(err) {
case context.Canceled:
Expand Down
36 changes: 36 additions & 0 deletions dm/tests/new_relay/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,38 @@ function test_restart_relay_status() {
"bound" 2
}

function test_relay_leak() {
cleanup_process
cleanup_data $TEST_NAME
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return()"

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/check-enable: false/d" $WORK_DIR/source1.yaml
sed -i "/checker:/d" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID1 worker1"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID1" \
"RelayGetEventFailed" 1

check_log_contain_with_retry 'dispatch auto resume relay' $WORK_DIR/worker1/log/dm-worker.log

count=$(curl "http://127.0.0.1:8262/debug/pprof/goroutine?debug=2" 2>/dev/null | grep -c doIntervalOps || true)
if [ $count -gt 1 ]; then
echo "relay goroutine leak detected, count expect 1 but got $count"
exit 1
fi
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_relay_leak passed"
}

function test_kill_dump_connection() {
cleanup_data $TEST_NAME
cleanup_process
Expand Down Expand Up @@ -202,6 +234,10 @@ function run() {
test_restart_relay_status
test_cant_dail_downstream
test_cant_dail_upstream
test_relay_leak

cleanup_process
cleanup_data $TEST_NAME

export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/ReportRelayLogSpaceInBackground=return(1)"

Expand Down

0 comments on commit 82a688e

Please sign in to comment.