From 205b58257c9a0878bda0172c1bb3893ba5431d6b Mon Sep 17 00:00:00 2001 From: nhsmw Date: Mon, 25 Nov 2024 15:25:02 +0800 Subject: [PATCH] sink(ticdc): limit encoder-concurrency to avoid crash (#11775) close pingcap/tiflow#11773 --- pkg/sink/codec/encoder_group.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/sink/codec/encoder_group.go b/pkg/sink/codec/encoder_group.go index ad828f030a5..160ffc4253c 100644 --- a/pkg/sink/codec/encoder_group.go +++ b/pkg/sink/codec/encoder_group.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/util/cpu" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/pkg/config" @@ -69,6 +70,11 @@ func NewEncoderGroup( if concurrency <= 0 { concurrency = config.DefaultEncoderGroupConcurrency } + limitConcurrency := cpu.GetCPUCount() * 10 + if concurrency > limitConcurrency { + concurrency = limitConcurrency + log.Warn("limit concurrency to avoid crash", zap.Int("concurrency", concurrency), zap.Any("limitConcurrency", limitConcurrency)) + } inputCh := make([]chan *future, concurrency) for i := 0; i < concurrency; i++ { inputCh[i] = make(chan *future, defaultInputChanSize)