Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#49877
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
ywqzzy authored and ti-chi-bot committed Jan 2, 2024
1 parent bf303b5 commit 5bb4c5a
Show file tree
Hide file tree
Showing 13 changed files with 1,632 additions and 9 deletions.
11 changes: 11 additions & 0 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2074,8 +2074,13 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
if err != nil {
return err
}
<<<<<<< HEAD
task, err := taskManager.GetGlobalTaskByKeyWithHistory(w.ctx, taskKey)
if err != nil {
=======
task, err := taskManager.GetTaskByKeyWithHistory(w.ctx, taskKey)
if err != nil && err != storage.ErrTaskNotFound {
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))
return err
}
if task != nil {
Expand Down Expand Up @@ -2189,9 +2194,15 @@ func (w *worker) updateJobRowCount(taskKey string, jobID int64) {
logutil.BgLogger().Warn("cannot get task manager", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
return
}
<<<<<<< HEAD
gTask, err := taskMgr.GetGlobalTaskByKey(w.ctx, taskKey)
if err != nil || gTask == nil {
logutil.BgLogger().Warn("cannot get global task", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
=======
task, err := taskMgr.GetTaskByKey(w.ctx, taskKey)
if err != nil {
logutil.BgLogger().Warn("cannot get task", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))
return
}
rowCount, err := taskMgr.GetSubtaskRowCount(w.ctx, gTask.ID, proto.StepOne)
Expand Down
46 changes: 44 additions & 2 deletions pkg/disttask/framework/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,24 @@ func SubmitGlobalTask(ctx context.Context, taskKey string, taskType proto.TaskTy
if err != nil {
return nil, err
}
<<<<<<< HEAD
globalTask, err := globalTaskManager.GetGlobalTaskByKey(ctx, taskKey)
=======
task, err := taskManager.GetTaskByKey(ctx, taskKey)
if err != nil && err != storage.ErrTaskNotFound {
return nil, err
}
if task != nil {
return nil, storage.ErrTaskAlreadyExists
}

taskID, err := taskManager.CreateTask(ctx, taskKey, taskType, concurrency, taskMeta)
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))
if err != nil {
return nil, err
}

<<<<<<< HEAD
if globalTask == nil {
taskID, err := globalTaskManager.AddNewGlobalTask(ctx, taskKey, taskType, concurrency, taskMeta)
if err != nil {
Expand All @@ -59,6 +72,16 @@ func SubmitGlobalTask(ctx context.Context, taskKey string, taskType proto.TaskTy
metrics.UpdateMetricsForAddTask(globalTask)
}
return globalTask, nil
=======
task, err = taskManager.GetTaskByID(ctx, taskID)
if err != nil {
return nil, err
}
metrics.UpdateMetricsForAddTask(task)

NotifyTaskChange()
return task, nil
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))
}

// WaitGlobalTask waits for a global task done or paused.
Expand Down Expand Up @@ -97,10 +120,14 @@ func WaitTaskDoneByKey(ctx context.Context, taskKey string) error {
if err != nil {
return err
}
<<<<<<< HEAD
if task == nil {
return errors.Errorf("cannot find global task with key %s", taskKey)
}
_, err = waitTask(ctx, task.ID, func(t *proto.Task) bool {
=======
_, err = WaitTask(ctx, task.ID, func(t *proto.Task) bool {
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))
return t.IsDone()
})
return err
Expand All @@ -120,17 +147,24 @@ func waitTask(ctx context.Context, id int64, matchFn func(*proto.Task) bool) (*p
case <-ctx.Done():
return nil, ctx.Err()
case <-ticker.C:
<<<<<<< HEAD
found, err := globalTaskManager.GetTaskByIDWithHistory(ctx, id)
=======
task, err := taskManager.GetTaskByIDWithHistory(ctx, id)
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))
if err != nil {
logger.Error("cannot get global task during waiting", zap.Error(err))
continue
}
<<<<<<< HEAD
if found == nil {
return nil, errors.Errorf("cannot find global task with ID %d", id)
}
=======
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))

if matchFn(found) {
return found, nil
if matchFn(task) {
return task, nil
}
}
}
Expand All @@ -153,14 +187,22 @@ func CancelGlobalTask(ctx context.Context, taskKey string) error {
}
task, err := taskManager.GetGlobalTaskByKey(ctx, taskKey)
if err != nil {
if err == storage.ErrTaskNotFound {
logutil.BgLogger().Info("task not exist", zap.String("taskKey", taskKey))
return nil
}
return err
}
<<<<<<< HEAD
if task == nil {
logutil.BgLogger().Info("task not exist", zap.String("taskKey", taskKey))

return nil
}
return taskManager.CancelGlobalTask(ctx, task.ID)
=======
return taskManager.CancelTask(ctx, task.ID)
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))
}

// PauseTask pauses a task.
Expand Down
35 changes: 35 additions & 0 deletions pkg/disttask/framework/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,56 @@ func TestHandle(t *testing.T) {
err := handle.SubmitAndRunGlobalTask(ctx, "1", proto.TaskTypeExample, 2, []byte("byte"))
require.Error(t, err)

<<<<<<< HEAD
task, err := mgr.GetGlobalTaskByID(ctx, 1)
=======
// no scheduler registered
task, err := handle.SubmitTask(ctx, "1", proto.TaskTypeExample, 2, proto.EmptyMeta)
require.NoError(t, err)
waitedTask, err := handle.WaitTask(ctx, task.ID, func(task *proto.Task) bool {
return task.IsDone()
})
require.NoError(t, err)
require.Equal(t, proto.TaskStateFailed, waitedTask.State)
require.ErrorContains(t, waitedTask.Error, "unknown task type")

task, err = mgr.GetTaskByID(ctx, 1)
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))
require.NoError(t, err)
require.Equal(t, int64(1), task.ID)
require.Equal(t, "1", task.Key)
require.Equal(t, proto.TaskTypeExample, task.Type)
<<<<<<< HEAD
// no dispatcher registered
require.Equal(t, proto.TaskStateFailed, task.State)
require.Equal(t, proto.StepInit, task.Step)
require.Equal(t, uint64(2), task.Concurrency)
require.Equal(t, []byte("byte"), task.Meta)
=======
// no scheduler registered.
require.Equal(t, proto.TaskStateFailed, task.State)
require.Equal(t, proto.StepInit, task.Step)
require.Equal(t, 2, task.Concurrency)
require.Equal(t, proto.EmptyMeta, task.Meta)
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))

require.NoError(t, handle.CancelGlobalTask(ctx, "1"))

<<<<<<< HEAD
task, err = handle.SubmitGlobalTask(ctx, "2", proto.TaskTypeExample, 2, []byte("byte"))
require.NoError(t, err)
=======
task, err = handle.SubmitTask(ctx, "2", proto.TaskTypeExample, 2, proto.EmptyMeta)
require.NoError(t, err)
require.Equal(t, int64(2), task.ID)
require.Equal(t, "2", task.Key)

// submit same task.
task, err = handle.SubmitTask(ctx, "2", proto.TaskTypeExample, 2, proto.EmptyMeta)
require.Nil(t, task)
require.Error(t, storage.ErrTaskAlreadyExists, err)
// pause and resume task.
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))
require.NoError(t, handle.PauseTask(ctx, "2"))
require.NoError(t, handle.ResumeTask(ctx, "2"))
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (t *Task) IsDone() bool {
t.State == TaskStateFailed
}

<<<<<<< HEAD
// Subtask represents the subtask of distribute framework.
// Each task is divided into multiple subtasks by dispatcher.
type Subtask struct {
Expand All @@ -165,6 +166,12 @@ type Subtask struct {
Meta []byte
Summary string
}
=======
var (
// EmptyMeta is the empty meta of task/subtask.
EmptyMeta = []byte("{}")
)
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))

// IsFinished checks if the subtask is finished.
func (t *Subtask) IsFinished() bool {
Expand Down
3 changes: 0 additions & 3 deletions pkg/disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,6 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
m.logErr(err)
return
}
if task == nil {
return
}
if task.State != proto.TaskStateRunning && task.State != proto.TaskStateReverting {
logutil.Logger(m.logCtx).Info("onRunnableTask exit",
zap.Int64("task-id", task.ID), zap.Int64("step", int64(task.Step)), zap.Stringer("state", task.State))
Expand Down
5 changes: 5 additions & 0 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ func runSummaryCollectLoop(
if !ok {
return nil, func() {}, nil
}
<<<<<<< HEAD
opt, ok := taskTypes[task.Type]
if !ok {
return nil, func() {}, errors.Errorf("scheduler option for type %s not found", task.Type)
Expand All @@ -484,6 +485,10 @@ func runSummaryCollectLoop(
}, nil
}
return nil, func() {}, nil
=======
s.Task = newTask
return nil
>>>>>>> 237b2c7d507 (disttask: fix panic in task executor/scheduler (#49877))
}

func (s *BaseScheduler) registerCancelFunc(cancel context.CancelCauseFunc) {
Expand Down
Loading

0 comments on commit 5bb4c5a

Please sign in to comment.