disttask,ddl: use subtask concurrency (#52286)
ref #49008
lance6716 authored Apr 2, 2024
1 parent c1344e1 commit 76abd8b
Showing 13 changed files with 84 additions and 61 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_import_cloud.go
@@ -99,6 +99,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
if err != nil {
return err
}
local.WorkerConcurrency = subtask.Concurrency * 2
err = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, m.index, m.ptbl.Meta())
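The cloud-import executor now sizes the local backend from the concurrency the framework granted to this subtask rather than from a node-wide setting. A minimal sketch of that wiring, with simplified stand-ins for proto.Subtask and the local backend (the 2x factor is copied from the diff; its rationale is not stated there):

package main

import "fmt"

// subtask is a stand-in for proto.Subtask: the framework records the
// concurrency it granted when this subtask was scheduled.
type subtask struct {
	ID          int64
	Concurrency int
}

// localBackend is a stand-in for the lightning local backend.
type localBackend struct {
	WorkerConcurrency int
}

// runSubtask shows the pattern: size the ingest workers from the subtask,
// not from a node-wide setting.
func runSubtask(local *localBackend, st *subtask) {
	// The 2x factor mirrors the diff, presumably headroom for I/O-bound work.
	local.WorkerConcurrency = st.Concurrency * 2
	fmt.Printf("subtask %d: importing with %d backend workers\n", st.ID, local.WorkerConcurrency)
}

func main() {
	runSubtask(&localBackend{}, &subtask{ID: 7, Concurrency: 8})
}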
7 changes: 4 additions & 3 deletions pkg/ddl/backfilling_merge_sort.go
@@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
)
@@ -89,7 +88,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
}

prefix := path.Join(strconv.Itoa(int(m.jobID)), strconv.Itoa(int(subtask.ID)))
partSize, err := getMergeSortPartSize(m.avgRowSize, int(variable.GetDDLReorgWorkerCounter()), m.idxNum)
partSize, err := getMergeSortPartSize(m.avgRowSize, subtask.Concurrency, m.idxNum)
if err != nil {
return err
}
@@ -102,7 +101,9 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
prefix,
external.DefaultBlockSize,
onClose,
int(variable.GetDDLReorgWorkerCounter()), true)
subtask.Concurrency,
true,
)
}

func (*mergeSortExecutor) Cleanup(ctx context.Context) error {
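The merge-sort executor likewise takes both its part-size input and its merge worker count from subtask.Concurrency instead of the tidb_ddl_reorg_worker_cnt counter. A rough sketch of the fan-out this value controls, with mergeSortedGroups standing in for external.MergeOverlappingFiles:

package main

import (
	"fmt"
	"sync"
)

// mergeSortedGroups is an illustrative stand-in for external.MergeOverlappingFiles:
// it fans sorted-file groups out to `concurrency` merge workers.
func mergeSortedGroups(groups []string, concurrency int) {
	ch := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for g := range ch {
				fmt.Printf("worker %d merges %s\n", worker, g)
			}
		}(i)
	}
	for _, g := range groups {
		ch <- g
	}
	close(ch)
	wg.Wait()
}

func main() {
	// With the change above, concurrency comes from subtask.Concurrency,
	// i.e. exactly what the scheduler granted this subtask on this node.
	mergeSortedGroups([]string{"part-0", "part-1", "part-2", "part-3"}, 2)
}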
14 changes: 8 additions & 6 deletions pkg/ddl/backfilling_operators.go
@@ -112,11 +112,11 @@ func (ctx *OperatorCtx) OperatorErr() error {
return *err
}

func getWriterMemSize(avgRowSize int, idxNum int) (uint64, error) {
func getWriterMemSize(avgRowSize, concurrency, idxNum int) (uint64, error) {
failpoint.Inject("mockWriterMemSize", func() {
failpoint.Return(1*size.GB, nil)
})
_, writerCnt := expectedIngestWorkerCnt(avgRowSize)
_, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
memTotal, err := memory.MemTotal()
if err != nil {
return 0, err
@@ -132,7 +132,7 @@ func getWriterMemSize(avgRowSize int, idxNum int) (uint64, error) {
}

func getMergeSortPartSize(avgRowSize int, concurrency int, idxNum int) (uint64, error) {
writerMemSize, err := getWriterMemSize(avgRowSize, idxNum)
writerMemSize, err := getWriterMemSize(avgRowSize, concurrency, idxNum)
if err != nil {
return 0, nil
}
@@ -156,6 +156,7 @@ func NewAddIndexIngestPipeline(
metricCounter prometheus.Counter,
reorgMeta *model.DDLReorgMeta,
avgRowSize int,
concurrency int,
) (*operator.AsyncPipeline, error) {
indexes := make([]table.Index, 0, len(idxInfos))
for _, idxInfo := range idxInfos {
@@ -172,7 +173,7 @@
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
}
readerCnt, writerCnt := expectedIngestWorkerCnt(avgRowSize)
readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt)
@@ -209,6 +210,7 @@ func NewWriteIndexToExternalStoragePipeline(
onClose external.OnCloseFunc,
reorgMeta *model.DDLReorgMeta,
avgRowSize int,
concurrency int,
) (*operator.AsyncPipeline, error) {
indexes := make([]table.Index, 0, len(idxInfos))
for _, idxInfo := range idxInfos {
@@ -225,7 +227,7 @@
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
}
readerCnt, writerCnt := expectedIngestWorkerCnt(avgRowSize)
readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)

backend, err := storage.ParseBackend(extStoreURI, nil)
if err != nil {
@@ -236,7 +238,7 @@
return nil, err
}

memSize, err := getWriterMemSize(avgRowSize, len(indexes))
memSize, err := getWriterMemSize(avgRowSize, concurrency, len(indexes))
if err != nil {
return nil, err
}
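getWriterMemSize and getMergeSortPartSize now see the subtask's concurrency, so the per-writer memory budget follows the number of writers that will actually be spawned. The sketch below only shows the shape of that calculation; the half-of-memory budget and the writer-count split are assumptions, not the real TiDB formulas:

package main

import "fmt"

// expectedWriterCnt stands in for the writer count that
// expectedIngestWorkerCnt would return: some share of the granted concurrency.
func expectedWriterCnt(concurrency int) int {
	if c := concurrency / 2; c > 0 {
		return c
	}
	return 1
}

// writerMemSize splits a share of total memory across all index writers.
// The real getWriterMemSize reads memory.MemTotal() and applies its own
// limits; the 50% budget below is a made-up number.
func writerMemSize(memTotal uint64, concurrency, idxNum int) uint64 {
	writers := expectedWriterCnt(concurrency)
	budget := memTotal / 2
	return budget / uint64(writers*idxNum)
}

func main() {
	const gb = 1 << 30
	fmt.Println(writerMemSize(32*gb, 8, 2)/(1<<20), "MiB per writer")
}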
50 changes: 34 additions & 16 deletions pkg/ddl/backfilling_read_index.go
@@ -106,11 +106,6 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
return err
}

startKey, endKey, tbl, err := r.getTableStartEndKey(sm)
if err != nil {
return err
}

sessCtx, err := newSessCtx(
r.d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location, r.job.ReorgMeta.ResourceGroupName)
if err != nil {
@@ -123,9 +118,9 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta

var pipe *operator.AsyncPipeline
if len(r.cloudStorageURI) > 0 {
pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, tbl, startKey, endKey, r.curRowCount)
pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, sm, subtask.Concurrency)
} else {
pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, tbl, startKey, endKey, r.curRowCount)
pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, sm, subtask.Concurrency)
}
if err != nil {
return err
@@ -226,10 +221,13 @@ func (r *readIndexExecutor) getTableStartEndKey(sm *BackfillSubTaskMeta) (
func (r *readIndexExecutor) buildLocalStorePipeline(
opCtx *OperatorCtx,
sessCtx sessionctx.Context,
tbl table.PhysicalTable,
start, end kv.Key,
totalRowCount *atomic.Int64,
sm *BackfillSubTaskMeta,
concurrency int,
) (*operator.AsyncPipeline, error) {
start, end, tbl, err := r.getTableStartEndKey(sm)
if err != nil {
return nil, err
}
d := r.d
engines := make([]ingest.Engine, 0, len(r.indexes))
for _, index := range r.indexes {
@@ -244,18 +242,37 @@ func (r *readIndexExecutor) buildLocalStorePipeline(
counter := metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O))
return NewAddIndexIngestPipeline(
opCtx, d.store, d.sessPool, r.bc, engines, sessCtx, r.job.ID,
tbl, r.indexes, start, end, totalRowCount, counter, r.job.ReorgMeta, r.avgRowSize)
opCtx,
d.store,
d.sessPool,
r.bc,
engines,
sessCtx,
r.job.ID,
tbl,
r.indexes,
start,
end,
r.curRowCount,
counter,
r.job.ReorgMeta,
r.avgRowSize,
concurrency,
)
}

func (r *readIndexExecutor) buildExternalStorePipeline(
opCtx *OperatorCtx,
subtaskID int64,
sessCtx sessionctx.Context,
tbl table.PhysicalTable,
start, end kv.Key,
totalRowCount *atomic.Int64,
sm *BackfillSubTaskMeta,
concurrency int,
) (*operator.AsyncPipeline, error) {
start, end, tbl, err := r.getTableStartEndKey(sm)
if err != nil {
return nil, err
}

d := r.d
onClose := func(summary *external.WriterSummary) {
sum, _ := r.subtaskSummary.Load(subtaskID)
@@ -283,10 +300,11 @@ func (r *readIndexExecutor) buildExternalStorePipeline(
r.indexes,
start,
end,
totalRowCount,
r.curRowCount,
counter,
onClose,
r.job.ReorgMeta,
r.avgRowSize,
concurrency,
)
}
25 changes: 10 additions & 15 deletions pkg/ddl/backfilling_scheduler.go
@@ -87,7 +87,7 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sess.P
jobCtx *JobContext) (backfillScheduler, error) {
if tp == typeAddIndexWorker && info.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ctx = logutil.WithCategory(ctx, "ddl-ingest")
return newIngestBackfillScheduler(ctx, info, sessPool, tbl, false)
return newIngestBackfillScheduler(ctx, info, sessPool, tbl)
}
return newTxnBackfillScheduler(ctx, info, sessPool, tp, tbl, sessCtx, jobCtx)
}
@@ -312,7 +312,6 @@ type ingestBackfillScheduler struct {
reorgInfo *reorgInfo
sessPool *sess.Pool
tbl table.PhysicalTable
distribute bool
avgRowSize int

closed bool
@@ -334,7 +333,6 @@ func newIngestBackfillScheduler(
info *reorgInfo,
sessPool *sess.Pool,
tbl table.PhysicalTable,
distribute bool,
) (*ingestBackfillScheduler, error) {
sctx, err := sessPool.Get()
if err != nil {
@@ -351,7 +349,6 @@
taskCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultCh: make(chan *backfillResult, backfillTaskChanSize),
poolErr: make(chan error),
distribute: distribute,
}, nil
}

@@ -476,7 +473,7 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordCh
worker, err := newAddIndexIngestWorker(
b.ctx, b.tbl, reorgInfo.d, engines, b.resultCh, job.ID,
reorgInfo.SchemaName, indexIDs, b.writerMaxID,
b.copReqSenderPool, sessCtx, b.checkpointMgr, b.distribute)
b.copReqSenderPool, sessCtx, b.checkpointMgr)
if err != nil {
// Return an error only if it is the first worker.
if b.writerMaxID == 0 {
@@ -518,11 +515,11 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e
}

func (b *ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) {
return expectedIngestWorkerCnt(b.avgRowSize)
return expectedIngestWorkerCnt(int(variable.GetDDLReorgWorkerCounter()), b.avgRowSize)
}

func expectedIngestWorkerCnt(avgRowSize int) (readerCnt, writerCnt int) {
workerCnt := int(variable.GetDDLReorgWorkerCounter())
func expectedIngestWorkerCnt(concurrency, avgRowSize int) (readerCnt, writerCnt int) {
workerCnt := concurrency
if avgRowSize == 0 {
// Statistic data not exist, use default concurrency.
readerCnt = min(workerCnt/2, maxBackfillWorkerSize)
@@ -558,13 +555,11 @@ func (w *addIndexIngestWorker) HandleTask(rs IndexRecordChunk, _ func(workerpool
w.resultCh <- result
return
}
if !w.distribute {
err := w.d.isReorgRunnable(w.jobID, false)
if err != nil {
result.err = err
w.resultCh <- result
return
}
err := w.d.isReorgRunnable(w.jobID, false)
if err != nil {
result.err = err
w.resultCh <- result
return
}
count, nextKey, err := w.WriteLocal(&rs)
if err != nil {
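This is the pivot of the change: expectedIngestWorkerCnt takes the concurrency as an argument, so the local ingest scheduler can keep passing the tidb_ddl_reorg_worker_cnt value while distributed executors pass subtask.Concurrency. A hedged sketch of the reader/writer split follows; only the avgRowSize == 0 branch is visible in the diff, and the other branch below is a guess:

package main

import "fmt"

const maxBackfillWorkerSize = 16 // stand-in for the package-level constant

// expectedIngestWorkerCnt sketches the new signature: the caller decides where
// the concurrency comes from. The avgRowSize > 0 branch is a simplified guess,
// not the exact TiDB formula.
func expectedIngestWorkerCnt(concurrency, avgRowSize int) (readerCnt, writerCnt int) {
	workerCnt := concurrency
	if avgRowSize == 0 {
		// No statistics: split the granted concurrency between readers and writers.
		readerCnt = min(workerCnt/2, maxBackfillWorkerSize)
		writerCnt = min(workerCnt/2, maxBackfillWorkerSize)
	} else {
		// Wider rows tend to need relatively more read capacity.
		readerCnt = min(workerCnt, maxBackfillWorkerSize)
		writerCnt = workerCnt / 2
	}
	return max(readerCnt, 1), max(writerCnt, 1)
}

func main() {
	// Local ingest keeps using tidb_ddl_reorg_worker_cnt; distributed subtasks
	// pass subtask.Concurrency instead.
	r, w := expectedIngestWorkerCnt(8, 0)
	fmt.Println("readers:", r, "writers:", w)
}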
7 changes: 2 additions & 5 deletions pkg/ddl/index.go
@@ -1710,9 +1710,8 @@ type addIndexIngestWorker struct {
copReqSenderPool *copReqSenderPool
checkpointMgr *ingest.CheckpointManager

resultCh chan *backfillResult
jobID int64
distribute bool
resultCh chan *backfillResult
jobID int64
}

func newAddIndexIngestWorker(
@@ -1728,7 +1727,6 @@ func newAddIndexIngestWorker(
copReqSenderPool *copReqSenderPool,
sessCtx sessionctx.Context,
checkpointMgr *ingest.CheckpointManager,
distribute bool,
) (*addIndexIngestWorker, error) {
indexes := make([]table.Index, 0, len(indexIDs))
writers := make([]ingest.Writer, 0, len(indexIDs))
@@ -1756,7 +1754,6 @@
resultCh: resultCh,
jobID: jobID,
checkpointMgr: checkpointMgr,
distribute: distribute,
}, nil
}

9 changes: 8 additions & 1 deletion pkg/disttask/framework/storage/task_table.go
@@ -202,7 +202,14 @@ func (mgr *TaskManager) CreateTask(ctx context.Context, key string, tp proto.Tas
}

// CreateTaskWithSession adds a new task to task table with session.
func (mgr *TaskManager) CreateTaskWithSession(ctx context.Context, se sessionctx.Context, key string, tp proto.TaskType, concurrency int, meta []byte) (taskID int64, err error) {
func (mgr *TaskManager) CreateTaskWithSession(
ctx context.Context,
se sessionctx.Context,
key string,
tp proto.TaskType,
concurrency int,
meta []byte,
) (taskID int64, err error) {
cpuCount, err := mgr.getCPUCountOfManagedNode(ctx, se)
if err != nil {
return 0, err
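CreateTaskWithSession is only reformatted here, but it is where the task-level concurrency enters the system: the surrounding code looks up the CPU count of a managed node, presumably to validate the requested concurrency before the task is persisted. A small sketch of that kind of check, with made-up names rather than the real TaskManager API:

package main

import (
	"errors"
	"fmt"
)

// validateTaskConcurrency sketches the check implied by the visible code:
// the requested task concurrency should not exceed what a managed node offers.
// The name and exact rule are assumptions.
func validateTaskConcurrency(requested, cpuCountOfManagedNode int) (int, error) {
	if requested <= 0 {
		return 0, errors.New("concurrency must be positive")
	}
	if requested > cpuCountOfManagedNode {
		return 0, fmt.Errorf("concurrency %d exceeds node CPU count %d",
			requested, cpuCountOfManagedNode)
	}
	return requested, nil
}

func main() {
	c, err := validateTaskConcurrency(8, 16)
	fmt.Println(c, err)
}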
11 changes: 8 additions & 3 deletions pkg/disttask/importinto/encode_and_sort_operator.go
@@ -67,8 +67,13 @@ var _ operator.Operator = (*encodeAndSortOperator)(nil)
var _ operator.WithSource[*importStepMinimalTask] = (*encodeAndSortOperator)(nil)
var _ operator.WithSink[workerpool.None] = (*encodeAndSortOperator)(nil)

func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor,
sharedVars *SharedVars, subtaskID int64) *encodeAndSortOperator {
func newEncodeAndSortOperator(
ctx context.Context,
executor *importStepExecutor,
sharedVars *SharedVars,
subtaskID int64,
concurrency int,
) *encodeAndSortOperator {
subCtx, cancel := context.WithCancel(ctx)
op := &encodeAndSortOperator{
ctx: subCtx,
@@ -83,7 +88,7 @@ func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor,
pool := workerpool.NewWorkerPool(
"encodeAndSortOperator",
util.ImportInto,
executor.taskMeta.Plan.ThreadCnt,
concurrency,
func() workerpool.Worker[*importStepMinimalTask, workerpool.None] {
return newChunkWorker(ctx, op, executor.dataKVMemSizePerCon, executor.perIndexKVMemSizePerCon)
},
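newEncodeAndSortOperator now receives its worker-pool width from the caller instead of reading executor.taskMeta.Plan.ThreadCnt, so each subtask can run at the width it was granted. A compact sketch of that constructor shape, using plain stand-ins for proto.Subtask and workerpool.WorkerPool:

package main

import "fmt"

type subtask struct {
	ID          int64
	Concurrency int
}

// pool stands in for workerpool.WorkerPool; only its size matters here.
type pool struct{ workers int }

func newPool(workers int) *pool { return &pool{workers: workers} }

// newEncodeAndSortOp mirrors the new constructor shape: concurrency is an
// explicit argument instead of a field read from the import plan.
func newEncodeAndSortOp(subtaskID int64, concurrency int) *pool {
	_ = subtaskID // kept only to echo the real parameter list
	return newPool(concurrency)
}

func main() {
	st := &subtask{ID: 3, Concurrency: 4}
	op := newEncodeAndSortOp(st.ID, st.Concurrency) // as RunSubtask now does
	fmt.Printf("subtask %d encodes and sorts with %d workers\n", st.ID, op.workers)
}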
4 changes: 2 additions & 2 deletions pkg/disttask/importinto/encode_and_sort_operator_test.go
@@ -81,7 +81,7 @@ func TestEncodeAndSortOperator(t *testing.T) {
}

source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3)
op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3, 1)
op.SetSource(source)
require.NoError(t, op.Open())
require.Greater(t, len(op.String()), 0)
@@ -101,7 +101,7 @@ func TestEncodeAndSortOperator(t *testing.T) {
// cancel on error and log other errors
mockErr2 := errors.New("mock err 2")
source = operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2)
op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2, 2)
op.SetSource(source)
executor1 := mock.NewMockMiniTaskExecutor(ctrl)
executor2 := mock.NewMockMiniTaskExecutor(ctrl)
5 changes: 3 additions & 2 deletions pkg/disttask/importinto/task_executor.go
@@ -161,7 +161,7 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
s.sharedVars.Store(subtaskMeta.ID, sharedVars)

source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
op := newEncodeAndSortOperator(ctx, s, sharedVars, subtask.ID)
op := newEncodeAndSortOperator(ctx, s, sharedVars, subtask.ID, subtask.Concurrency)
op.SetSource(source)
pipeline := operator.NewAsyncPipeline(op)
if err = pipeline.Execute(); err != nil {
@@ -339,7 +339,7 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S
prefix,
getKVGroupBlockSize(sm.KVGroup),
onClose,
m.taskMeta.Plan.ThreadCnt,
subtask.Concurrency,
false)
logger.Info(
"merge sort finished",
@@ -403,6 +403,7 @@ func (e *writeAndIngestStepExecutor) RunSubtask(ctx context.Context, subtask *pr

_, engineUUID := backend.MakeUUID("", subtask.ID)
localBackend := e.tableImporter.Backend()
localBackend.WorkerConcurrency = subtask.Concurrency * 2
err = localBackend.CloseEngine(ctx, &backend.EngineConfig{
External: &backend.ExternalEngineConfig{
StorageURI: e.taskMeta.Plan.CloudStorageURI,
(The remaining 3 changed files are not shown here.)
