Skip to content

Commit

Permalink
disttask: merge handle task loop (#50224)
Browse files Browse the repository at this point in the history
ref #49008
  • Loading branch information
D3Hunter authored Jan 10, 2024
1 parent 9e8089b commit 91661e1
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 202 deletions.
14 changes: 0 additions & 14 deletions pkg/disttask/framework/mock/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkg/disttask/framework/taskexecutor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ type TaskExecutor interface {
Init(context.Context) error
Run(context.Context, *proto.Task) error
Rollback(context.Context, *proto.Task) error
Pause(context.Context, *proto.Task) error
Close()
IsRetryableError(err error) bool
}
Expand Down
176 changes: 68 additions & 108 deletions pkg/disttask/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ func (m *Manager) recoverMeta() (err error) {
// Start starts the Manager.
func (m *Manager) Start() error {
logutil.Logger(m.logCtx).Debug("manager start")
m.wg.Run(m.fetchAndHandleRunnableTasksLoop)
m.wg.Run(m.fetchAndFastCancelTasksLoop)
m.wg.Run(m.handleTasksLoop)
m.wg.Run(m.recoverMetaLoop)
return nil
}
Expand All @@ -171,66 +170,62 @@ func (m *Manager) Stop() {
m.wg.Wait()
}

// fetchAndHandleRunnableTasks fetches the runnable tasks from the task table and handles them.
func (m *Manager) fetchAndHandleRunnableTasksLoop() {
defer tidbutil.Recover(metrics.LabelDomain, "fetchAndHandleRunnableTasksLoop", m.fetchAndHandleRunnableTasksLoop, false)
// handleTasksLoop handles tasks in the states we are interested in, including:
// - pending/running: start the task executor.
// - reverting: cancel the task executor, and mark running subtasks as Canceled.
// - pausing: cancel the task executor, and mark all pending/running subtasks of
// the current node as paused.
//
// Pausing is handled on every executor to make sure that no subtask is still
// being run by an executor before the task is marked as paused.
func (m *Manager) handleTasksLoop() {
defer tidbutil.Recover(metrics.LabelDomain, "handleTasksLoop", m.handleTasksLoop, false)
ticker := time.NewTicker(checkTime)
for {
select {
case <-m.ctx.Done():
logutil.Logger(m.logCtx).Info("fetchAndHandleRunnableTasksLoop done")
logutil.Logger(m.logCtx).Info("handle tasks loop done")
return
case <-ticker.C:
tasks, err := m.taskTable.GetTasksInStates(m.ctx, proto.TaskStateRunning, proto.TaskStateReverting)
if err != nil {
m.logErr(err)
continue
}
m.onRunnableTasks(tasks)
}

m.handleTasks()
}
}

// fetchAndFastCancelTasks fetches the reverting/pausing tasks from the task table and fast cancels them.
func (m *Manager) fetchAndFastCancelTasksLoop() {
defer tidbutil.Recover(metrics.LabelDomain, "fetchAndFastCancelTasksLoop", m.fetchAndFastCancelTasksLoop, false)
func (m *Manager) handleTasks() {
tasks, err := m.taskTable.GetTasksInStates(m.ctx, proto.TaskStateRunning,
proto.TaskStateReverting, proto.TaskStatePausing)
if err != nil {
m.logErr(err)
return
}

ticker := time.NewTicker(checkTime)
for {
select {
case <-m.ctx.Done():
m.cancelAllRunningTasks()
logutil.Logger(m.logCtx).Info("fetchAndFastCancelTasksLoop done")
return
case <-ticker.C:
tasks, err := m.taskTable.GetTasksInStates(m.ctx, proto.TaskStateReverting)
if err != nil {
m.logErr(err)
continue
executableTasks := make([]*proto.Task, 0, len(tasks))
for _, task := range tasks {
switch task.State {
case proto.TaskStateRunning, proto.TaskStateReverting:
if task.State == proto.TaskStateReverting {
m.cancelRunningSubtaskOf(task)
}
m.onCanceledTasks(m.ctx, tasks)

// cancel pending/running subtasks, and mark them as paused.
pausingTasks, err := m.taskTable.GetTasksInStates(m.ctx, proto.TaskStatePausing)
if err != nil {
m.logErr(err)
continue
// TaskStateReverting require executor to run rollback logic.
if !m.isExecutorStarted(task.ID) {
executableTasks = append(executableTasks, task)
}
if err := m.onPausingTasks(pausingTasks); err != nil {
case proto.TaskStatePausing:
if err := m.handlePausingTask(task); err != nil {
m.logErr(err)
continue
}
}
}
}

// onRunnableTasks handles runnable tasks.
func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
if len(tasks) == 0 {
return
if len(executableTasks) > 0 {
m.handleExecutableTasks(executableTasks)
}
tasks = m.filterAlreadyHandlingTasks(tasks)
}

// handleExecutableTasks handles executable tasks.
func (m *Manager) handleExecutableTasks(tasks []*proto.Task) {
for _, task := range tasks {
exist, err := m.taskTable.HasSubtasksInStates(m.ctx, m.id, task.ID, task.Step, unfinishedSubtaskStates...)
if err != nil {
Expand Down Expand Up @@ -259,7 +254,7 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
t := task
err = m.executorPool.Run(func() {
defer m.slotManager.free(t.ID)
m.onRunnableTask(t)
m.handleExecutableTask(t)
m.removeHandlingTask(t.ID)
})
// pool closed.
Expand All @@ -272,41 +267,30 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
}
}

// onCanceledTasks cancels the running subtasks.
func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {
if len(tasks) == 0 {
return
}
// cancelRunningSubtaskOf cancels the running subtask of the task, the subtask
// will switch to `canceled` state.
func (m *Manager) cancelRunningSubtaskOf(task *proto.Task) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, task := range tasks {
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
logutil.Logger(m.logCtx).Info("onCanceledTasks", zap.Int64("task-id", task.ID))
// subtask needs to change its state to canceled.
cancel(ErrCancelSubtask)
}
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
logutil.Logger(m.logCtx).Info("onCanceledTasks", zap.Int64("task-id", task.ID))
// subtask needs to change its state to `canceled`.
cancel(ErrCancelSubtask)
}
}

// onPausingTasks pauses/cancels the pending/running subtasks.
func (m *Manager) onPausingTasks(tasks []*proto.Task) error {
if len(tasks) == 0 {
return nil
}
func (m *Manager) handlePausingTask(task *proto.Task) error {
m.mu.RLock()
defer m.mu.RUnlock()
for _, task := range tasks {
logutil.Logger(m.logCtx).Info("onPausingTasks", zap.Any("task_id", task.ID))
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
// Pause all running subtasks, don't mark subtasks as canceled.
// Should not change the subtask's state.
cancel(nil)
}
if err := m.taskTable.PauseSubtasks(m.ctx, m.id, task.ID); err != nil {
return err
}
logutil.Logger(m.logCtx).Info("handle pausing task", zap.Int64("task-id", task.ID))
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
// cancel the task executor
cancel(nil)
}
return nil
// We pause the subtasks belonging to this exec node even when there is no executor
// running, as the balancer might move subtasks to this node before the executor has started.
return m.taskTable.PauseSubtasks(m.ctx, m.id, task.ID)
}

// recoverMetaLoop recovers dist_framework_meta for the tidb node running the taskExecutor manager.
Expand All @@ -331,48 +315,20 @@ func (m *Manager) recoverMetaLoop() {
}
}

// cancelAllRunningTasks walks every entry of m.mu.handlingTasks under the read
// lock, logs its task ID, and calls its cancel function (when non-nil) with nil.
func (m *Manager) cancelAllRunningTasks() {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for taskID, cancelFn := range m.mu.handlingTasks {
		logutil.Logger(m.logCtx).Info("cancelAllRunningTasks", zap.Int64("task-id", taskID))
		if cancelFn == nil {
			continue
		}
		// TiDB is shutting down: cancel with nil rather than ErrCancelSubtask so
		// that the subtask is not marked as canceled and its state stays unchanged.
		cancelFn(nil)
	}
}

// cancelTaskExecutors cancels the executors of the given tasks.
// Unlike cancelRunningSubtaskOf, this function doesn't change subtask state.
func (m *Manager) cancelTaskExecutors(tasks []*proto.Task) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, t := range tasks {
		logutil.Logger(m.logCtx).Info("cancelTasks", zap.Any("task_id", t.ID))
		cancelFn, found := m.mu.handlingTasks[t.ID]
		if !found || cancelFn == nil {
			continue
		}
		// Only the executor is canceled (nil is passed instead of
		// ErrCancelSubtask), so the subtask is not marked as canceled.
		cancelFn(nil)
	}
}

// filterAlreadyHandlingTasks returns the tasks whose ID has no entry in
// m.mu.handlingTasks. Filtering happens in place: the returned slice shares
// the backing array of the tasks argument, whose leading elements are overwritten.
func (m *Manager) filterAlreadyHandlingTasks(tasks []*proto.Task) []*proto.Task {
	m.mu.RLock()
	defer m.mu.RUnlock()

	remaining := tasks[:0]
	for _, t := range tasks {
		if _, handling := m.mu.handlingTasks[t.ID]; handling {
			continue
		}
		remaining = append(remaining, t)
	}
	return remaining
}

// TestContext only used in tests.
type TestContext struct {
TestSyncSubtaskRun chan struct{}
Expand All @@ -381,9 +337,9 @@ type TestContext struct {

var testContexts sync.Map

// onRunnableTask handles a runnable task.
func (m *Manager) onRunnableTask(task *proto.Task) {
logutil.Logger(m.logCtx).Info("onRunnableTask", zap.Int64("task-id", task.ID), zap.Stringer("type", task.Type))
// handleExecutableTask handles a runnable task.
func (m *Manager) handleExecutableTask(task *proto.Task) {
logutil.Logger(m.logCtx).Info("handleExecutableTask", zap.Int64("task-id", task.ID), zap.Stringer("type", task.Type))
// runCtx is only used in executor.Run; it is canceled from m.handleTasks.
factory := GetTaskExecutorFactory(task.Type)
if factory == nil {
Expand All @@ -405,7 +361,7 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
for {
select {
case <-m.ctx.Done():
logutil.Logger(m.logCtx).Info("onRunnableTask exit for cancel", zap.Int64("task-id", task.ID), zap.Stringer("type", task.Type))
logutil.Logger(m.logCtx).Info("handleExecutableTask exit for cancel", zap.Int64("task-id", task.ID), zap.Stringer("type", task.Type))
return
case <-time.After(checkTime):
}
Expand All @@ -426,7 +382,7 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
return
}
if task.State != proto.TaskStateRunning && task.State != proto.TaskStateReverting {
logutil.Logger(m.logCtx).Info("onRunnableTask exit",
logutil.Logger(m.logCtx).Info("handleExecutableTask exit",
zap.Int64("task-id", task.ID), zap.Int64("step", int64(task.Step)), zap.Stringer("state", task.State))
return
}
Expand All @@ -444,9 +400,6 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
}
// use taskCtx for canceling.
err = executor.Run(taskCtx, task)
case proto.TaskStatePausing:
// use m.ctx since this process should not be canceled.
err = executor.Pause(m.ctx, task)
case proto.TaskStateReverting:
// use m.ctx since this process should not be canceled.
err = executor.Rollback(m.ctx, task)
Expand Down Expand Up @@ -478,6 +431,13 @@ func (m *Manager) removeHandlingTask(id int64) {
delete(m.mu.handlingTasks, id)
}

// isExecutorStarted reports whether taskID has an entry in m.mu.handlingTasks.
// The lookup is done while holding the read lock.
func (m *Manager) isExecutorStarted(taskID int64) bool {
	m.mu.RLock()
	_, started := m.mu.handlingTasks[taskID]
	m.mu.RUnlock()
	return started
}

// logErr logs err at error level through the logger derived from m.logCtx,
// attaching the error and a stack trace as fields.
func (m *Manager) logErr(err error) {
	logger := logutil.Logger(m.logCtx)
	logger.Error(
		"task manager met error",
		zap.Error(err),
		zap.Stack("stack"),
	)
}
Expand Down
Loading

0 comments on commit 91661e1

Please sign in to comment.