Skip to content

Commit

Permalink
dist/ddl: add subtask metrics (#47175)
Browse files Browse the repository at this point in the history
close #47017
  • Loading branch information
okJiang authored Sep 25, 2023
1 parent dd70123 commit 516542b
Show file tree
Hide file tree
Showing 13 changed files with 1,026 additions and 72 deletions.
34 changes: 27 additions & 7 deletions disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ type Subtask struct {
Summary string
}

// IsFinished reports whether the subtask's State is one of the end states:
// succeed, reverted, canceled, failed, or revert-failed.
func (t *Subtask) IsFinished() bool {
	switch t.State {
	case TaskStateSucceed, TaskStateReverted, TaskStateCanceled,
		TaskStateFailed, TaskStateRevertFailed:
		return true
	default:
		return false
	}
}

// NewSubtask creates a new subtask.
func NewSubtask(step int64, taskID int64, tp, schedulerID string, meta []byte) *Subtask {
return &Subtask{
Expand Down
1 change: 1 addition & 0 deletions disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//disttask/framework/scheduler/execute",
"//disttask/framework/storage",
"//domain/infosync",
"//metrics",
"//resourcemanager/pool/spool",
"//resourcemanager/util",
"//util/logutil",
Expand Down
3 changes: 2 additions & 1 deletion disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type TaskTable interface {
GetGlobalTasksInStates(states ...interface{}) (task []*proto.Task, err error)
GetGlobalTaskByID(taskID int64) (task *proto.Task, err error)

GetSubtaskInStates(tidbID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error)
GetSubtasksInStates(tidbID string, taskID int64, step int64, states ...interface{}) ([]*proto.Subtask, error)
GetFirstSubtaskInStates(instanceID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error)
StartManager(tidbID string, role string) error
StartSubtask(subtaskID int64) error
UpdateSubtaskStateAndError(subtaskID int64, state string, err error) error
Expand Down
76 changes: 55 additions & 21 deletions disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/disttask/framework/scheduler/execute"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -165,14 +166,25 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error {
wg.Wait()
}()

subtasks, err := s.taskTable.GetSubtasksInStates(s.id, task.ID, task.Step, proto.TaskStatePending)
if err != nil {
s.onError(err)
return s.getError()
}
for _, subtask := range subtasks {
metrics.IncDistTaskSubTaskCnt(subtask)
metrics.StartDistTaskSubTask(subtask)
}

for {
// check if any error occurs.
if err := s.getError(); err != nil {
break
}
subtask, err := s.taskTable.GetSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending)

subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending)
if err != nil {
logutil.Logger(s.logCtx).Warn("GetSubtaskInStates meets error", zap.Error(err))
logutil.Logger(s.logCtx).Warn("GetFirstSubtaskInStates meets error", zap.Error(err))
continue
}
if subtask == nil {
Expand All @@ -187,11 +199,13 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error {
}
continue
}
s.startSubtask(subtask.ID)

s.startSubtaskAndUpdateState(subtask)
if err := s.getError(); err != nil {
logutil.Logger(s.logCtx).Warn("startSubtask meets error", zap.Error(err))
logutil.Logger(s.logCtx).Warn("startSubtaskAndUpdateState meets error", zap.Error(err))
continue
}

failpoint.Inject("mockCleanScheduler", func() {
v, ok := testContexts.Load(s.id)
if ok {
Expand All @@ -216,9 +230,9 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, scheduler execute.Subtas
if err != nil {
s.onError(err)
if errors.Cause(err) == context.Canceled {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, s.getError())
s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, s.getError())
} else {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateFailed, s.getError())
s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, s.getError())
}
s.markErrorHandled()
return
Expand Down Expand Up @@ -293,16 +307,14 @@ func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, scheduler execute
})
if err := s.getError(); err != nil {
if errors.Cause(err) == context.Canceled {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, nil)
s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil)
} else {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateFailed, s.getError())
s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, s.getError())
}
s.markErrorHandled()
return
}
if err := s.taskTable.FinishSubtask(subtask.ID, subtask.Meta); err != nil {
s.onError(err)
}
s.finishSubtaskAndUpdateState(subtask)
failpoint.Inject("syncAfterSubtaskFinish", func() {
TestSyncChan <- struct{}{}
<-TestSyncChan
Expand All @@ -320,7 +332,7 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {

// We should cancel all subtasks before rolling back
for {
subtask, err := s.taskTable.GetSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRunning)
subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRunning)
if err != nil {
s.onError(err)
return s.getError()
Expand All @@ -330,7 +342,7 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {
break
}

s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, nil)
s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil)
if err = s.getError(); err != nil {
return err
}
Expand All @@ -341,7 +353,7 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {
s.onError(err)
return s.getError()
}
subtask, err := s.taskTable.GetSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStateRevertPending)
subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStateRevertPending)
if err != nil {
s.onError(err)
return s.getError()
Expand All @@ -350,17 +362,17 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {
logutil.BgLogger().Warn("scheduler rollback a step, but no subtask in revert_pending state", zap.Any("step", task.Step))
return nil
}
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateReverting, nil)
s.updateSubtaskStateAndError(subtask, proto.TaskStateReverting, nil)
if err := s.getError(); err != nil {
return err
}

err = executor.Rollback(rollbackCtx)
if err != nil {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateRevertFailed, nil)
s.updateSubtaskStateAndError(subtask, proto.TaskStateRevertFailed, nil)
s.onError(err)
} else {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateReverted, nil)
s.updateSubtaskStateAndError(subtask, proto.TaskStateReverted, nil)
}
return s.getError()
}
Expand Down Expand Up @@ -446,16 +458,38 @@ func (s *BaseScheduler) resetError() {
s.mu.handled = false
}

func (s *BaseScheduler) startSubtask(id int64) {
err := s.taskTable.StartSubtask(id)
// startSubtaskAndUpdateState marks the subtask as started in the task table,
// sets its in-memory State to proto.TaskStateRunning, and moves the subtask
// metrics from the previous state to the running state.
//
// It returns nothing: a StartSubtask failure is recorded through s.onError,
// so callers should check s.getError() afterwards.
func (s *BaseScheduler) startSubtaskAndUpdateState(subtask *proto.Subtask) {
// Drop the subtask from the metrics of its current state before the state
// is changed below (presumably the metrics are keyed by subtask.State —
// TODO confirm against the metrics package).
metrics.DecDistTaskSubTaskCnt(subtask)
metrics.EndDistTaskSubTask(subtask)
err := s.taskTable.StartSubtask(subtask.ID)
if err != nil {
s.onError(err)
}
// NOTE(review): the in-memory state and the metrics are advanced to running
// even when StartSubtask returned an error — confirm this is intended.
subtask.State = proto.TaskStateRunning
metrics.IncDistTaskSubTaskCnt(subtask)
metrics.StartDistTaskSubTask(subtask)
}

func (s *BaseScheduler) updateSubtaskStateAndError(subtaskID int64, state string, subTaskErr error) {
err := s.taskTable.UpdateSubtaskStateAndError(subtaskID, state, subTaskErr)
// updateSubtaskStateAndError writes the new state and subTaskErr for the
// subtask to the task table, sets the in-memory subtask.State to state, and
// moves the subtask metrics from the previous state to the new one.
//
// A failure of the table update is recorded through s.onError rather than
// returned, so callers should check s.getError() afterwards.
func (s *BaseScheduler) updateSubtaskStateAndError(subtask *proto.Subtask, state string, subTaskErr error) {
// Drop the subtask from the metrics of its current state before the state
// is changed below (presumably the metrics are keyed by subtask.State —
// TODO confirm against the metrics package).
metrics.DecDistTaskSubTaskCnt(subtask)
metrics.EndDistTaskSubTask(subtask)
err := s.taskTable.UpdateSubtaskStateAndError(subtask.ID, state, subTaskErr)
if err != nil {
s.onError(err)
}
// NOTE(review): the in-memory state and the metrics are updated even when
// the table update returned an error — confirm this is intended.
subtask.State = state
metrics.IncDistTaskSubTaskCnt(subtask)
// Only a subtask that is not yet finished gets StartDistTaskSubTask called
// for its new state.
if !subtask.IsFinished() {
metrics.StartDistTaskSubTask(subtask)
}
}

// finishSubtaskAndUpdateState calls FinishSubtask on the task table with the
// subtask's ID and Meta, sets the in-memory State to proto.TaskStateSucceed,
// and moves the subtask count metric from the previous state to succeed.
//
// A FinishSubtask failure is recorded through s.onError rather than returned.
func (s *BaseScheduler) finishSubtaskAndUpdateState(subtask *proto.Subtask) {
// Drop the subtask from the metrics of its current state before the state
// is changed below (presumably the metrics are keyed by subtask.State —
// TODO confirm against the metrics package).
metrics.DecDistTaskSubTaskCnt(subtask)
metrics.EndDistTaskSubTask(subtask)
if err := s.taskTable.FinishSubtask(subtask.ID, subtask.Meta); err != nil {
s.onError(err)
}
// NOTE(review): the state is set to succeed and counted even when
// FinishSubtask returned an error — confirm this is intended.
subtask.State = proto.TaskStateSucceed
// Unlike the other state-update helpers, StartDistTaskSubTask is not called
// here for the new state.
metrics.IncDistTaskSubTaskCnt(subtask)
}
Loading

0 comments on commit 516542b

Please sign in to comment.