dxf: support modify concurrency/task-specific-param for later subtask at task executor part (#58144)

ref #57497
D3Hunter authored Dec 27, 2024
1 parent 652cee2 commit 293077b
Showing 32 changed files with 962 additions and 312 deletions.
14 changes: 6 additions & 8 deletions pkg/ddl/backfilling_dist_executor.go
@@ -175,19 +175,17 @@ func hasUniqueIndex(job *model.Job) (bool, error) {

type backfillDistExecutor struct {
*taskexecutor.BaseTaskExecutor
d *ddl
task *proto.Task
taskTable taskexecutor.TaskTable
taskMeta *BackfillTaskMeta
jobID int64
d *ddl
task *proto.Task
taskMeta *BackfillTaskMeta
jobID int64
}

func newBackfillDistExecutor(ctx context.Context, id string, task *proto.Task, taskTable taskexecutor.TaskTable, d *ddl) taskexecutor.TaskExecutor {
func newBackfillDistExecutor(ctx context.Context, task *proto.Task, param taskexecutor.Param, d *ddl) taskexecutor.TaskExecutor {
s := &backfillDistExecutor{
BaseTaskExecutor: taskexecutor.NewBaseTaskExecutor(ctx, id, task, taskTable),
BaseTaskExecutor: taskexecutor.NewBaseTaskExecutor(ctx, task, param),
d: d,
task: task,
taskTable: taskTable,
}
s.BaseTaskExecutor.Extension = s
return s
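For reference, the new constructor shape used above can be sketched as follows. This is a hedged illustration, not code from the commit: it assumes taskexecutor.Param bundles the executor id and TaskTable that were previously passed as separate arguments, and that NewBaseTaskExecutor keeps them on the embedded base; exampleDistExecutor and its constructor are hypothetical names.

// Hypothetical task executor built the new way; only the taskexecutor/proto
// identifiers also visible in the diff above are real.
type exampleDistExecutor struct {
	*taskexecutor.BaseTaskExecutor
	task *proto.Task
}

func newExampleDistExecutor(ctx context.Context, task *proto.Task, param taskexecutor.Param) taskexecutor.TaskExecutor {
	e := &exampleDistExecutor{
		// id and task-table wiring now travel inside param.
		BaseTaskExecutor: taskexecutor.NewBaseTaskExecutor(ctx, task, param),
		task:             task,
	}
	e.BaseTaskExecutor.Extension = e
	return e
}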
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_import_cloud.go
@@ -33,7 +33,7 @@ import (
)

type cloudImportExecutor struct {
taskexecutor.EmptyStepExecutor
taskexecutor.BaseStepExecutor
job *model.Job
indexes []*model.IndexInfo
ptbl table.PhysicalTable
@@ -132,7 +132,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
if err != nil {
return err
}
local.WorkerConcurrency = subtask.Concurrency * 2
local.WorkerConcurrency = int(m.GetResource().CPU.Capacity()) * 2
err = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
if err == nil {
return nil
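The change above is the executor-side core of this commit: worker concurrency is no longer taken from the subtask row captured at scheduling time but from the resource currently granted to the step executor, so a concurrency modification applied to a running task is picked up by later subtasks. A minimal sketch of the pattern, using a hypothetical helper name (the *2 factor is specific to the ingest engine, as in the diff):

// Sketch only; ingestWorkerCount does not exist in the repo.
func ingestWorkerCount(m *cloudImportExecutor) int {
	// GetResource comes from the embedded taskexecutor.BaseStepExecutor and
	// reflects the CPU quota currently granted to this task on this node.
	return int(m.GetResource().CPU.Capacity()) * 2
}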
6 changes: 3 additions & 3 deletions pkg/ddl/backfilling_merge_sort.go
@@ -31,7 +31,7 @@ import (
)

type mergeSortExecutor struct {
taskexecutor.EmptyStepExecutor
taskexecutor.BaseStepExecutor
jobID int64
idxNum int
ptbl table.PhysicalTable
@@ -86,7 +86,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta

prefix := path.Join(strconv.Itoa(int(m.jobID)), strconv.Itoa(int(subtask.ID)))
res := m.GetResource()
memSizePerCon := res.Mem.Capacity() / int64(subtask.Concurrency)
memSizePerCon := res.Mem.Capacity() / res.CPU.Capacity()
partSize := max(external.MinUploadPartSize, memSizePerCon*int64(external.MaxMergingFilesPerThread)/10000)

err = external.MergeOverlappingFiles(
@@ -97,7 +97,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
prefix,
external.DefaultBlockSize,
onClose,
subtask.Concurrency,
int(res.CPU.Capacity()),
true,
)
if err != nil {
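To make the sizing math above concrete, here is a small standalone illustration with made-up numbers; the two constants stand in for external.MinUploadPartSize and external.MaxMergingFilesPerThread and are not their real values.

package main

import "fmt"

func main() {
	// Hypothetical grant: 16 GiB of memory and a CPU quota of 8.
	memCapacity := int64(16 << 30)
	cpuCapacity := int64(8)
	// Stand-ins, not the real external package constants.
	minUploadPartSize := int64(5 << 20)
	maxMergingFilesPerThread := int64(8000)

	// Each merge worker gets an equal share of the granted memory: 2 GiB here.
	memSizePerCon := memCapacity / cpuCapacity
	// Part size scales with per-worker memory, floored at the minimum part size.
	partSize := max(minUploadPartSize, memSizePerCon*maxMergingFilesPerThread/10000)
	fmt.Println(memSizePerCon, partSize)
}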
8 changes: 5 additions & 3 deletions pkg/ddl/backfilling_read_index.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/kv"
@@ -40,7 +41,7 @@ import (
)

type readIndexExecutor struct {
execute.StepExecFrameworkInfo
taskexecutor.BaseStepExecutor
d *ddl
job *model.Job
indexes []*model.IndexInfo
@@ -110,8 +111,9 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
defer cancel()
r.curRowCount.Store(0)

concurrency := int(r.GetResource().CPU.Capacity())
if len(r.cloudStorageURI) > 0 {
pipe, err := r.buildExternalStorePipeline(opCtx, subtask.ID, sm, subtask.Concurrency)
pipe, err := r.buildExternalStorePipeline(opCtx, subtask.ID, sm, concurrency)
if err != nil {
return err
}
@@ -121,7 +123,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
return r.onFinished(ctx, subtask)
}

pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
pipe, err := r.buildLocalStorePipeline(opCtx, sm, concurrency)
if err != nil {
return err
}
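Step executors in this commit switch from embedding execute.StepExecFrameworkInfo (or taskexecutor.EmptyStepExecutor) to taskexecutor.BaseStepExecutor. A hedged sketch of what that looks like at run time, assuming BaseStepExecutor supplies the GetResource accessor used above; the type and method below are hypothetical:

// Hypothetical step executor; only the taskexecutor/proto identifiers are real.
type exampleStepExecutor struct {
	taskexecutor.BaseStepExecutor
}

func (e *exampleStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
	// Read the worker count from the currently granted CPU quota on every
	// subtask instead of caching subtask.Concurrency.
	workers := int(e.GetResource().CPU.Capacity())
	_ = workers // size pipelines or worker pools with this value
	return nil
}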
4 changes: 2 additions & 2 deletions pkg/ddl/ddl.go
@@ -711,8 +711,8 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
}

taskexecutor.RegisterTaskType(proto.Backfill,
func(ctx context.Context, id string, task *proto.Task, taskTable taskexecutor.TaskTable) taskexecutor.TaskExecutor {
return newBackfillDistExecutor(ctx, id, task, taskTable, d)
func(ctx context.Context, task *proto.Task, param taskexecutor.Param) taskexecutor.TaskExecutor {
return newBackfillDistExecutor(ctx, task, param, d)
},
)

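Task-type registration follows the same new factory signature. A hedged sketch for the framework's example task type, reusing the hypothetical constructor sketched earlier:

// Sketch: the factory now receives taskexecutor.Param instead of separate
// id and TaskTable arguments.
taskexecutor.RegisterTaskType(proto.TaskTypeExample,
	func(ctx context.Context, task *proto.Task, param taskexecutor.Param) taskexecutor.TaskExecutor {
		return newExampleDistExecutor(ctx, task, param)
	},
)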
179 changes: 134 additions & 45 deletions pkg/disttask/framework/integrationtests/modify_test.go
@@ -21,40 +21,80 @@ import (
"time"

"github.com/pingcap/tidb/pkg/disttask/framework/handle"
mockexecute "github.com/pingcap/tidb/pkg/disttask/framework/mock/execute"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

type collectedRuntimeInfo struct {
// not used right now; modifying task-specific params will be supported in a later PR.
currentTask *proto.Task
subtaskInfos []subtaskRuntimeInfo
}

type subtaskRuntimeInfo struct {
Step proto.Step
Concurrency int
}

func TestModifyTaskConcurrency(t *testing.T) {
c := testutil.NewTestDXFContext(t, 1, 16, true)
stepInfos := []testutil.StepInfo{
{Step: proto.StepOne, SubtaskCnt: 2},
{Step: proto.StepTwo, SubtaskCnt: 3},
}
schedulerExt := testutil.GetMockSchedulerExt(c.MockCtrl, testutil.SchedulerInfo{
AllErrorRetryable: true,
StepInfos: []testutil.StepInfo{
{Step: proto.StepOne, SubtaskCnt: 1},
{Step: proto.StepTwo, SubtaskCnt: 1},
},
StepInfos: stepInfos,
})
subtaskCh := make(chan struct{})
registerExampleTask(t, c.MockCtrl, schedulerExt, c.TestContext,
func(ctx context.Context, subtask *proto.Subtask) error {
select {
case <-subtaskCh:
case <-ctx.Done():
return ctx.Err()
}
runtimeInfo := collectedRuntimeInfo{}
runSubtaskFn := func(ctx context.Context, subtask *proto.Subtask) error {
select {
case <-subtaskCh:
runtimeInfo.subtaskInfos = append(runtimeInfo.subtaskInfos, subtaskRuntimeInfo{
Step: subtask.Step,
Concurrency: subtask.Concurrency,
})
case <-ctx.Done():
return ctx.Err()
}
return nil
}

executorExt := testutil.GetCommonTaskExecutorExt(c.MockCtrl, func(task *proto.Task) (execute.StepExecutor, error) {
runtimeInfo.currentTask = task
executor := mockexecute.NewMockStepExecutor(c.MockCtrl)
executor.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes()
executor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(runSubtaskFn).AnyTimes()
executor.EXPECT().GetStep().Return(task.Step).AnyTimes()
executor.EXPECT().SetResource(gomock.Any()).AnyTimes()
executor.EXPECT().Cleanup(gomock.Any()).Return(nil).AnyTimes()
executor.EXPECT().RealtimeSummary().Return(nil).AnyTimes()
executor.EXPECT().TaskMetaModified(gomock.Any()).DoAndReturn(func(newTask *proto.Task) error {
runtimeInfo.currentTask = newTask
return nil
},
)
}).AnyTimes()
return executor, nil
})
testutil.RegisterExampleTask(t, schedulerExt, executorExt, testutil.GetCommonCleanUpRoutine(c.MockCtrl))

resetRuntimeInfoFn := func() {
runtimeInfo = collectedRuntimeInfo{}
}

t.Run("modify pending task concurrency", func(t *testing.T) {
defer resetRuntimeInfoFn()
var once sync.Once
modifySyncCh := make(chan struct{})
var theTask *proto.Task
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeGetSchedulableTasks", func() {
once.Do(func() {
task, err := handle.SubmitTask(c.Ctx, "k1", proto.TaskTypeExample, 3, "", nil)
task, err := handle.SubmitTask(c.Ctx, "k1", proto.TaskTypeExample, 3, "", []byte("init"))
require.NoError(t, err)
require.Equal(t, 3, task.Concurrency)
require.NoError(t, c.TaskMgr.ModifyTaskByID(c.Ctx, task.ID, &proto.ModifyParam{
@@ -73,17 +113,22 @@ func TestModifyTaskConcurrency(t *testing.T) {
})
modifySyncCh <- struct{}{}
// finish subtasks
subtaskCh <- struct{}{}
subtaskCh <- struct{}{}
for i := 0; i < 5; i++ {
subtaskCh <- struct{}{}
}
task2Base := testutil.WaitTaskDone(c.Ctx, t, theTask.Key)
require.Equal(t, proto.TaskStateSucceed, task2Base.State)
checkSubtaskConcurrency(t, c, theTask.ID, map[proto.Step]int{
proto.StepOne: 7,
proto.StepTwo: 7,
})
require.EqualValues(t, []subtaskRuntimeInfo{
{Step: proto.StepOne, Concurrency: 7},
{Step: proto.StepOne, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
}, runtimeInfo.subtaskInfos)
})

t.Run("modify running task concurrency at step two", func(t *testing.T) {
defer resetRuntimeInfoFn()
var once sync.Once
modifySyncCh := make(chan struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask", func(task *proto.Task) {
@@ -105,6 +150,7 @@ func TestModifyTaskConcurrency(t *testing.T) {
require.Equal(t, 3, task.Concurrency)
// finish StepOne
subtaskCh <- struct{}{}
subtaskCh <- struct{}{}
// wait task move to 'modifying' state
modifySyncCh <- struct{}{}
// wait task move back to 'running' state
@@ -115,15 +161,58 @@ func TestModifyTaskConcurrency(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond)
// finish StepTwo
subtaskCh <- struct{}{}
subtaskCh <- struct{}{}
subtaskCh <- struct{}{}
task2Base := testutil.WaitTaskDone(c.Ctx, t, task.Key)
require.Equal(t, proto.TaskStateSucceed, task2Base.State)
checkSubtaskConcurrency(t, c, task.ID, map[proto.Step]int{
proto.StepOne: 3,
proto.StepTwo: 7,
})
require.EqualValues(t, []subtaskRuntimeInfo{
{Step: proto.StepOne, Concurrency: 3},
{Step: proto.StepOne, Concurrency: 3},
{Step: proto.StepTwo, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
}, runtimeInfo.subtaskInfos)
})

t.Run("modify running task concurrency at second subtask of step two", func(t *testing.T) {
defer resetRuntimeInfoFn()
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/beforeGetTaskByIDInRun",
func(taskID int64) {
if len(runtimeInfo.subtaskInfos) == 3 {
require.NoError(t, c.TaskMgr.ModifyTaskByID(c.Ctx, taskID, &proto.ModifyParam{
PrevState: proto.TaskStateRunning,
Modifications: []proto.Modification{
{Type: proto.ModifyConcurrency, To: 7},
},
}))
// wait task move back to 'running' state
require.Eventually(t, func() bool {
gotTask, err2 := c.TaskMgr.GetTaskByID(c.Ctx, taskID)
require.NoError(t, err2)
return gotTask.State == proto.TaskStateRunning
}, 10*time.Second, 100*time.Millisecond)
}
},
)
task, err := handle.SubmitTask(c.Ctx, "k2-2", proto.TaskTypeExample, 3, "", nil)
require.NoError(t, err)
require.Equal(t, 3, task.Concurrency)
for i := 0; i < 5; i++ {
subtaskCh <- struct{}{}
}
task2Base := testutil.WaitTaskDone(c.Ctx, t, task.Key)
require.Equal(t, proto.TaskStateSucceed, task2Base.State)
require.EqualValues(t, []subtaskRuntimeInfo{
{Step: proto.StepOne, Concurrency: 3},
{Step: proto.StepOne, Concurrency: 3},
{Step: proto.StepTwo, Concurrency: 3},
{Step: proto.StepTwo, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
}, runtimeInfo.subtaskInfos)
})

t.Run("modify paused task concurrency", func(t *testing.T) {
defer resetRuntimeInfoFn()
var once sync.Once
syncCh := make(chan struct{})
var theTask *proto.Task
@@ -154,17 +243,22 @@ func TestModifyTaskConcurrency(t *testing.T) {
require.NoError(t, err)
require.True(t, found)
// finish subtasks
subtaskCh <- struct{}{}
subtaskCh <- struct{}{}
for i := 0; i < 5; i++ {
subtaskCh <- struct{}{}
}
task2Base := testutil.WaitTaskDone(c.Ctx, t, theTask.Key)
require.Equal(t, proto.TaskStateSucceed, task2Base.State)
checkSubtaskConcurrency(t, c, theTask.ID, map[proto.Step]int{
proto.StepOne: 7,
proto.StepTwo: 7,
})
require.EqualValues(t, []subtaskRuntimeInfo{
{Step: proto.StepOne, Concurrency: 7},
{Step: proto.StepOne, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
}, runtimeInfo.subtaskInfos)
})

t.Run("modify pending task concurrency, but other owner already done it", func(t *testing.T) {
defer resetRuntimeInfoFn()
var once sync.Once
modifySyncCh := make(chan struct{})
var theTask *proto.Task
@@ -203,22 +297,17 @@ func TestModifyTaskConcurrency(t *testing.T) {
)
modifySyncCh <- struct{}{}
// finish subtasks
subtaskCh <- struct{}{}
subtaskCh <- struct{}{}
for i := 0; i < 5; i++ {
subtaskCh <- struct{}{}
}
task2Base := testutil.WaitTaskDone(c.Ctx, t, theTask.Key)
require.Equal(t, proto.TaskStateSucceed, task2Base.State)
checkSubtaskConcurrency(t, c, theTask.ID, map[proto.Step]int{
proto.StepOne: 7,
proto.StepTwo: 7,
})
require.EqualValues(t, []subtaskRuntimeInfo{
{Step: proto.StepOne, Concurrency: 7},
{Step: proto.StepOne, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
{Step: proto.StepTwo, Concurrency: 7},
}, runtimeInfo.subtaskInfos)
})
}

func checkSubtaskConcurrency(t *testing.T, c *testutil.TestDXFContext, taskID int64, expectedStepCon map[proto.Step]int) {
for step, con := range expectedStepCon {
subtasks, err := c.TaskMgr.GetSubtasksWithHistory(c.Ctx, taskID, step)
require.NoError(t, err)
require.Len(t, subtasks, 1)
require.Equal(t, con, subtasks[0].Concurrency)
}
}
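The removed checkSubtaskConcurrency helper inspected stored subtask rows after the fact; the rewritten test instead records the concurrency each subtask actually ran with. The modification itself goes through the task manager, as the tests above do. A hedged sketch of that call; TaskManagerLike and modifyConcurrencyTo7 are hypothetical names introduced only to keep the fragment self-contained, mirroring how the tests call c.TaskMgr:

// TaskManagerLike is a hypothetical one-method view of the handle the tests
// call c.TaskMgr.
type TaskManagerLike interface {
	ModifyTaskByID(ctx context.Context, taskID int64, param *proto.ModifyParam) error
}

// Sketch: request that a running task's concurrency be changed to 7.
func modifyConcurrencyTo7(ctx context.Context, mgr TaskManagerLike, taskID int64) error {
	return mgr.ModifyTaskByID(ctx, taskID, &proto.ModifyParam{
		// PrevState acts as an optimistic guard: the modification is expected
		// to be rejected if the task has already left this state (assumption
		// based on the "other owner already done it" test above).
		PrevState: proto.TaskStateRunning,
		Modifications: []proto.Modification{
			{Type: proto.ModifyConcurrency, To: 7},
		},
	})
}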