From 2d9a053edbb29ddf8a94fdf6e7ce4ab2fd77f57d Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 19 Feb 2024 17:28:56 +0800 Subject: [PATCH] kvclient(ticdc): fix the workerpool data race (#10196) (#10201) ref pingcap/tiflow#10095 --- cdc/kv/region_worker.go | 5 +++++ cdc/server/server.go | 8 ++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 3934f33caa4..01d425558de 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -41,6 +41,7 @@ import ( var ( regionWorkerPool workerpool.WorkerPool workerPoolOnce sync.Once + workerPoolLock sync.Mutex // The magic number here is keep the same with some magic numbers in some // other components in TiCDC, including worker pool task chan size, mounter // chan size etc. @@ -412,6 +413,8 @@ func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEv func (w *regionWorker) initPoolHandles() { handles := make([]workerpool.EventHandle, 0, w.concurrency) + workerPoolLock.Lock() + defer workerPoolLock.Unlock() for i := 0; i < w.concurrency; i++ { poolHandle := regionWorkerPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error { event := eventI.(*regionStatefulEvent) @@ -867,6 +870,8 @@ func getWorkerPoolSize() (size int) { func InitWorkerPool() { workerPoolOnce.Do(func() { size := getWorkerPoolSize() + workerPoolLock.Lock() + defer workerPoolLock.Unlock() regionWorkerPool = workerpool.NewDefaultWorkerPool(size) }) } diff --git a/cdc/server/server.go b/cdc/server/server.go index eda6e8798aa..eca13591b77 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -342,10 +342,6 @@ func (s *server) run(ctx context.Context) (err error) { eg, egCtx := errgroup.WithContext(ctx) - eg.Go(func() error { - return s.capture.Run(egCtx) - }) - eg.Go(func() error { return s.upstreamPDHealthChecker(egCtx) }) @@ -371,6 +367,10 @@ func (s *server) run(ctx context.Context) (err error) { return nil }) + eg.Go(func() error { + return s.capture.Run(egCtx) + }) + return eg.Wait() }