From 6c1515ce7d569f5db9e2b485727e190c5da0728f Mon Sep 17 00:00:00 2001
From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Date: Thu, 30 Nov 2023 02:10:18 -0600
Subject: [PATCH] kvclient(ticdc): fix the workerpool data race (#10195)

close pingcap/tiflow#10095
---
 cdc/kv/region_worker.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 8dd344676bd..60627768450 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -41,6 +41,7 @@ import (
 var (
 	regionWorkerPool workerpool.WorkerPool
 	workerPoolOnce   sync.Once
+	workerPoolLock   sync.Mutex
 	// The magic number here is keep the same with some magic numbers in some
 	// other components in TiCDC, including worker pool task chan size, mounter
 	// chan size etc.
@@ -402,6 +403,8 @@ func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEv
 
 func (w *regionWorker) initPoolHandles() {
 	handles := make([]workerpool.EventHandle, 0, w.concurrency)
+	workerPoolLock.Lock()
+	defer workerPoolLock.Unlock()
 	for i := 0; i < w.concurrency; i++ {
 		poolHandle := regionWorkerPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error {
 			event := eventI.(*regionStatefulEvent)
@@ -858,6 +861,8 @@ func getWorkerPoolSize() (size int) {
 func InitWorkerPool() {
 	workerPoolOnce.Do(func() {
 		size := getWorkerPoolSize()
+		workerPoolLock.Lock()
+		defer workerPoolLock.Unlock()
 		regionWorkerPool = workerpool.NewDefaultWorkerPool(size)
 	})
 }
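
Note (not part of the patch): the race this change addresses is between the write
to regionWorkerPool inside workerPoolOnce.Do in InitWorkerPool and the reads of
regionWorkerPool in initPoolHandles, which can run on a goroutine that never goes
through the Once. The fix takes the same mutex on both sides. Below is a minimal,
self-contained Go sketch of that pattern under the same idea; the names
(sharedPool, initPool, usePool) are illustrative, not TiCDC code.

// race_sketch.go: a simplified sketch of a lazily initialized package-level
// resource, written by one goroutine and read by others, with a mutex guarding
// both the write and the reads so `go run -race` reports no data race.
package main

import (
	"fmt"
	"sync"
)

var (
	sharedPool []int      // stands in for regionWorkerPool (hypothetical)
	poolOnce   sync.Once  // counterpart of workerPoolOnce
	poolLock   sync.Mutex // counterpart of the added workerPoolLock
)

// initPool mirrors InitWorkerPool: the write happens once, under the lock.
func initPool(size int) {
	poolOnce.Do(func() {
		poolLock.Lock()
		defer poolLock.Unlock()
		sharedPool = make([]int, size)
	})
}

// usePool mirrors initPoolHandles: readers take the same lock before
// touching the shared variable, so the write is observed safely.
func usePool() int {
	poolLock.Lock()
	defer poolLock.Unlock()
	return len(sharedPool)
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); initPool(8) }()
	go func() { defer wg.Done(); fmt.Println("pool size:", usePool()) }()
	wg.Wait()
}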