Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay(dm): cancel when relay meet error to close goroutine (#6803) #6813

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dm/dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (h *realRelayHolder) run() {
h.setResult(nil) // clear previous result

r := h.relay.Process(h.ctx)
h.cancel()

h.setResult(&r)
for _, err := range r.Errors {
Expand Down
13 changes: 9 additions & 4 deletions dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,11 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser
}

// handleEvents handles binlog events, including:
// 1. read events from upstream
// 2. transform events
// 3. write events into relay log files
// 4. update metadata if needed
// 1. read events from upstream
// 2. transform events
// 3. write events into relay log files
// 4. update metadata if needed
//
// the first return value is the index of last read rows event if the transaction is not finished.
func (r *Relay) handleEvents(
ctx context.Context,
Expand All @@ -454,6 +455,10 @@ func (r *Relay) handleEvents(
// 1. read events from upstream server
readTimer := time.Now()
rResult, err := reader2.GetEvent(ctx)

failpoint.Inject("RelayGetEventFailed", func() {
err = errors.New("RelayGetEventFailed")
})
if err != nil {
switch errors.Cause(err) {
case context.Canceled:
Expand Down
36 changes: 36 additions & 0 deletions dm/tests/new_relay/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,38 @@ function test_restart_relay_status() {
"bound" 2
}

function test_relay_leak() {
cleanup_process
cleanup_data $TEST_NAME
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return()"

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/check-enable: false/d" $WORK_DIR/source1.yaml
sed -i "/checker:/d" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID1 worker1"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID1" \
"RelayGetEventFailed" 1

check_log_contain_with_retry 'dispatch auto resume relay' $WORK_DIR/worker1/log/dm-worker.log

count=$(curl "http://127.0.0.1:8262/debug/pprof/goroutine?debug=2" 2>/dev/null | grep -c doIntervalOps || true)
if [ $count -gt 1 ]; then
echo "relay goroutine leak detected, count expect 1 but got $count"
exit 1
fi
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_relay_leak passed"
}

function test_kill_dump_connection() {
cleanup_data $TEST_NAME
cleanup_process
Expand Down Expand Up @@ -202,6 +234,10 @@ function run() {
test_restart_relay_status
test_cant_dail_downstream
test_cant_dail_upstream
test_relay_leak

cleanup_process
cleanup_data $TEST_NAME

export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/ReportRelayLogSpaceInBackground=return(1)"

Expand Down