Skip to content

Commit

Permalink
disttask: insert subtasks in batch (#49282)
Browse files Browse the repository at this point in the history
close #49281
  • Loading branch information
D3Hunter authored Dec 14, 2023
1 parent 1cfb3b9 commit 5519033
Show file tree
Hide file tree
Showing 21 changed files with 750 additions and 230 deletions.
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_dist_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
// update task/subtask, and finish subtask, so we can go to next stage
subtasks := make([]*proto.Subtask, 0, len(subtaskMetas))
for _, m := range subtaskMetas {
subtasks = append(subtasks, proto.NewSubtask(task.Step, task.ID, task.Type, "", 1, m))
subtasks = append(subtasks, proto.NewSubtask(task.Step, task.ID, task.Type, "", 1, m, 0))
}
_, err = mgr.UpdateTaskAndAddSubTasks(ctx, task, subtasks, proto.TaskStatePending)
require.NoError(t, err)
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
// update meta, same as import into.
subtasks = make([]*proto.Subtask, 0, len(subtaskMetas))
for _, m := range subtaskMetas {
subtasks = append(subtasks, proto.NewSubtask(task.Step, task.ID, task.Type, "", 1, m))
subtasks = append(subtasks, proto.NewSubtask(task.Step, task.ID, task.Type, "", 1, m, 0))
}
_, err = mgr.UpdateTaskAndAddSubTasks(ctx, task, subtasks, proto.TaskStatePending)
require.NoError(t, err)
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ const (
state_update_time bigint,
end_time TIMESTAMP,
meta longblob,
ordinal int,
error BLOB,
summary json,
key idx_task_key(task_key),
key idx_exec_id(exec_id)
key idx_exec_id(exec_id),
unique uk_task_key_step_ordinal(task_key, step, ordinal)
)`
// BackgroundSubtaskHistoryTableSQL is the CREATE TABLE SQL of `tidb_background_subtask_history`.
BackgroundSubtaskHistoryTableSQL = `create table tidb_background_subtask_history (
Expand All @@ -86,6 +88,7 @@ const (
state_update_time bigint,
end_time TIMESTAMP,
meta longblob,
ordinal int,
error BLOB,
summary json,
key idx_task_key(task_key),
Expand Down
3 changes: 1 addition & 2 deletions pkg/disttask/framework/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go_test(
name = "framework_test",
timeout = "short",
srcs = [
"framework_dynamic_dispatch_test.go",
"framework_err_handling_test.go",
"framework_ha_test.go",
"framework_pause_and_resume_test.go",
Expand All @@ -13,7 +12,7 @@ go_test(
],
flaky = True,
race = "off",
shard_count = 32,
shard_count = 29,
deps = [
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
Expand Down
51 changes: 0 additions & 51 deletions pkg/disttask/framework/framework_dynamic_dispatch_test.go

This file was deleted.

11 changes: 0 additions & 11 deletions pkg/disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,6 @@ func TestOwnerChange(t *testing.T) {
distContext.Close()
}

func TestFrameworkCancelThenSubmitSubTask(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()

testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdate", "return()"))
testutil.DispatchTaskAndCheckState(ctx, t, "😊", testContext, proto.TaskStateReverted)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdate"))
distContext.Close()
}

func TestTaskExecutorDownBasic(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 4)
defer ctrl.Finish()
Expand Down
42 changes: 42 additions & 0 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func (t *Task) IsDone() bool {
t.State == TaskStateFailed
}

var (
// EmptyMeta is the empty meta of subtask.
EmptyMeta = []byte("{}")
)

// Compare compares two tasks by task order.
func (t *Task) Compare(other *Task) int {
if t.Priority != other.Priority {
Expand Down Expand Up @@ -197,8 +202,13 @@ type Subtask struct {
// it can be used as subtask end time if the subtask is finished.
// it's 0 if it hasn't started yet.
UpdateTime time.Time
Meta []byte
Summary string
// Meta is the metadata of subtask, should not be nil.
// meta of different subtasks of same step must be different too.
Meta []byte
Summary string
// Ordinal is the ordinal of subtask, should be unique for some task and step.
// starts from 1, for reverting subtask, it's NULL in database.
Ordinal int
}

func (t *Subtask) String() string {
Expand All @@ -213,15 +223,17 @@ func (t *Subtask) IsFinished() bool {
}

// NewSubtask create a new subtask.
func NewSubtask(step Step, taskID int64, tp TaskType, execID string, concurrency int, meta []byte) *Subtask {
return &Subtask{
func NewSubtask(step Step, taskID int64, tp TaskType, execID string, concurrency int, meta []byte, ordinal int) *Subtask {
s := &Subtask{
Step: step,
Type: tp,
TaskID: taskID,
ExecID: execID,
Concurrency: concurrency,
Meta: meta,
Ordinal: ordinal,
}
return s
}

// MinimalTask is the minimal task of distribute framework.
Expand Down
6 changes: 5 additions & 1 deletion pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/scheduler",
visibility = ["//visibility:public"],
deps = [
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/domain/infosync",
"//pkg/kv",
"//pkg/metrics",
"//pkg/resourcemanager/pool/spool",
"//pkg/resourcemanager/util",
"//pkg/sessionctx",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/cpu",
"//pkg/util/disttask",
"//pkg/util/intest",
Expand All @@ -44,8 +47,9 @@ go_test(
embed = [":scheduler"],
flaky = True,
race = "off",
shard_count = 22,
shard_count = 23,
deps = [
"//pkg/config",
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler/mock",
Expand Down
13 changes: 13 additions & 0 deletions pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ type TaskManager interface {
// FailTask updates task state to Failed and updates task error.
FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error
PauseTask(ctx context.Context, taskKey string) (bool, error)
// SwitchTaskStep switches the task to the next step and add subtasks in one
// transaction. It will change task state too if we're switch from InitStep to
// next step.
SwitchTaskStep(ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask) error
// SwitchTaskStepInBatch similar to SwitchTaskStep, but it will insert subtasks
// in batch, and task step change will be in a separate transaction.
// Note: subtasks of this step must be stable, i.e. count, order and content
// should be the same on each try, else the subtasks inserted might be messed up.
// And each subtask of this step must be different, to handle the network
// partition or owner change.
SwitchTaskStepInBatch(ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask) error
// SucceedTask updates a task to success state.
SucceedTask(ctx context.Context, taskID int64) error
// GetUsedSlotsOnNodes returns the used slots on nodes that have subtask scheduled.
// subtasks of each task on one node is only accounted once as we don't support
// running them concurrently.
Expand Down
4 changes: 4 additions & 0 deletions pkg/disttask/framework/scheduler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (dm *Manager) DoCleanUpRoutine() {
dm.doCleanUpRoutine()
}

func (s *BaseScheduler) OnNextStage() (err error) {
return s.onNextStage()
}

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()

Expand Down
Loading

0 comments on commit 5519033

Please sign in to comment.