diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 8dd344676bd..60627768450 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -41,6 +41,7 @@ import ( var ( regionWorkerPool workerpool.WorkerPool workerPoolOnce sync.Once + workerPoolLock sync.Mutex // The magic number here is keep the same with some magic numbers in some // other components in TiCDC, including worker pool task chan size, mounter // chan size etc. @@ -402,6 +403,8 @@ func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEv func (w *regionWorker) initPoolHandles() { handles := make([]workerpool.EventHandle, 0, w.concurrency) + workerPoolLock.Lock() + defer workerPoolLock.Unlock() for i := 0; i < w.concurrency; i++ { poolHandle := regionWorkerPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error { event := eventI.(*regionStatefulEvent) @@ -858,6 +861,8 @@ func getWorkerPoolSize() (size int) { func InitWorkerPool() { workerPoolOnce.Do(func() { size := getWorkerPoolSize() + workerPoolLock.Lock() + defer workerPoolLock.Unlock() regionWorkerPool = workerpool.NewDefaultWorkerPool(size) }) }