diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 4a9ca3c5c96..d0e5bee658d 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -404,6 +404,15 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) error { } func (s *mysqlSink) notifyAndWaitExec(ctx context.Context) { + // notifyAndWaitExec may return because of context cancellation, + // and s.flushSyncWg.Wait() goroutine is still running, check context first to + // avoid data race + select { + case <-ctx.Done(): + log.Warn("context is done", zap.Error(ctx.Err())) + return + default: + } s.broadcastFinishTxn() s.execWaitNotifier.Notify() done := make(chan struct{})