
disttask: task executor slot manager #49136

Merged
merged 28 commits into from Dec 20, 2023

Changes from 20 commits
Commits
28 commits
6295f58
rename
okJiang Nov 30, 2023
eae2e5e
save work
okJiang Dec 1, 2023
45e4b72
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 1, 2023
26e6254
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 1, 2023
66c50ed
rename GetSubtasksByStepAndStates
okJiang Dec 1, 2023
bfc0084
implement priority queue for task and slotManager
okJiang Dec 4, 2023
6df88d1
add ut
okJiang Dec 4, 2023
908b65c
fix ut
okJiang Dec 4, 2023
59db269
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 4, 2023
ff93eab
fix bazel
okJiang Dec 4, 2023
b69f291
add comment, fix bazel
okJiang Dec 5, 2023
f54e5d7
fix comment
okJiang Dec 5, 2023
8809905
add ut
okJiang Dec 5, 2023
b57efc6
fix comment
okJiang Dec 5, 2023
133f34b
add comment
okJiang Dec 5, 2023
e6006d1
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 13, 2023
0a72a54
delete priority queue
okJiang Dec 13, 2023
4a0ef07
remove useless convert
okJiang Dec 13, 2023
3c1beb9
fix bazel
okJiang Dec 13, 2023
8cc2e33
fix ut
okJiang Dec 13, 2023
c640046
Refactor slot management in task executor
okJiang Dec 14, 2023
c442754
Update pkg/disttask/framework/taskexecutor/slot.go
okJiang Dec 18, 2023
3781105
fix comment
okJiang Dec 18, 2023
6392faf
fix comment
okJiang Dec 19, 2023
7d9f45e
Merge branch 'master' of https://github.com/pingcap/tidb into disttas…
okJiang Dec 19, 2023
2003d2a
fix comment: refactor ut
okJiang Dec 20, 2023
a34651f
fix comment
okJiang Dec 20, 2023
06422f4
fix comment
okJiang Dec 20, 2023
9 changes: 5 additions & 4 deletions Makefile
@@ -267,7 +267,8 @@ tools/bin/vfsgendev:
tools/bin/gotestsum:
GOBIN=$(shell pwd)/tools/bin $(GO) install gotest.tools/[email protected]

tools/bin/mockgen:
# [email protected] is incompatible with v0.3.0, so always install it.
mockgen:
GOBIN=$(shell pwd)/tools/bin $(GO) install go.uber.org/mock/[email protected]

# Usage:
@@ -375,17 +376,17 @@ br_compatibility_test_prepare:
br_compatibility_test:
@cd br && tests/run_compatible.sh run

mock_s3iface: tools/bin/mockgen
mock_s3iface: mockgen
tools/bin/mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

# mock interface for lightning and IMPORT INTO
mock_lightning: tools/bin/mockgen
mock_lightning: mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
tools/bin/mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go

gen_mock: tools/bin/mockgen
gen_mock: mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,Pool,TaskExecutor,Extension > pkg/disttask/framework/mock/task_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Scheduler,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension > pkg/disttask/framework/scheduler/mock/scheduler_mock.go
12 changes: 6 additions & 6 deletions pkg/disttask/framework/mock/task_executor_mock.go

Some generated files are not rendered by default.

11 changes: 7 additions & 4 deletions pkg/disttask/framework/storage/task_table.go
@@ -273,13 +273,16 @@ func (stm *TaskManager) GetTopUnfinishedTasks(ctx context.Context) (task []*prot
return task, nil
}

// GetTasksInStates gets the tasks in the states.
// GetTasksInStates gets the tasks in the states (order by priority asc, create_time asc, id asc).
func (stm *TaskManager) GetTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error) {
if len(states) == 0 {
return task, nil
}

rs, err := stm.executeSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where state in ("+strings.Repeat("%?,", len(states)-1)+"%?)", states...)
rs, err := stm.executeSQLWithNewSession(ctx,
"select "+taskColumns+" from mysql.tidb_global_task "+
"where state in ("+strings.Repeat("%?,", len(states)-1)+"%?)"+
" order by priority asc, create_time asc, id asc", states...)
if err != nil {
return task, err
}
@@ -438,8 +441,8 @@ func row2SubTask(r chunk.Row) *proto.Subtask {
return subtask
}

// GetSubtasksInStates gets all subtasks by given states.
func (stm *TaskManager) GetSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error) {
// GetSubtasksByStepAndStates gets all subtasks of the given step in the given states.
func (stm *TaskManager) GetSubtasksByStepAndStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error) {
args := []interface{}{tidbID, taskID, step}
args = append(args, states...)
rs, err := stm.executeSQLWithNewSession(ctx, `select `+subtaskColumns+` from mysql.tidb_background_subtask
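The new ORDER BY clause above gives GetTasksInStates a deterministic order, so the task executor picks up higher-priority tasks first. As a minimal sketch, the equivalent in-memory comparator would look like this (taskLess is a hypothetical helper, not part of the PR; it assumes proto.Task exposes integer Priority and ID fields plus a time.Time CreateTime, matching the columns in the query):

// taskLess mirrors "order by priority asc, create_time asc, id asc":
// a smaller priority value wins; ties fall back to creation time, then ID.
func taskLess(a, b *proto.Task) bool {
	if a.Priority != b.Priority {
		return a.Priority < b.Priority
	}
	if !a.CreateTime.Equal(b.CreateTime) {
		return a.CreateTime.Before(b.CreateTime)
	}
	return a.ID < b.ID
}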
4 changes: 3 additions & 1 deletion pkg/disttask/framework/taskexecutor/BUILD.bazel
@@ -6,6 +6,7 @@ go_library(
"interface.go",
"manager.go",
"register.go",
"slot_manager.go",
"task_executor.go",
],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor",
@@ -40,12 +41,13 @@ go_test(
srcs = [
"manager_test.go",
"register_test.go",
"slot_manager_test.go",
"task_executor_test.go",
"task_executor_testkit_test.go",
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 9,
shard_count = 11,
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/mock/execute",
2 changes: 1 addition & 1 deletion pkg/disttask/framework/taskexecutor/interface.go
@@ -26,7 +26,7 @@ type TaskTable interface {
GetTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)

GetSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error)
GetSubtasksByStepAndStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error)
GetFirstSubtaskInStates(ctx context.Context, instanceID string, taskID int64, step proto.Step, states ...interface{}) (*proto.Subtask, error)
StartManager(ctx context.Context, tidbID string, role string) error
StartSubtask(ctx context.Context, subtaskID int64) error
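For illustration, a hypothetical call site of the renamed method, matching the signature above (the instance ID and the chosen states are made-up values, not from this PR):

subtasks, err := taskTable.GetSubtasksByStepAndStates(ctx, "tidb-0:4000", task.ID, proto.StepOne,
	proto.TaskStatePending, proto.TaskStateRunning)
if err != nil {
	return err
}
// e.g. inspect len(subtasks) to decide whether this node still has work for the step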
43 changes: 37 additions & 6 deletions pkg/disttask/framework/taskexecutor/manager.go
@@ -16,6 +16,7 @@ package taskexecutor

import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -40,6 +41,10 @@
recoverMetaInterval = 90 * time.Second
retrySQLTimes = 30
retrySQLInterval = 500 * time.Millisecond

// for test
onRunnableTasksTick = make(chan struct{})
onRunnableTaskTick = make(chan struct{})
)

// ManagerBuilder is used to build a Manager.
@@ -72,12 +77,13 @@ type Manager struct {
handlingTasks map[int64]context.CancelCauseFunc
}
// id, it's the same as server id now, i.e. host:port.
id string
wg tidbutil.WaitGroupWrapper
ctx context.Context
cancel context.CancelFunc
logCtx context.Context
newPool func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error)
id string
wg tidbutil.WaitGroupWrapper
ctx context.Context
cancel context.CancelFunc
logCtx context.Context
newPool func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error)
slotManager *slotManager
}

// BuildManager builds a Manager.
@@ -87,6 +93,10 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
taskTable: taskTable,
logCtx: logutil.WithFields(context.Background()),
newPool: b.newPool,
slotManager: &slotManager{
executorSlotInfos: make(map[int64]*slotInfo, 0),
D3Hunter marked this conversation as resolved.
available: runtime.NumCPU(),
Contributor: cpu.GetCPUCount()
},
}
m.ctx, m.cancel = context.WithCancel(ctx)
m.mu.handlingTasks = make(map[int64]context.CancelCauseFunc)
@@ -152,6 +162,7 @@ func (m *Manager) fetchAndHandleRunnableTasksLoop() {
m.logErr(err)
continue
}
logutil.Logger(m.logCtx).Info("fetchAndHandleRunnableTasksLoop", zap.Any("tasks", tasks))
Member Author suggested change:
- logutil.Logger(m.logCtx).Info("fetchAndHandleRunnableTasksLoop", zap.Any("tasks", tasks))

m.onRunnableTasks(tasks)
}
}
@@ -196,7 +207,9 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
return
}
tasks = m.filterAlreadyHandlingTasks(tasks)

for _, task := range tasks {
logutil.Logger(m.logCtx).Info("get new subtask", zap.Int64("task-id", task.ID))
Member Author suggested change:
- logutil.Logger(m.logCtx).Info("get new subtask", zap.Int64("task-id", task.ID))
+ logutil.Logger(m.logCtx).Debug("get new subtask", zap.Int64("task-id", task.ID))

exist, err := m.taskTable.HasSubtasksInStates(m.ctx, m.id, task.ID, task.Step,
proto.TaskStatePending, proto.TaskStateRevertPending,
// for the case that the tidb is restarted when the subtask is running.
@@ -210,18 +223,32 @@
continue
}
logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))

if !m.slotManager.checkSlotAvailabilityForTask(task) {
Contributor suggested change:
- if !m.slotManager.checkSlotAvailabilityForTask(task) {
+ if !m.slotManager.canReserve(task) {

failpoint.Inject("taskTick", func() {
<-onRunnableTasksTick
})
continue
}
m.addHandlingTask(task.ID)
t := task
err = m.executorPool.Run(func() {
m.slotManager.addTask(t)
Contributor suggested change:
- m.slotManager.addTask(t)
+ m.slotManager.reserve(t)

defer m.slotManager.removeTask(t.ID)
Contributor suggested change:
- defer m.slotManager.removeTask(t.ID)
+ defer m.slotManager.unReserve(t.ID)

m.onRunnableTask(t)
m.removeHandlingTask(t.ID)
})
// pool closed.
if err != nil {
m.slotManager.removeTask(t.ID)
m.removeHandlingTask(task.ID)
m.logErr(err)
return
}

failpoint.Inject("taskTick", func() {
<-onRunnableTasksTick
})
}
}

@@ -398,6 +425,10 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
if err != nil {
logutil.Logger(m.logCtx).Error("failed to handle task", zap.Error(err))
}

failpoint.Inject("taskTick", func() {
<-onRunnableTaskTick
})
Contributor: not needed; just let the executor extension block on Run using the mock.

}
}

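slot_manager.go itself is not rendered in this view, so the following is a minimal sketch of what the call sites above and the tests below imply. Only the executorSlotInfos and available fields are visible in BuildManager; the slotInfo layout and the method bodies are assumptions, and int(task.Concurrency) hedges against the exact integer type of Concurrency (reviewers suggest renaming these methods to canReserve/reserve/unReserve):

// assumes: import "sync" and the framework's proto package
type slotManager struct {
	sync.RWMutex
	executorSlotInfos map[int64]*slotInfo // task ID -> slots held by that task
	available         int                 // free slots on this node, initialized to the CPU count
}

type slotInfo struct {
	taskID    int64
	slotCount int // the task's Concurrency
}

// checkSlotAvailabilityForTask reports whether the task's concurrency still fits.
func (sm *slotManager) checkSlotAvailabilityForTask(task *proto.Task) bool {
	sm.RLock()
	defer sm.RUnlock()
	return int(task.Concurrency) <= sm.available
}

// addTask reserves task.Concurrency slots before the task executor starts.
func (sm *slotManager) addTask(task *proto.Task) {
	sm.Lock()
	defer sm.Unlock()
	sm.executorSlotInfos[task.ID] = &slotInfo{taskID: task.ID, slotCount: int(task.Concurrency)}
	sm.available -= int(task.Concurrency)
}

// removeTask returns the slots when the task executor exits.
func (sm *slotManager) removeTask(taskID int64) {
	sm.Lock()
	defer sm.Unlock()
	if info, ok := sm.executorSlotInfos[taskID]; ok {
		delete(sm.executorSlotInfos, taskID)
		sm.available += info.slotCount
	}
}

This matches the arithmetic the test below asserts: with available = 10, reserving task1 (Concurrency 10) drives available to 0, releasing it restores 10, and task2 (Concurrency 1) leaves 9.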
134 changes: 133 additions & 1 deletion pkg/disttask/framework/taskexecutor/manager_test.go
@@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
@@ -47,9 +48,10 @@
}

func TestManageTask(t *testing.T) {
b := NewManagerBuilder()
ctrl := gomock.NewController(t)
defer ctrl.Finish()

b := NewManagerBuilder()
mockTaskTable := mock.NewMockTaskTable(ctrl)
m, err := b.BuildManager(context.Background(), "test", mockTaskTable)
require.NoError(t, err)
@@ -250,3 +252,133 @@ func TestManager(t *testing.T) {
time.Sleep(5 * time.Second)
m.Stop()
}

func TestSlotManagerInManager(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockTaskTable := mock.NewMockTaskTable(ctrl)
mockInternalExecutor := mock.NewMockTaskExecutor(ctrl)
mockPool := mock.NewMockPool(ctrl)
b := NewManagerBuilder()
b.setPoolFactory(func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error) {
return mockPool, nil
})
RegisterTaskType("type",
func(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) TaskExecutor {
return mockInternalExecutor
})
id := "test"

m, err := b.BuildManager(context.Background(), id, mockTaskTable)
require.NoError(t, err)
m.slotManager.available = 10

taskID1 := int64(1)
taskID2 := int64(2)

now := time.Now()

task1 := &proto.Task{
ID: taskID1,
State: proto.TaskStateRunning,
CreateTime: now,
Concurrency: 10,
Step: proto.StepOne,
Type: "type",
}
task2 := &proto.Task{
ID: taskID2,
State: proto.TaskStateRunning,
CreateTime: now,
Concurrency: 1,
Step: proto.StepOne,
Type: "type",
}

wg, runFn := getPoolRunFn()

// task1 is scheduled ahead of task2
{
// mock in manager
mockTaskTable.EXPECT().GetTasksInStates(m.ctx, proto.TaskStateRunning, proto.TaskStateReverting).
Return([]*proto.Task{task1, task2}, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
}
{
// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
// task1 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).Return(nil)
}

failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/taskTick", "return()")
defer func() {
failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/taskTick")
}()
go m.fetchAndHandleRunnableTasksLoop()
Contributor: can call onRunnableTasks directly to simplify the mock, and we will not need the failpoint anymore.

Member Author: if we don't use the failpoint, how can we check the status of slotManager? 🤔

Contributor: you can call onRunnableTasks with one task and check it in this case.

{
// mock inside onRunnableTask
time.Sleep(2 * time.Second)
// task2 has been blocked by slot manager
require.Equal(t, 0, m.slotManager.available)

{
// mock in manager, task2 has been rejected by slot manager
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
onRunnableTasksTick <- struct{}{}
time.Sleep(time.Second)
}

// task1 succeed
task1.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockInternalExecutor.EXPECT().Close()
onRunnableTaskTick <- struct{}{}
time.Sleep(time.Second)
require.Equal(t, 10, m.slotManager.available)
}

// task2 is after task1
{
// mock in manager
mockTaskTable.EXPECT().GetTasksInStates(m.ctx, proto.TaskStateRunning, proto.TaskStateReverting).
Return([]*proto.Task{task2}, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
}
{
// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
// task2 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task2).Return(nil)
}
onRunnableTasksTick <- struct{}{}
time.Sleep(2 * time.Second)

require.Equal(t, 9, m.slotManager.available)
// task2 succeed
task2.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
mockInternalExecutor.EXPECT().Close()
onRunnableTaskTick <- struct{}{}
time.Sleep(time.Second)

require.Equal(t, 10, m.slotManager.available)
wg.Wait()
}
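The test drives the manager loops step by step through the taskTick failpoint: each send on onRunnableTasksTick or onRunnableTaskTick releases exactly one blocked iteration, so slotManager.available can be asserted at a known point (in TiDB the failpoint markers must be activated, e.g. via make failpoint-enable, before such a test runs). A stripped-down, runnable sketch of the same synchronization pattern, with hypothetical names and no failpoint library:

package main

import "fmt"

// tick gates each loop iteration; the test side releases one iteration per send.
var tick = make(chan struct{})

func worker(iterations int) {
	for i := 0; i < iterations; i++ {
		fmt.Println("state can be asserted now, iteration", i)
		<-tick // block until the test has finished its assertions
	}
}

func main() {
	go worker(2)
	tick <- struct{}{} // release iteration 0
	tick <- struct{}{} // release iteration 1
}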