diff --git a/syncer/syncer.go b/syncer/syncer.go index 0faf3bdc7c..447959e4f7 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1018,6 +1018,18 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo if len(jobs) == 0 { return 0, nil } + + select { + case <-tctx.Ctx.Done(): + // do not execute queries anymore, because they should be failed with a done context. + // and avoid some errors like: + // - `driver: bad connection` for `BEGIN` + // - `sql: connection is already closed` for `BEGIN` + tctx.L().Info("skip some remaining DML jobs in the job chan because the context is done", zap.Int("count", len(jobs))) + return 0, tctx.Ctx.Err() // return the error to trigger `fatalF`. + default: + } + queries := make([]string, 0, len(jobs)) args := make([][]interface{}, 0, len(jobs)) for _, j := range jobs { diff --git a/tests/incremental_mode/run.sh b/tests/incremental_mode/run.sh index 6667bb91dd..70b7b07dc7 100755 --- a/tests/incremental_mode/run.sh +++ b/tests/incremental_mode/run.sh @@ -115,6 +115,9 @@ function run() { # meeting an error of context cancel. # when below check pass, it means we filter out that error, or that error doesn't happen. # we only focus on fails, to find any unfiltered context cancel error. + # and should not contain errors like: + # - `driver: bad connection` + # - `sql: connection is already closed` run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "pause-task test" \ "\"result\": true" 3