disttask: add IsRetryableError interface for taskExecutor (#49097)
ref #48795
ywqzzy authored Dec 4, 2023
1 parent c0affea commit 28cb579
Showing 14 changed files with 84 additions and 41 deletions.
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_clean_s3.go
@@ -40,7 +40,7 @@ func newBackfillCleanUpS3() dispatcher.CleanUpRoutine {

// CleanUp implements the CleanUpRoutine.CleanUp interface.
func (*BackfillCleanUpS3) CleanUp(ctx context.Context, task *proto.Task) error {
var taskMeta BackfillGlobalMeta
var taskMeta BackfillTaskMeta
if err := json.Unmarshal(task.Meta, &taskMeta); err != nil {
return err
}
@@ -71,7 +71,7 @@ func (*BackfillCleanUpS3) CleanUp(ctx context.Context, task *proto.Task) error {
func redactCloudStorageURI(
ctx context.Context,
task *proto.Task,
origin *BackfillGlobalMeta,
origin *BackfillTaskMeta,
) {
origin.CloudStorageURI = ast.RedactURL(origin.CloudStorageURI)
metaBytes, err := json.Marshal(origin)
6 changes: 3 additions & 3 deletions pkg/ddl/backfilling_dispatcher.go
@@ -79,7 +79,7 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
zap.String("curr-step", StepStr(task.Step)),
zap.String("next-step", StepStr(nextStep)),
)
var backfillMeta BackfillGlobalMeta
var backfillMeta BackfillTaskMeta
if err := json.Unmarshal(task.Meta, &backfillMeta); err != nil {
return nil, err
}
@@ -140,7 +140,7 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
}
}

func updateMeta(task *proto.Task, taskMeta *BackfillGlobalMeta) error {
func updateMeta(task *proto.Task, taskMeta *BackfillTaskMeta) error {
bs, err := json.Marshal(taskMeta)
if err != nil {
return errors.Trace(err)
@@ -211,7 +211,7 @@ func newLitBackfillDispatcher(ctx context.Context, d *ddl, taskMgr dispatcher.Ta

// Init implements BaseDispatcher interface.
func (dsp *LitBackfillDispatcher) Init() (err error) {
taskMeta := &BackfillGlobalMeta{}
taskMeta := &BackfillTaskMeta{}
if err = json.Unmarshal(dsp.BaseDispatcher.Task.Meta, taskMeta); err != nil {
return errors.Annotate(err, "unmarshal task meta failed")
}
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dispatcher_test.go
@@ -303,7 +303,7 @@ func createAddIndexTask(t *testing.T,
defaultSQLMode, err := mysql.GetSQLMode(mysql.DefaultSQLMode)
require.NoError(t, err)

taskMeta := &ddl.BackfillGlobalMeta{
taskMeta := &ddl.BackfillTaskMeta{
Job: model.Job{
ID: time.Now().UnixNano(),
SchemaID: db.ID,
26 changes: 22 additions & 4 deletions pkg/ddl/backfilling_dist_executor.go
@@ -20,19 +20,22 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

// BackfillGlobalMeta is the global task meta for backfilling index.
type BackfillGlobalMeta struct {
// BackfillTaskMeta is the dist task meta for backfilling index.
type BackfillTaskMeta struct {
Job model.Job `json:"job"`
// EleIDs stands for the index/column IDs to backfill with distributed framework.
EleIDs []int64 `json:"ele_ids"`
@@ -59,7 +62,7 @@ type BackfillSubTaskMeta struct {
// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
bc ingest.BackendCtx, stage proto.Step, summary *execute.Summary) (execute.SubtaskExecutor, error) {
bgm := &BackfillGlobalMeta{}
bgm := &BackfillTaskMeta{}
err := json.Unmarshal(taskMeta, bgm)
if err != nil {
return nil, err
@@ -127,7 +130,7 @@ func (s *backfillDistExecutor) Init(ctx context.Context) error {
}
d := s.d

bgm := &BackfillGlobalMeta{}
bgm := &BackfillTaskMeta{}
err = json.Unmarshal(s.task.Meta, bgm)
if err != nil {
return errors.Trace(err)
@@ -166,6 +169,21 @@ func (*backfillDistExecutor) IsIdempotent(*proto.Subtask) bool {
return true
}

func isRetryableError(err error) bool {
originErr := errors.Cause(err)
if tErr, ok := originErr.(*terror.Error); ok {
sqlErr := terror.ToSQLError(tErr)
_, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code]
return ok
}
// can't retry Unknown err.
return false
}

func (*backfillDistExecutor) IsRetryableError(err error) bool {
return common.IsRetryableError(err) || isRetryableError(err)
}

func (s *backfillDistExecutor) Close() {
if s.backendCtx != nil {
ingest.LitBackCtxMgr.Unregister(s.jobID)
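The backfill executor's new IsRetryableError combines two checks: lightning's common.IsRetryableError for transient storage/network failures, and the local isRetryableError, which looks up the SQL error code in dbterror.ReorgRetryableErrCodes. Below is a minimal, self-contained sketch of that two-layer check; the error codes, codedError type, and isNetworkRetryable helper are illustrative stand-ins for *terror.Error, the reorg-retryable code table, and the lightning helper, not the real implementations.

package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-in for dbterror.ReorgRetryableErrCodes: SQL error
// codes a reorg/backfill subtask is allowed to retry.
var reorgRetryableErrCodes = map[uint16]struct{}{
	1205: {}, // lock wait timeout (example code)
	1213: {}, // deadlock (example code)
}

// codedError mimics the relevant part of *terror.Error: an error that
// carries a SQL error code.
type codedError struct {
	code uint16
	msg  string
}

func (e *codedError) Error() string { return fmt.Sprintf("[%d] %s", e.code, e.msg) }

var errConnReset = errors.New("connection reset by peer")

// isNetworkRetryable stands in for lightning's common.IsRetryableError,
// which treats transient network/storage failures as retryable.
func isNetworkRetryable(err error) bool {
	return errors.Is(err, errConnReset)
}

// isRetryableError mirrors the shape of backfillDistExecutor.IsRetryableError:
// transient at the storage layer, or a reorg-retryable SQL error code.
func isRetryableError(err error) bool {
	if isNetworkRetryable(err) {
		return true
	}
	var ce *codedError
	if errors.As(err, &ce) {
		_, ok := reorgRetryableErrCodes[ce.code]
		return ok
	}
	// Unknown errors are not retried.
	return false
}

func main() {
	fmt.Println(isRetryableError(&codedError{code: 1213, msg: "deadlock"})) // true
	fmt.Println(isRetryableError(errConnReset))                             // true
	fmt.Println(isRetryableError(errors.New("duplicate entry")))            // false
}

The real code additionally unwraps the error with errors.Cause before inspecting the terror code, as shown in the diff above.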
16 changes: 8 additions & 8 deletions pkg/ddl/index.go
@@ -2066,8 +2066,8 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {
}

// For resuming add index task.
// Need to fetch global task by taskKey in tidb_global_task and tidb_global_task_history tables.
// When pausing the related ddl job, it is possible that the global task with taskKey has succeeded and is in tidb_global_task_history.
// Need to fetch the task by taskKey in the tidb_global_task and tidb_global_task_history tables.
// When pausing the related ddl job, it is possible that the task with taskKey has succeeded and is in tidb_global_task_history.
// As a result, when resuming the related ddl job,
// it is necessary to check whether the task exists in the tidb_global_task and tidb_global_task_history tables.
taskManager, err := storage.GetTaskManager()
@@ -2083,7 +2083,7 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {
// When the task is in the succeed state, we can skip the dist task execution/scheduling process.
if task.State == proto.TaskStateSucceed {
logutil.BgLogger().Info(
"global task succeed, start to resume the ddl job",
"task succeed, start to resume the ddl job",
zap.String("category", "ddl"),
zap.String("task-key", taskKey))
return nil
@@ -2115,7 +2115,7 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {
}

job := reorgInfo.Job
taskMeta := &BackfillGlobalMeta{
taskMeta := &BackfillTaskMeta{
Job: *reorgInfo.Job.Clone(),
EleIDs: elemIDs,
EleTypeKey: reorgInfo.currElement.TypeKey,
@@ -2157,7 +2157,7 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {
if err = w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
if err = handle.PauseTask(w.ctx, taskKey); err != nil {
logutil.BgLogger().Error("pause global task error", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
logutil.BgLogger().Error("pause task error", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
continue
}
failpoint.Inject("syncDDLTaskPause", func() {
@@ -2169,8 +2169,8 @@
return errors.Trace(err)
}
if err = handle.CancelTask(w.ctx, taskKey); err != nil {
logutil.BgLogger().Error("cancel global task error", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
// continue to cancel global task.
logutil.BgLogger().Error("cancel task error", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
// continue to cancel task.
continue
}
}
Expand All @@ -2191,7 +2191,7 @@ func (w *worker) updateJobRowCount(taskKey string, jobID int64) {
}
task, err := taskMgr.GetTaskByKey(w.ctx, taskKey)
if err != nil || task == nil {
logutil.BgLogger().Warn("cannot get global task", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
logutil.BgLogger().Warn("cannot get task", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
return
}
rowCount, err := taskMgr.GetSubtaskRowCount(w.ctx, task.ID, proto.StepOne)
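While executeDistTask waits for the dist task, a paused DDL job pauses the task and keeps waiting, and any other non-runnable state cancels it, as shown in the hunks above. The following is a self-contained sketch of that decision; the handle type, task key, and errPausedDDLJob sentinel are hypothetical stand-ins for the disttask handle package and dbterror.ErrPausedDDLJob.

package main

import (
	"errors"
	"fmt"
)

var errPausedDDLJob = errors.New("ddl job is paused") // stands in for dbterror.ErrPausedDDLJob

// handle records pause/cancel calls; it stands in for the disttask handle package.
type handle struct{ ops []string }

func (h *handle) PauseTask(taskKey string) error {
	h.ops = append(h.ops, "pause:"+taskKey)
	return nil
}

func (h *handle) CancelTask(taskKey string) error {
	h.ops = append(h.ops, "cancel:"+taskKey)
	return nil
}

// onReorgNotRunnable mirrors one iteration of the wait loop in executeDistTask:
// a merely paused job pauses the dist task and keeps polling, any other
// non-runnable state cancels the task; failed pause/cancel calls are retried
// on the next tick.
func onReorgNotRunnable(h *handle, taskKey string, reorgErr error) (keepWaiting bool) {
	if errors.Is(reorgErr, errPausedDDLJob) {
		_ = h.PauseTask(taskKey) // on failure we would simply retry next tick
		return true
	}
	if err := h.CancelTask(taskKey); err != nil {
		return true // keep trying to cancel
	}
	return false
}

func main() {
	h := &handle{}
	fmt.Println(onReorgNotRunnable(h, "ddl/backfill/42", errPausedDDLJob))      // true: task paused, keep waiting
	fmt.Println(onReorgNotRunnable(h, "ddl/backfill/42", errors.New("owner changed"))) // false: task cancelled
	fmt.Println(h.ops)
}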
28 changes: 28 additions & 0 deletions pkg/disttask/framework/mock/task_executor_mock.go

Generated file; diff content not rendered.

2 changes: 0 additions & 2 deletions pkg/disttask/framework/taskexecutor/BUILD.bazel
@@ -21,12 +21,10 @@ go_library(
"//pkg/disttask/framework/taskexecutor/execute",
"//pkg/domain/infosync",
"//pkg/metrics",
"//pkg/parser/terror",
"//pkg/resourcemanager/pool/spool",
"//pkg/resourcemanager/util",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/dbterror",
"//pkg/util/gctuner",
"//pkg/util/logutil",
"//pkg/util/memory",
5 changes: 5 additions & 0 deletions pkg/disttask/framework/taskexecutor/interface.go
@@ -54,6 +54,7 @@ type TaskExecutor interface {
Rollback(context.Context, *proto.Task) error
Pause(context.Context, *proto.Task) error
Close()
IsRetryableError(err error) bool
}

// Extension extends the TaskExecutor.
@@ -67,6 +68,10 @@ type Extension interface {
// GetSubtaskExecutor returns the subtask executor for the subtask.
// Note: summary is the summary manager of all subtask of the same type now.
GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error)
// IsRetryableError returns whether the error is transient.
// When error is transient, the framework won't mark subtasks as failed,
// then the TaskExecutor can load the subtask again and redo it.
IsRetryableError(err error) bool
}

// EmptySubtaskExecutor is an empty Executor.
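Every Extension now has to answer IsRetryableError; a retryable answer makes the framework skip marking the subtask failed, so the subtask can be loaded and redone. A minimal sketch of a business-side implementation follows; the cut-down extension interface, myExecutor type, and errLockConflict sentinel are hypothetical, not part of the framework.

package main

import (
	"context"
	"errors"
	"fmt"
)

// extension is a cut-down stand-in for taskexecutor.Extension, keeping only
// the methods relevant to retrying.
type extension interface {
	IsIdempotent(subtaskID int64) bool
	IsRetryableError(err error) bool
}

var errLockConflict = errors.New("lock conflict")

// myExecutor is a hypothetical business executor: lock conflicts and
// deadline overruns are transient, everything else fails the subtask.
type myExecutor struct{}

func (myExecutor) IsIdempotent(int64) bool { return true }

func (myExecutor) IsRetryableError(err error) bool {
	return errors.Is(err, context.DeadlineExceeded) ||
		errors.Is(err, errLockConflict)
}

func main() {
	var ext extension = myExecutor{}
	fmt.Println(ext.IsRetryableError(context.DeadlineExceeded)) // true: subtask left for redo
	fmt.Println(ext.IsRetryableError(errors.New("bad schema"))) // false: subtask marked failed
}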
11 changes: 5 additions & 6 deletions pkg/disttask/framework/taskexecutor/manager.go
@@ -22,7 +22,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/domain/infosync"
@@ -329,7 +328,7 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
factory := GetTaskExecutorFactory(task.Type)
if factory == nil {
err := errors.Errorf("task type %s not found", task.Type)
m.logErrAndPersist(err, task.ID)
m.logErrAndPersist(err, task.ID, nil)
return
}
executor := factory(m.ctx, m.id, task, m.taskTable)
@@ -339,7 +338,7 @@
// executor should init before run()/pause()/rollback().
err := executor.Init(taskCtx)
if err != nil {
m.logErrAndPersist(err, task.ID)
m.logErrAndPersist(err, task.ID, executor)
return
}
defer executor.Close()
@@ -427,10 +426,10 @@ func (m *Manager) logErr(err error) {
logutil.Logger(m.logCtx).Error("task manager met error", zap.Error(err), zap.Stack("stack"))
}

func (m *Manager) logErrAndPersist(err error, taskID int64) {
func (m *Manager) logErrAndPersist(err error, taskID int64, taskExecutor TaskExecutor) {
m.logErr(err)
// TODO: use interface if each business to retry
if common.IsRetryableError(err) || isRetryableError(err) {
if taskExecutor.IsRetryableError(err) {
logutil.Logger(m.logCtx).Error("met retryable err", zap.Error(err), zap.Stack("stack"))
return
}
err1 := m.taskTable.UpdateErrorToSubtask(m.ctx, m.id, taskID, err)
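logErrAndPersist now asks the concrete executor, rather than a hard-coded helper, whether the error is transient. A sketch of that decision is below; the nil guard and the fakeTaskTable are illustrative additions for this sketch, not quoted from the commit.

package main

import (
	"errors"
	"fmt"
)

// retryableChecker is the only part of TaskExecutor this sketch needs.
type retryableChecker interface {
	IsRetryableError(err error) bool
}

// fakeTaskTable records persisted errors; it stands in for the real task table.
type fakeTaskTable struct{ persisted []error }

func (t *fakeTaskTable) UpdateErrorToSubtask(taskID int64, err error) {
	t.persisted = append(t.persisted, err)
}

// logErrAndPersist mirrors the manager-side logic: transient errors are only
// logged, everything else is written to the subtask so the task can fail.
func logErrAndPersist(tbl *fakeTaskTable, taskID int64, err error, exec retryableChecker) {
	fmt.Println("task executor error:", err)
	if exec != nil && exec.IsRetryableError(err) {
		return // retryable: leave the subtask alone so it can be redone
	}
	tbl.UpdateErrorToSubtask(taskID, err)
}

type alwaysRetry struct{}

func (alwaysRetry) IsRetryableError(error) bool { return true }

func main() {
	tbl := &fakeTaskTable{}
	logErrAndPersist(tbl, 1, errors.New("disk full"), nil)            // persisted
	logErrAndPersist(tbl, 1, errors.New("net glitch"), alwaysRetry{}) // only logged
	fmt.Println(len(tbl.persisted)) // 1
}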
16 changes: 1 addition & 15 deletions pkg/disttask/framework/taskexecutor/task_executor.go
@@ -30,9 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/gctuner"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
@@ -601,18 +599,6 @@ func (s *BaseTaskExecutor) finishSubtaskAndUpdateState(ctx context.Context, subt
metrics.IncDistTaskSubTaskCnt(subtask)
}

// TODO: abstract interface for each business to implement it.
func isRetryableError(err error) bool {
originErr := errors.Cause(err)
if tErr, ok := originErr.(*terror.Error); ok {
sqlErr := terror.ToSQLError(tErr)
_, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code]
return ok
}
// can't retry Unknown err
return false
}

// markSubTaskCanceledOrFailed check the error type and decide the subtasks' state.
// 1. Only cancel subtasks when meet ErrCancelSubtask.
// 2. Only fail subtasks when meet non retryable error.
Expand All @@ -623,7 +609,7 @@ func (s *BaseTaskExecutor) markSubTaskCanceledOrFailed(ctx context.Context, subt
if ctx.Err() != nil && context.Cause(ctx) == ErrCancelSubtask {
logutil.Logger(s.logCtx).Warn("subtask canceled", zap.Error(err))
s.updateSubtaskStateAndError(s.ctx, subtask, proto.TaskStateCanceled, nil)
} else if common.IsRetryableError(err) || isRetryableError(err) {
} else if s.IsRetryableError(err) {
logutil.Logger(s.logCtx).Warn("met retryable error", zap.Error(err))
} else if common.IsContextCanceledError(err) {
logutil.Logger(s.logCtx).Info("met context canceled for gracefully shutdown", zap.Error(err))
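markSubTaskCanceledOrFailed now routes the retryability question through the executor's own IsRetryableError, and the ordering matters: explicit cancellation first, then retryable errors, then graceful-shutdown context cancellation, and only then a hard failure. A self-contained sketch of that precedence follows, with simplified outcome names and a plain errors.Is check standing in for ErrCancelSubtask and common.IsContextCanceledError.

package main

import (
	"context"
	"errors"
	"fmt"
)

type subtaskOutcome string

const (
	outcomeCanceled  subtaskOutcome = "canceled"
	outcomeRetryable subtaskOutcome = "left-for-retry"
	outcomeShutdown  subtaskOutcome = "left-for-shutdown"
	outcomeFailed    subtaskOutcome = "failed"
)

var errCancelSubtask = errors.New("cancel subtask") // stands in for taskexecutor.ErrCancelSubtask

// classify mirrors the precedence in markSubTaskCanceledOrFailed: explicit
// cancellation wins, then retryable errors are left for redo, then a canceled
// context signals graceful shutdown, and only unrecognized errors fail the subtask.
func classify(ctx context.Context, err error, isRetryable func(error) bool) subtaskOutcome {
	switch {
	case ctx.Err() != nil && errors.Is(context.Cause(ctx), errCancelSubtask):
		return outcomeCanceled
	case isRetryable(err):
		return outcomeRetryable
	case errors.Is(err, context.Canceled):
		return outcomeShutdown
	default:
		return outcomeFailed
	}
}

func main() {
	retryOnTimeout := func(err error) bool { return errors.Is(err, context.DeadlineExceeded) }

	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errCancelSubtask)
	fmt.Println(classify(ctx, errors.New("anything"), retryOnTimeout)) // canceled

	bg := context.Background()
	fmt.Println(classify(bg, context.DeadlineExceeded, retryOnTimeout)) // left-for-retry
	fmt.Println(classify(bg, context.Canceled, retryOnTimeout))         // left-for-shutdown
	fmt.Println(classify(bg, errors.New("bad row"), retryOnTimeout))    // failed
}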
2 changes: 2 additions & 0 deletions pkg/disttask/framework/taskexecutor/task_executor_test.go
@@ -48,6 +48,7 @@ func TestTaskExecutorRun(t *testing.T) {
mockSubtaskTable := mock.NewMockTaskTable(ctrl)
mockSubtaskExecutor := mockexecute.NewMockSubtaskExecutor(ctrl)
mockExtension := mock.NewMockExtension(ctrl)
mockExtension.EXPECT().IsRetryableError(gomock.Any()).AnyTimes()

// we don't test cancelCheck here, but in case the test is slow and trigger it.
mockSubtaskTable.EXPECT().IsTaskExecutorCanceled(ctx, gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
@@ -368,6 +369,7 @@ func TestTaskExecutor(t *testing.T) {
mockSubtaskTable.EXPECT().IsTaskExecutorCanceled(gomock.Any(), gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", taskID, proto.TaskStateFailed, gomock.Any()).Return(nil)
mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil).AnyTimes()
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes()

taskExecutor := NewBaseTaskExecutor(ctx, "id", 1, mockSubtaskTable)
taskExecutor.Extension = mockExtension
2 changes: 2 additions & 0 deletions pkg/disttask/framework/testutil/disttest_util.go
@@ -56,6 +56,7 @@ func RegisterTaskMeta(t *testing.T, ctrl *gomock.Controller, dispatcherHandle di
}
mockExtension.EXPECT().IsIdempotent(gomock.Any()).Return(true).AnyTimes()
mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil).AnyTimes()
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes()
registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, mockCleanupRountine, dispatcherHandle)
}

@@ -108,6 +109,7 @@ func RegisterRollbackTaskMeta(t *testing.T, ctrl *gomock.Controller, mockDispatc
mockExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
mockExtension.EXPECT().IsIdempotent(gomock.Any()).Return(true).AnyTimes()
mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockExecutor, nil).AnyTimes()
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes()

registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, mockCleanupRountine, mockDispatcher)
testContext.RollbackCnt.Store(0)
1 change: 1 addition & 0 deletions pkg/disttask/framework/testutil/executor_util.go
@@ -40,6 +40,7 @@ func GetMockTaskExecutorExtension(ctrl *gomock.Controller, mockSubtaskExecutor *
mockExtension.EXPECT().
GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).
Return(mockSubtaskExecutor, nil).AnyTimes()
mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes()
return mockExtension
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/disttask/importinto/task_executor.go
@@ -477,6 +477,10 @@ func (*importExecutor) IsIdempotent(*proto.Subtask) bool {
return true
}

func (*importExecutor) IsRetryableError(err error) bool {
return common.IsRetryableError(err)
}

func (*importExecutor) GetSubtaskExecutor(_ context.Context, task *proto.Task, _ *execute.Summary) (execute.SubtaskExecutor, error) {
taskMeta := TaskMeta{}
if err := json.Unmarshal(task.Meta, &taskMeta); err != nil {
