Skip to content

Commit

Permalink
disttask: persist subtask error properly (#46674)
Browse files Browse the repository at this point in the history
ref #46258
  • Loading branch information
tangenta authored Sep 5, 2023
1 parent d33cde7 commit abec0c6
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 20 deletions.
8 changes: 4 additions & 4 deletions disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ type TaskTable interface {
GetGlobalTaskByID(taskID int64) (task *proto.Task, err error)

GetSubtaskInStates(instanceID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error)
StartSubtask(id int64) error
UpdateSubtaskStateAndError(id int64, state string, err error) error
FinishSubtask(id int64, meta []byte) error
StartSubtask(subtaskID int64) error
UpdateSubtaskStateAndError(subtaskID int64, state string, err error) error
FinishSubtask(subtaskID int64, meta []byte) error
HasSubtasksInStates(instanceID string, taskID int64, step int64, states ...interface{}) (bool, error)
UpdateErrorToSubtask(tidbID string, err error) error
IsSchedulerCanceled(taskID int64, execID string) (bool, error)
UpdateErrorToSubtask(instanceID string, taskID int64, err error) error
IsSchedulerCanceled(taskID int64, instanceID string) (bool, error)
}

// Pool defines the interface of a pool.
Expand Down
6 changes: 3 additions & 3 deletions disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *InternalSchedulerImpl) Run(ctx context.Context, task *proto.Task) error
if s.mu.handled {
return err
}
return s.taskTable.UpdateErrorToSubtask(s.id, err)
return s.taskTable.UpdateErrorToSubtask(s.id, task.ID, err)
}

func (s *InternalSchedulerImpl) run(ctx context.Context, task *proto.Task) error {
Expand Down Expand Up @@ -477,8 +477,8 @@ func (s *InternalSchedulerImpl) startSubtask(id int64) {
}
}

func (s *InternalSchedulerImpl) updateSubtaskStateAndError(id int64, state string, subTaskErr error) {
err := s.taskTable.UpdateSubtaskStateAndError(id, state, subTaskErr)
func (s *InternalSchedulerImpl) updateSubtaskStateAndError(subtaskID int64, state string, subTaskErr error) {
err := s.taskTable.UpdateSubtaskStateAndError(subtaskID, state, subTaskErr)
if err != nil {
s.onError(err)
}
Expand Down
4 changes: 2 additions & 2 deletions disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ func TestSchedulerRun(t *testing.T) {
// UpdateErrorToSubtask won't return such errors, but since the error is not handled,
// it's saved by UpdateErrorToSubtask.
// here we use this to check the returned error of s.run.
forwardErrFn := func(_ string, err error) error {
forwardErrFn := func(_ string, _ int64, err error) error {
return err
}
mockSubtaskTable.EXPECT().UpdateErrorToSubtask(gomock.Any(), gomock.Any()).DoAndReturn(forwardErrFn).AnyTimes()
mockSubtaskTable.EXPECT().UpdateErrorToSubtask(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(forwardErrFn).AnyTimes()
err := scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp})
require.EqualError(t, err, schedulerRegisterErr.Error())

Expand Down
2 changes: 1 addition & 1 deletion disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestSubTaskTable(t *testing.T) {
// test UpdateErrorToSubtask do update start/update time
err = sm.AddNewSubTask(3, proto.StepInit, "for_test", []byte("test"), proto.TaskTypeExample, false)
require.NoError(t, err)
require.NoError(t, sm.UpdateErrorToSubtask("for_test", errors.New("fail")))
require.NoError(t, sm.UpdateErrorToSubtask("for_test", 3, errors.New("fail")))
subtask, err = sm.GetSubtaskInStates("for_test", 3, proto.StepInit, proto.TaskStateFailed)
require.NoError(t, err)
require.Equal(t, proto.TaskStateFailed, subtask.State)
Expand Down
10 changes: 5 additions & 5 deletions disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,14 @@ func (stm *TaskManager) GetSubtaskInStates(tidbID string, taskID int64, step int
}

// UpdateErrorToSubtask updates the error to subtask.
func (stm *TaskManager) UpdateErrorToSubtask(tidbID string, err error) error {
func (stm *TaskManager) UpdateErrorToSubtask(tidbID string, taskID int64, err error) error {
if err == nil {
return nil
}
_, err1 := stm.executeSQLWithNewSession(stm.ctx, `update mysql.tidb_background_subtask
set state = %?, error = %?, start_time = unix_timestamp(), state_update_time = unix_timestamp()
where exec_id = %? and state = %? limit 1;`,
proto.TaskStateFailed, serializeErr(err), tidbID, proto.TaskStatePending)
where exec_id = %? and task_key = %? and state = %? limit 1;`,
proto.TaskStateFailed, serializeErr(err), tidbID, taskID, proto.TaskStatePending)
return err1
}

Expand Down Expand Up @@ -469,11 +469,11 @@ func (stm *TaskManager) HasSubtasksInStates(tidbID string, taskID int64, step in
}

// StartSubtask updates the subtask state to running.
func (stm *TaskManager) StartSubtask(id int64) error {
func (stm *TaskManager) StartSubtask(subtaskID int64) error {
_, err := stm.executeSQLWithNewSession(stm.ctx, `update mysql.tidb_background_subtask
set state = %?, start_time = unix_timestamp(), state_update_time = unix_timestamp()
where id = %?`,
proto.TaskStateRunning, id)
proto.TaskStateRunning, subtaskID)
return err
}

Expand Down

0 comments on commit abec0c6

Please sign in to comment.