Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#49971
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
D3Hunter authored and ti-chi-bot committed Jan 2, 2024
1 parent bf303b5 commit c6b9828
Show file tree
Hide file tree
Showing 18 changed files with 2,444 additions and 17 deletions.
4 changes: 2 additions & 2 deletions pkg/disttask/framework/framework_pause_and_resume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ func CheckSubtasksState(ctx context.Context, t *testing.T, taskID int64, state p
mgr, err := storage.GetTaskManager()
require.NoError(t, err)
mgr.PrintSubtaskInfo(ctx, taskID)
cnt, err := mgr.GetSubtaskInStatesCnt(ctx, taskID, state)
cntByStates, err := mgr.GetSubtaskCntGroupByStates(ctx, taskID, proto.StepTwo)
require.NoError(t, err)
historySubTasksCnt, err := storage.GetSubtasksFromHistoryByTaskIDForTest(ctx, mgr, taskID)
require.NoError(t, err)
require.Equal(t, expectedCnt, cnt+int64(historySubTasksCnt))
require.Equal(t, expectedCnt, cntByStates[state]+int64(historySubTasksCnt))
}

func TestFrameworkPauseAndResume(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_test(
":handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/testkit",
"//pkg/util/backoff",
"@com_github_ngaut_pools//:pools",
Expand Down
15 changes: 15 additions & 0 deletions pkg/disttask/framework/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/stretchr/testify/require"
Expand All @@ -46,9 +47,23 @@ func TestHandle(t *testing.T) {
mgr := storage.NewTaskManager(pool)
storage.SetTaskManager(mgr)

<<<<<<< HEAD
// no dispatcher registered
err := handle.SubmitAndRunGlobalTask(ctx, "1", proto.TaskTypeExample, 2, []byte("byte"))
require.Error(t, err)
=======
testutil.WaitNodeRegistered(ctx, t)

// no scheduler registered
task, err := handle.SubmitTask(ctx, "1", proto.TaskTypeExample, 2, []byte("byte"))
require.NoError(t, err)
waitedTask, err := handle.WaitTask(ctx, task.ID, func(task *proto.Task) bool {
return task.IsDone()
})
require.NoError(t, err)
require.Equal(t, proto.TaskStateFailed, waitedTask.State)
require.ErrorContains(t, waitedTask.Error, "unknown task type")
>>>>>>> 99f0349bfb6 (disttask: fix failed step is taken as success (#49971))

task, err := mgr.GetGlobalTaskByID(ctx, 1)
require.NoError(t, err)
Expand Down
216 changes: 216 additions & 0 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/disttask/framework/planner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_test(
":planner",
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/kv",
"//pkg/testkit",
"@com_github_ngaut_pools//:pools",
Expand Down
3 changes: 2 additions & 1 deletion pkg/disttask/framework/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/planner"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
Expand All @@ -45,7 +46,7 @@ func TestPlanner(t *testing.T) {
defer pool.Close()
mgr := storage.NewTaskManager(pool)
storage.SetTaskManager(mgr)

testutil.WaitNodeRegistered(ctx, t)
p := &planner.Planner{}
pCtx := planner.PlanCtx{
Ctx: ctx,
Expand Down
13 changes: 13 additions & 0 deletions pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,27 @@ go_test(
name = "scheduler_test",
timeout = "short",
srcs = [
<<<<<<< HEAD
"manager_test.go",
"register_test.go",
=======
"main_test.go",
"nodes_test.go",
"rebalance_test.go",
"scheduler_manager_test.go",
"scheduler_nokit_test.go",
>>>>>>> 99f0349bfb6 (disttask: fix failed step is taken as success (#49971))
"scheduler_test.go",
],
embed = [":scheduler"],
flaky = True,
<<<<<<< HEAD
race = "on",
shard_count = 8,
=======
race = "off",
shard_count = 26,
>>>>>>> 99f0349bfb6 (disttask: fix failed step is taken as success (#49971))
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/mock/execute",
Expand Down
55 changes: 55 additions & 0 deletions pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,65 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/execute"
)

<<<<<<< HEAD
// TaskTable defines the interface to access task table.
type TaskTable interface {
GetGlobalTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
GetGlobalTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
=======
// TaskManager defines the interface to access task table.
type TaskManager interface {
// GetTopUnfinishedTasks returns unfinished tasks, limited by MaxConcurrentTask*2,
// to make sure lower priority tasks can be scheduled if resource is enough.
// The returned tasks are sorted by task order, see proto.Task, and only contain
// some fields, see row2TaskBasic.
GetTopUnfinishedTasks(ctx context.Context) ([]*proto.Task, error)
GetTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
UpdateTaskAndAddSubTasks(ctx context.Context, task *proto.Task, subtasks []*proto.Subtask, prevState proto.TaskState) (bool, error)
GCSubtasks(ctx context.Context) error
GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error)
DeleteDeadNodes(ctx context.Context, nodes []string) error
TransferTasks2History(ctx context.Context, tasks []*proto.Task) error
CancelTask(ctx context.Context, taskID int64) error
// FailTask updates task state to Failed and updates task error.
FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error
PauseTask(ctx context.Context, taskKey string) (bool, error)
// SwitchTaskStep switches the task to the next step and adds subtasks in one
// transaction. It will change task state too if we're switching from InitStep
// to the next step.
SwitchTaskStep(ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask) error
// SwitchTaskStepInBatch is similar to SwitchTaskStep, but it will insert subtasks
// in batch, and task step change will be in a separate transaction.
// Note: subtasks of this step must be stable, i.e. count, order and content
// should be the same on each try, else the subtasks inserted might be messed up.
// And each subtask of this step must be different, to handle the network
// partition or owner change.
SwitchTaskStepInBatch(ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask) error
// SucceedTask updates a task to success state.
SucceedTask(ctx context.Context, taskID int64) error
// GetUsedSlotsOnNodes returns the used slots on nodes that have subtasks scheduled.
// Subtasks of each task on one node are only counted once, as we don't support
// running them concurrently.
// We only consider pending/running subtasks; subtasks related to revert are
// not considered.
GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error)
// GetSubtaskCntGroupByStates returns the count of subtasks of some step, grouped by state.
GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error)
ResumeSubtasks(ctx context.Context, taskID int64) error
CollectSubTaskError(ctx context.Context, taskID int64) ([]error, error)
TransferSubTasks2History(ctx context.Context, taskID int64) error
UpdateSubtasksExecIDs(ctx context.Context, taskID int64, subtasks []*proto.Subtask) error
// GetManagedNodes returns the nodes managed by dist framework and can be used
// to execute tasks. If there are any nodes with background role, we use them,
// else we use nodes without role.
// returned nodes are sorted by node id(host:port).
GetManagedNodes(ctx context.Context) ([]proto.ManagedNode, error)
GetTaskExecutorIDsByTaskID(ctx context.Context, taskID int64) ([]string, error)
GetSubtasksByStepAndState(ctx context.Context, taskID int64, step proto.Step, state proto.TaskState) ([]*proto.Subtask, error)
GetSubtasksByExecIdsAndStepAndState(ctx context.Context, tidbIDs []string, taskID int64, step proto.Step, state proto.SubtaskState) ([]*proto.Subtask, error)
GetTaskExecutorIDsByTaskIDAndStep(ctx context.Context, taskID int64, step proto.Step) ([]string, error)
>>>>>>> 99f0349bfb6 (disttask: fix failed step is taken as success (#49971))

GetSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error)
GetFirstSubtaskInStates(ctx context.Context, instanceID string, taskID int64, step proto.Step, states ...interface{}) (*proto.Subtask, error)
Expand Down
Loading

0 comments on commit c6b9828

Please sign in to comment.