Skip to content

Commit

Permalink
disttask/ddl: refine manager error handling (#48095)
Browse files Browse the repository at this point in the history
ref #46258, close #48064
  • Loading branch information
ywqzzy authored Oct 31, 2023
1 parent de6ebc0 commit 2b18f64
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 6 deletions.
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ var retryableErrorIDs = map[errors.ErrorID]struct{}{
ErrKVReadIndexNotReady.ID(): {},
ErrKVIngestFailed.ID(): {},
ErrKVRaftProposalDropped.ID(): {},
// litBackendCtxMgr.Register may return the error.
ErrCreatePDClient.ID(): {},
// during checksum coprocessor will transform error into driver error in handleCopResponse using ToTiDBErr
// met ErrRegionUnavailable on free-tier import during checksum, others hasn't met yet
drivererr.ErrRegionUnavailable.ID(): {},
Expand Down
4 changes: 1 addition & 3 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,9 +1088,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
// TODO(tangenta): get duplicate column and match index.
err = convertToKeyExistsErr(err, allIndexInfos[0], tbl.Meta())
}
if !errorIsRetryable(err, job) ||
// TODO: Remove this check make it can be retry. Related test is TestModifyColumnReorgInfo.
job.ReorgMeta.IsDistReorg {
if !errorIsRetryable(err, job) {
logutil.BgLogger().Warn("run add index job failed, convert job to rollback", zap.String("category", "ddl"), zap.String("job", job.String()), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), allIndexInfos, err)
if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/domain/infosync"
Expand Down Expand Up @@ -416,13 +417,18 @@ func (m *Manager) removeHandlingTask(id int64) {
}

// logErr reports err through the logger obtained from m.logCtx at Error level,
// attaching the error itself and a "stack" stack-trace field.
//
// NOTE(review): the two Error calls below differ only in their message text.
// This looks like the before/after lines of a diff rendered without +/- markers;
// confirm that only the "task manager met error" line is meant to remain.
func (m *Manager) logErr(err error) {
logutil.Logger(m.logCtx).Error("task manager error", zap.Error(err), zap.Stack("stack"))
logutil.Logger(m.logCtx).Error("task manager met error", zap.Error(err), zap.Stack("stack"))
}

// logErrAndPersist logs err via logErr and then, unless the error is classified
// as retryable, passes it to m.taskTable.UpdateErrorToSubtask together with m.id
// and taskID.
//
// An error is treated as retryable when either common.IsRetryableError or the
// unqualified isRetryableError helper reports true; in that case the error is
// only logged and the function returns without calling UpdateErrorToSubtask.
func (m *Manager) logErrAndPersist(err error, taskID int64) {
m.logErr(err)
// TODO: use interface if each business to retry
// NOTE(review): presumably this means each business/task type should supply its
// own retryability check through an interface instead of the two hard-coded
// checks below — confirm the intent.
if common.IsRetryableError(err) || isRetryableError(err) {
return
}
err1 := m.taskTable.UpdateErrorToSubtask(m.id, taskID, err)
if err1 != nil {
// The function has no return value, so a failed update is only logged.
logutil.Logger(m.logCtx).Error("update to subtask failed", zap.Error(err1), zap.Stack("stack"))
}
// NOTE(review): this line runs whether or not the update succeeded, and it logs
// err1 (presumably nil on success) at Error level rather than the original err —
// confirm that this is intended.
logutil.Logger(m.logCtx).Error("update error to subtask", zap.Int64("task-id", taskID), zap.Error(err1), zap.Stack("stack"))
}
7 changes: 5 additions & 2 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,13 @@ func (s *BaseScheduler) onError(err error) {
return
}
err = errors.Trace(err)
logutil.Logger(s.logCtx).Error("onError", zap.Error(err))
logutil.Logger(s.logCtx).Error("onError", zap.Error(err), zap.Stack("stack"))
s.mu.Lock()
defer s.mu.Unlock()

if s.mu.err == nil {
s.mu.err = err
logutil.Logger(s.logCtx).Error("scheduler error", zap.Error(err))
logutil.Logger(s.logCtx).Error("scheduler met first error", zap.Error(err))
}

if s.mu.runtimeCancel != nil {
Expand Down Expand Up @@ -620,5 +620,8 @@ func (s *BaseScheduler) updateErrorToSubtask(ctx context.Context, taskID int64,
return true, s.taskTable.UpdateErrorToSubtask(s.id, taskID, err)
},
)
if err1 == nil {
logger.Warn("update error to subtask success", zap.Error(err))
}
return err1
}

0 comments on commit 2b18f64

Please sign in to comment.