diff --git a/pkg/sink/codec/encoder_group.go b/pkg/sink/codec/encoder_group.go index ad828f030a5..160ffc4253c 100644 --- a/pkg/sink/codec/encoder_group.go +++ b/pkg/sink/codec/encoder_group.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/util/cpu" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/pkg/config" @@ -69,6 +70,11 @@ func NewEncoderGroup( if concurrency <= 0 { concurrency = config.DefaultEncoderGroupConcurrency } + limitConcurrency := cpu.GetCPUCount() * 10 + if concurrency > limitConcurrency { + concurrency = limitConcurrency + log.Warn("limit concurrency to avoid crash", zap.Int("concurrency", concurrency), zap.Any("limitConcurrency", limitConcurrency)) + } inputCh := make([]chan *future, concurrency) for i := 0; i < concurrency; i++ { inputCh[i] = make(chan *future, defaultInputChanSize)