From 76abd8b8361bec56c8806498a1c5e3021d531c1f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 2 Apr 2024 12:24:16 +0800 Subject: [PATCH] disttask,ddl: use subtask concurrency (#52286) ref pingcap/tidb#49008 --- pkg/ddl/backfilling_import_cloud.go | 1 + pkg/ddl/backfilling_merge_sort.go | 7 +-- pkg/ddl/backfilling_operators.go | 14 +++--- pkg/ddl/backfilling_read_index.go | 50 +++++++++++++------ pkg/ddl/backfilling_scheduler.go | 25 ++++------ pkg/ddl/index.go | 7 +-- pkg/disttask/framework/storage/task_table.go | 9 +++- .../importinto/encode_and_sort_operator.go | 11 ++-- .../encode_and_sort_operator_test.go | 4 +- pkg/disttask/importinto/task_executor.go | 5 +- pkg/executor/importer/import.go | 8 +-- pkg/executor/importer/import_test.go | 2 +- .../addindextest3/operator_test.go | 2 + 13 files changed, 84 insertions(+), 61 deletions(-) diff --git a/pkg/ddl/backfilling_import_cloud.go b/pkg/ddl/backfilling_import_cloud.go index 1faf2f2aef968..fd38a268a06cb 100644 --- a/pkg/ddl/backfilling_import_cloud.go +++ b/pkg/ddl/backfilling_import_cloud.go @@ -99,6 +99,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub if err != nil { return err } + local.WorkerConcurrency = subtask.Concurrency * 2 err = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) if common.ErrFoundDuplicateKeys.Equal(err) { err = convertToKeyExistsErr(err, m.index, m.ptbl.Meta()) diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index 91042572ee3a7..91cb062bf17fb 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" "github.com/pingcap/tidb/pkg/lightning/backend/external" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/logutil" ) @@ -89,7 +88,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta } prefix := path.Join(strconv.Itoa(int(m.jobID)), strconv.Itoa(int(subtask.ID))) - partSize, err := getMergeSortPartSize(m.avgRowSize, int(variable.GetDDLReorgWorkerCounter()), m.idxNum) + partSize, err := getMergeSortPartSize(m.avgRowSize, subtask.Concurrency, m.idxNum) if err != nil { return err } @@ -102,7 +101,9 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta prefix, external.DefaultBlockSize, onClose, - int(variable.GetDDLReorgWorkerCounter()), true) + subtask.Concurrency, + true, + ) } func (*mergeSortExecutor) Cleanup(ctx context.Context) error { diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 0a5eface0d4ec..76fcbe189b0e8 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -112,11 +112,11 @@ func (ctx *OperatorCtx) OperatorErr() error { return *err } -func getWriterMemSize(avgRowSize int, idxNum int) (uint64, error) { +func getWriterMemSize(avgRowSize, concurrency, idxNum int) (uint64, error) { failpoint.Inject("mockWriterMemSize", func() { failpoint.Return(1*size.GB, nil) }) - _, writerCnt := expectedIngestWorkerCnt(avgRowSize) + _, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) memTotal, err := memory.MemTotal() if err != nil { return 0, err @@ -132,7 +132,7 @@ func getWriterMemSize(avgRowSize int, idxNum int) (uint64, error) { } func getMergeSortPartSize(avgRowSize int, concurrency int, idxNum int) (uint64, 
error) { - writerMemSize, err := getWriterMemSize(avgRowSize, idxNum) + writerMemSize, err := getWriterMemSize(avgRowSize, concurrency, idxNum) if err != nil { return 0, nil } @@ -156,6 +156,7 @@ func NewAddIndexIngestPipeline( metricCounter prometheus.Counter, reorgMeta *model.DDLReorgMeta, avgRowSize int, + concurrency int, ) (*operator.AsyncPipeline, error) { indexes := make([]table.Index, 0, len(idxInfos)) for _, idxInfo := range idxInfos { @@ -172,7 +173,7 @@ func NewAddIndexIngestPipeline( for i := 0; i < poolSize; i++ { srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize()) } - readerCnt, writerCnt := expectedIngestWorkerCnt(avgRowSize) + readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey) scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt) @@ -209,6 +210,7 @@ func NewWriteIndexToExternalStoragePipeline( onClose external.OnCloseFunc, reorgMeta *model.DDLReorgMeta, avgRowSize int, + concurrency int, ) (*operator.AsyncPipeline, error) { indexes := make([]table.Index, 0, len(idxInfos)) for _, idxInfo := range idxInfos { @@ -225,7 +227,7 @@ func NewWriteIndexToExternalStoragePipeline( for i := 0; i < poolSize; i++ { srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize()) } - readerCnt, writerCnt := expectedIngestWorkerCnt(avgRowSize) + readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) backend, err := storage.ParseBackend(extStoreURI, nil) if err != nil { @@ -236,7 +238,7 @@ func NewWriteIndexToExternalStoragePipeline( return nil, err } - memSize, err := getWriterMemSize(avgRowSize, len(indexes)) + memSize, err := getWriterMemSize(avgRowSize, concurrency, len(indexes)) if err != nil { return nil, err } diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index 4356778d5fecf..0600009215081 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -106,11 +106,6 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta return err } - startKey, endKey, tbl, err := r.getTableStartEndKey(sm) - if err != nil { - return err - } - sessCtx, err := newSessCtx( r.d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location, r.job.ReorgMeta.ResourceGroupName) if err != nil { @@ -123,9 +118,9 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta var pipe *operator.AsyncPipeline if len(r.cloudStorageURI) > 0 { - pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, tbl, startKey, endKey, r.curRowCount) + pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, sm, subtask.Concurrency) } else { - pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, tbl, startKey, endKey, r.curRowCount) + pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, sm, subtask.Concurrency) } if err != nil { return err @@ -226,10 +221,13 @@ func (r *readIndexExecutor) getTableStartEndKey(sm *BackfillSubTaskMeta) ( func (r *readIndexExecutor) buildLocalStorePipeline( opCtx *OperatorCtx, sessCtx sessionctx.Context, - tbl table.PhysicalTable, - start, end kv.Key, - totalRowCount *atomic.Int64, + sm *BackfillSubTaskMeta, + concurrency int, ) (*operator.AsyncPipeline, error) { + start, end, tbl, err := r.getTableStartEndKey(sm) + if err != nil { + return nil, err + } d := r.d engines := make([]ingest.Engine, 0, len(r.indexes)) for _, index := range r.indexes { @@ -244,18 +242,37 @@ func (r 
*readIndexExecutor) buildLocalStorePipeline( counter := metrics.BackfillTotalCounter.WithLabelValues( metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O)) return NewAddIndexIngestPipeline( - opCtx, d.store, d.sessPool, r.bc, engines, sessCtx, r.job.ID, - tbl, r.indexes, start, end, totalRowCount, counter, r.job.ReorgMeta, r.avgRowSize) + opCtx, + d.store, + d.sessPool, + r.bc, + engines, + sessCtx, + r.job.ID, + tbl, + r.indexes, + start, + end, + r.curRowCount, + counter, + r.job.ReorgMeta, + r.avgRowSize, + concurrency, + ) } func (r *readIndexExecutor) buildExternalStorePipeline( opCtx *OperatorCtx, subtaskID int64, sessCtx sessionctx.Context, - tbl table.PhysicalTable, - start, end kv.Key, - totalRowCount *atomic.Int64, + sm *BackfillSubTaskMeta, + concurrency int, ) (*operator.AsyncPipeline, error) { + start, end, tbl, err := r.getTableStartEndKey(sm) + if err != nil { + return nil, err + } + d := r.d onClose := func(summary *external.WriterSummary) { sum, _ := r.subtaskSummary.Load(subtaskID) @@ -283,10 +300,11 @@ func (r *readIndexExecutor) buildExternalStorePipeline( r.indexes, start, end, - totalRowCount, + r.curRowCount, counter, onClose, r.job.ReorgMeta, r.avgRowSize, + concurrency, ) } diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index d6400adce72c6..ec4b655468e5b 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -87,7 +87,7 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sess.P jobCtx *JobContext) (backfillScheduler, error) { if tp == typeAddIndexWorker && info.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { ctx = logutil.WithCategory(ctx, "ddl-ingest") - return newIngestBackfillScheduler(ctx, info, sessPool, tbl, false) + return newIngestBackfillScheduler(ctx, info, sessPool, tbl) } return newTxnBackfillScheduler(ctx, info, sessPool, tp, tbl, sessCtx, jobCtx) } @@ -312,7 +312,6 @@ type ingestBackfillScheduler struct { reorgInfo *reorgInfo sessPool *sess.Pool tbl table.PhysicalTable - distribute bool avgRowSize int closed bool @@ -334,7 +333,6 @@ func newIngestBackfillScheduler( info *reorgInfo, sessPool *sess.Pool, tbl table.PhysicalTable, - distribute bool, ) (*ingestBackfillScheduler, error) { sctx, err := sessPool.Get() if err != nil { @@ -351,7 +349,6 @@ func newIngestBackfillScheduler( taskCh: make(chan *reorgBackfillTask, backfillTaskChanSize), resultCh: make(chan *backfillResult, backfillTaskChanSize), poolErr: make(chan error), - distribute: distribute, }, nil } @@ -476,7 +473,7 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordCh worker, err := newAddIndexIngestWorker( b.ctx, b.tbl, reorgInfo.d, engines, b.resultCh, job.ID, reorgInfo.SchemaName, indexIDs, b.writerMaxID, - b.copReqSenderPool, sessCtx, b.checkpointMgr, b.distribute) + b.copReqSenderPool, sessCtx, b.checkpointMgr) if err != nil { // Return an error only if it is the first worker. 
if b.writerMaxID == 0 { @@ -518,11 +515,11 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e } func (b *ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) { - return expectedIngestWorkerCnt(b.avgRowSize) + return expectedIngestWorkerCnt(int(variable.GetDDLReorgWorkerCounter()), b.avgRowSize) } -func expectedIngestWorkerCnt(avgRowSize int) (readerCnt, writerCnt int) { - workerCnt := int(variable.GetDDLReorgWorkerCounter()) +func expectedIngestWorkerCnt(concurrency, avgRowSize int) (readerCnt, writerCnt int) { + workerCnt := concurrency if avgRowSize == 0 { // Statistic data not exist, use default concurrency. readerCnt = min(workerCnt/2, maxBackfillWorkerSize) @@ -558,13 +555,11 @@ func (w *addIndexIngestWorker) HandleTask(rs IndexRecordChunk, _ func(workerpool w.resultCh <- result return } - if !w.distribute { - err := w.d.isReorgRunnable(w.jobID, false) - if err != nil { - result.err = err - w.resultCh <- result - return - } + err := w.d.isReorgRunnable(w.jobID, false) + if err != nil { + result.err = err + w.resultCh <- result + return } count, nextKey, err := w.WriteLocal(&rs) if err != nil { diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 895458afb8bfd..cf52adaa5799b 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1710,9 +1710,8 @@ type addIndexIngestWorker struct { copReqSenderPool *copReqSenderPool checkpointMgr *ingest.CheckpointManager - resultCh chan *backfillResult - jobID int64 - distribute bool + resultCh chan *backfillResult + jobID int64 } func newAddIndexIngestWorker( @@ -1728,7 +1727,6 @@ func newAddIndexIngestWorker( copReqSenderPool *copReqSenderPool, sessCtx sessionctx.Context, checkpointMgr *ingest.CheckpointManager, - distribute bool, ) (*addIndexIngestWorker, error) { indexes := make([]table.Index, 0, len(indexIDs)) writers := make([]ingest.Writer, 0, len(indexIDs)) @@ -1756,7 +1754,6 @@ func newAddIndexIngestWorker( resultCh: resultCh, jobID: jobID, checkpointMgr: checkpointMgr, - distribute: distribute, }, nil } diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index abe2144d3e3eb..9144ddf527450 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -202,7 +202,14 @@ func (mgr *TaskManager) CreateTask(ctx context.Context, key string, tp proto.Tas } // CreateTaskWithSession adds a new task to task table with session. 
-func (mgr *TaskManager) CreateTaskWithSession(ctx context.Context, se sessionctx.Context, key string, tp proto.TaskType, concurrency int, meta []byte) (taskID int64, err error) { +func (mgr *TaskManager) CreateTaskWithSession( + ctx context.Context, + se sessionctx.Context, + key string, + tp proto.TaskType, + concurrency int, + meta []byte, +) (taskID int64, err error) { cpuCount, err := mgr.getCPUCountOfManagedNode(ctx, se) if err != nil { return 0, err diff --git a/pkg/disttask/importinto/encode_and_sort_operator.go b/pkg/disttask/importinto/encode_and_sort_operator.go index 6b6accd4595a8..38b743392db43 100644 --- a/pkg/disttask/importinto/encode_and_sort_operator.go +++ b/pkg/disttask/importinto/encode_and_sort_operator.go @@ -67,8 +67,13 @@ var _ operator.Operator = (*encodeAndSortOperator)(nil) var _ operator.WithSource[*importStepMinimalTask] = (*encodeAndSortOperator)(nil) var _ operator.WithSink[workerpool.None] = (*encodeAndSortOperator)(nil) -func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor, - sharedVars *SharedVars, subtaskID int64) *encodeAndSortOperator { +func newEncodeAndSortOperator( + ctx context.Context, + executor *importStepExecutor, + sharedVars *SharedVars, + subtaskID int64, + concurrency int, +) *encodeAndSortOperator { subCtx, cancel := context.WithCancel(ctx) op := &encodeAndSortOperator{ ctx: subCtx, @@ -83,7 +88,7 @@ func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor, pool := workerpool.NewWorkerPool( "encodeAndSortOperator", util.ImportInto, - executor.taskMeta.Plan.ThreadCnt, + concurrency, func() workerpool.Worker[*importStepMinimalTask, workerpool.None] { return newChunkWorker(ctx, op, executor.dataKVMemSizePerCon, executor.perIndexKVMemSizePerCon) }, diff --git a/pkg/disttask/importinto/encode_and_sort_operator_test.go b/pkg/disttask/importinto/encode_and_sort_operator_test.go index c484457a08a65..f5d41d161e0b2 100644 --- a/pkg/disttask/importinto/encode_and_sort_operator_test.go +++ b/pkg/disttask/importinto/encode_and_sort_operator_test.go @@ -81,7 +81,7 @@ func TestEncodeAndSortOperator(t *testing.T) { } source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3) + op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3, 1) op.SetSource(source) require.NoError(t, op.Open()) require.Greater(t, len(op.String()), 0) @@ -101,7 +101,7 @@ func TestEncodeAndSortOperator(t *testing.T) { // cancel on error and log other errors mockErr2 := errors.New("mock err 2") source = operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2) + op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2, 2) op.SetSource(source) executor1 := mock.NewMockMiniTaskExecutor(ctrl) executor2 := mock.NewMockMiniTaskExecutor(ctrl) diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index 53ad1da6d4a20..a30dd3a0360a4 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -161,7 +161,7 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt s.sharedVars.Store(subtaskMeta.ID, sharedVars) source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op := newEncodeAndSortOperator(ctx, s, sharedVars, subtask.ID) + op := newEncodeAndSortOperator(ctx, s, sharedVars, subtask.ID, 
subtask.Concurrency) op.SetSource(source) pipeline := operator.NewAsyncPipeline(op) if err = pipeline.Execute(); err != nil { @@ -339,7 +339,7 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S prefix, getKVGroupBlockSize(sm.KVGroup), onClose, - m.taskMeta.Plan.ThreadCnt, + subtask.Concurrency, false) logger.Info( "merge sort finished", @@ -403,6 +403,7 @@ func (e *writeAndIngestStepExecutor) RunSubtask(ctx context.Context, subtask *pr _, engineUUID := backend.MakeUUID("", subtask.ID) localBackend := e.tableImporter.Backend() + localBackend.WorkerConcurrency = subtask.Concurrency * 2 err = localBackend.CloseEngine(ctx, &backend.EngineConfig{ External: &backend.ExternalEngineConfig{ StorageURI: e.taskMeta.Plan.CloudStorageURI, diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 2839175c8faf9..d4361f4bef8a3 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -1303,16 +1303,10 @@ func (e *LoadDataController) CreateColAssignExprs(sctx sessionctx.Context) ([]ex } func (e *LoadDataController) getBackendWorkerConcurrency() int { - // when using global sort, write&ingest step buffers KV data in memory, // suppose cpu:mem ratio 1:2(true in most case), and we assign 1G per concurrency, // so we can use 2 * threadCnt as concurrency. write&ingest step is mostly // IO intensive, so CPU usage is below ThreadCnt in our tests. - // The real concurrency used is adjusted in external engine later. - // when using local sort, use the default value as lightning. - if e.IsGlobalSort() { - return e.ThreadCnt * 2 - } - return config.DefaultRangeConcurrency * 2 + return e.ThreadCnt * 2 } func (e *LoadDataController) getLocalBackendCfg(pdAddr, dataDir string) local.BackendConfig { diff --git a/pkg/executor/importer/import_test.go b/pkg/executor/importer/import_test.go index 69b8807fc7435..9ebd90a276f73 100644 --- a/pkg/executor/importer/import_test.go +++ b/pkg/executor/importer/import_test.go @@ -319,7 +319,7 @@ func TestGetBackendWorkerConcurrency(t *testing.T) { ThreadCnt: 3, }, } - require.Equal(t, 32, c.getBackendWorkerConcurrency()) + require.Equal(t, 6, c.getBackendWorkerConcurrency()) c.Plan.CloudStorageURI = "xxx" require.Equal(t, 6, c.getBackendWorkerConcurrency()) c.Plan.ThreadCnt = 123 diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 3098fde04a250..067fdc672dd08 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -197,6 +197,7 @@ func TestBackfillOperatorPipeline(t *testing.T) { nil, ddl.NewDDLReorgMeta(tk.Session()), 0, + 2, ) require.NoError(t, err) err = pipeline.Execute() @@ -273,6 +274,7 @@ func TestBackfillOperatorPipelineException(t *testing.T) { nil, ddl.NewDDLReorgMeta(tk.Session()), 0, + 2, ) require.NoError(t, err) err = pipeline.Execute()
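
Below is a minimal, self-contained Go sketch of the worker-sizing idea this patch switches over to subtask.Concurrency (instead of the global tidb_ddl_reorg_worker_cnt counter read via variable.GetDDLReorgWorkerCounter). It is an illustration only, not part of the patch: maxBackfillWorkerSize, the writer-count formula, and the avgRowSize > 0 branch are assumptions, since the diff only shows the reader side of the avgRowSize == 0 branch in expectedIngestWorkerCnt.

package main

import "fmt"

// Assumed cap on backfill workers; the real constant lives in pkg/ddl and is
// not shown in the diff.
const maxBackfillWorkerSize = 16

// expectedIngestWorkerCnt mirrors the patched signature: concurrency now comes
// from subtask.Concurrency rather than variable.GetDDLReorgWorkerCounter().
func expectedIngestWorkerCnt(concurrency, avgRowSize int) (readerCnt, writerCnt int) {
	workerCnt := concurrency
	if avgRowSize == 0 {
		// Statistics are missing: split the subtask concurrency between
		// readers and writers, capped by the worker limit (the reader half
		// is the branch visible in the diff; the writer formula is assumed).
		readerCnt = min(workerCnt/2, maxBackfillWorkerSize)
		writerCnt = min(workerCnt/2+1, maxBackfillWorkerSize)
		return readerCnt, writerCnt
	}
	// The avgRowSize > 0 branch is not shown in the diff; assume the same
	// split for the purpose of this sketch.
	return min(workerCnt/2, maxBackfillWorkerSize), min(workerCnt/2+1, maxBackfillWorkerSize)
}

func main() {
	// A subtask scheduled with Concurrency = 8 and no average-row-size statistics.
	readers, writers := expectedIngestWorkerCnt(8, 0)
	fmt.Println("readers:", readers, "writers:", writers)

	// The ingest backend concurrency is derived from the same source in the
	// patch: local.WorkerConcurrency = subtask.Concurrency * 2.
	fmt.Println("backend worker concurrency:", 8*2)
}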