Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: maintain managed nodes separately #49623

Merged
merged 12 commits into from
Dec 22, 2023
17 changes: 6 additions & 11 deletions pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -70,7 +69,7 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch(
ctx context.Context,
taskHandle scheduler.TaskHandle,
task *proto.Task,
serverInfo []*infosync.ServerInfo,
execIDs []string,
nextStep proto.Step,
) (taskMeta [][]byte, err error) {
logger := logutil.BgLogger().With(
Expand All @@ -96,7 +95,7 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch(
if tblInfo.Partition != nil {
return generatePartitionPlan(tblInfo)
}
return generateNonPartitionPlan(sch.d, tblInfo, job, sch.GlobalSort, len(serverInfo))
return generateNonPartitionPlan(sch.d, tblInfo, job, sch.GlobalSort, len(execIDs))
case StepMergeSort:
res, err := generateMergePlan(taskHandle, task, logger)
if err != nil {
Expand Down Expand Up @@ -181,12 +180,8 @@ func (*BackfillingSchedulerExt) OnDone(_ context.Context, _ scheduler.TaskHandle
}

// GetEligibleInstances implements scheduler.Extension interface.
func (*BackfillingSchedulerExt) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, bool, error) {
serverInfos, err := scheduler.GenerateTaskExecutorNodes(ctx)
if err != nil {
return nil, true, err
}
return serverInfos, true, nil
func (*BackfillingSchedulerExt) GetEligibleInstances(_ context.Context, _ *proto.Task) ([]string, error) {
return nil, nil
}

// IsRetryableErr implements scheduler.Extension.IsRetryableErr interface.
Expand All @@ -201,10 +196,10 @@ type LitBackfillScheduler struct {
}

func newLitBackfillScheduler(ctx context.Context, d *ddl, taskMgr scheduler.TaskManager,
serverID string, task *proto.Task) scheduler.Scheduler {
nodeMgr *scheduler.NodeManager, task *proto.Task) scheduler.Scheduler {
sch := LitBackfillScheduler{
d: d,
BaseScheduler: scheduler.NewBaseScheduler(ctx, taskMgr, serverID, task),
BaseScheduler: scheduler.NewBaseScheduler(ctx, taskMgr, nodeMgr, task),
}
return &sch
}
Expand Down
24 changes: 11 additions & 13 deletions pkg/ddl/backfilling_dist_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
// 1.1 OnNextSubtasksBatch
task.Step = sch.GetNextStep(task)
require.Equal(t, ddl.StepReadIndex, task.Step)
serverInfos, _, err := sch.GetEligibleInstances(context.Background(), task)
require.NoError(t, err)
metas, err := sch.OnNextSubtasksBatch(context.Background(), nil, task, serverInfos, task.Step)
execIDs := []string{":4000"}
metas, err := sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, len(tblInfo.Partition.Definitions), len(metas))
for i, par := range tblInfo.Partition.Definitions {
Expand All @@ -84,7 +83,7 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
task.State = proto.TaskStateRunning
task.Step = sch.GetNextStep(task)
require.Equal(t, proto.StepDone, task.Step)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, serverInfos, task.Step)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Len(t, metas, 0)

Expand All @@ -96,7 +95,7 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
// 2.1 empty table
tk.MustExec("create table t1(id int primary key, v int)")
task = createAddIndexTask(t, dom, "test", "t1", proto.Backfill, false)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, serverInfos, task.Step)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, 0, len(metas))
// 2.2 non empty table.
Expand All @@ -108,15 +107,15 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
task = createAddIndexTask(t, dom, "test", "t2", proto.Backfill, false)
// 2.2.1 stepInit
task.Step = sch.GetNextStep(task)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, serverInfos, task.Step)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, 1, len(metas))
require.Equal(t, ddl.StepReadIndex, task.Step)
// 2.2.2 StepReadIndex
task.State = proto.TaskStateRunning
task.Step = sch.GetNextStep(task)
require.Equal(t, proto.StepDone, task.Step)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, serverInfos, task.Step)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, 0, len(metas))
}
Expand Down Expand Up @@ -173,11 +172,10 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
taskID, err := mgr.CreateTask(ctx, task.Key, proto.Backfill, 1, task.Meta)
require.NoError(t, err)
task.ID = taskID
serverInfos, _, err := sch.GetEligibleInstances(context.Background(), task)
require.NoError(t, err)
execIDs := []string{":4000"}

// 1. to read-index stage
subtaskMetas, err := sch.OnNextSubtasksBatch(ctx, sch, task, serverInfos, sch.GetNextStep(task))
subtaskMetas, err := sch.OnNextSubtasksBatch(ctx, sch, task, execIDs, sch.GetNextStep(task))
require.NoError(t, err)
require.Len(t, subtaskMetas, 1)
task.Step = ext.GetNextStep(task)
Expand Down Expand Up @@ -217,7 +215,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/forceMergeSort"))
})
subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, sch, task, serverInfos, ext.GetNextStep(task))
subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, sch, task, execIDs, ext.GetNextStep(task))
require.NoError(t, err)
require.Len(t, subtaskMetas, 1)
task.Step = ext.GetNextStep(task)
Expand Down Expand Up @@ -256,13 +254,13 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockWriteIngest"))
})
subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, sch, task, serverInfos, ext.GetNextStep(task))
subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, sch, task, execIDs, ext.GetNextStep(task))
require.NoError(t, err)
require.Len(t, subtaskMetas, 1)
task.Step = ext.GetNextStep(task)
require.Equal(t, ddl.StepWriteAndIngest, task.Step)
// 4. to done stage.
subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, sch, task, serverInfos, ext.GetNextStep(task))
subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, sch, task, execIDs, ext.GetNextStep(task))
require.NoError(t, err)
require.Len(t, subtaskMetas, 0)
task.Step = ext.GetNextStep(task)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,8 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
)

scheduler.RegisterSchedulerFactory(proto.Backfill,
func(ctx context.Context, taskMgr scheduler.TaskManager, serverID string, task *proto.Task) scheduler.Scheduler {
return newLitBackfillScheduler(ctx, d, taskMgr, serverID, task)
func(ctx context.Context, taskMgr scheduler.TaskManager, nodeMgr *scheduler.NodeManager, task *proto.Task) scheduler.Scheduler {
return newLitBackfillScheduler(ctx, d, taskMgr, nodeMgr, task)
})
scheduler.RegisterSchedulerCleanUpFactory(proto.Backfill, newBackfillCleanUpS3)
// Register functions for enable/disable ddl when changing system variable `tidb_enable_ddl`.
Expand Down
28 changes: 14 additions & 14 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "scheduler",
srcs = [
"interface.go",
"nodes.go",
"scheduler.go",
"scheduler_manager.go",
"slots.go",
Expand All @@ -12,6 +13,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/scheduler",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/log",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
Expand Down Expand Up @@ -39,6 +41,7 @@ go_test(
timeout = "short",
srcs = [
"main_test.go",
"nodes_test.go",
"rebalance_test.go",
"scheduler_manager_test.go",
"scheduler_test.go",
Expand All @@ -47,7 +50,7 @@ go_test(
embed = [":scheduler"],
flaky = True,
race = "off",
shard_count = 23,
shard_count = 24,
deps = [
"//pkg/config",
"//pkg/disttask/framework/mock",
Expand Down
13 changes: 7 additions & 6 deletions pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/syncutil"
)
Expand All @@ -35,7 +34,7 @@ type TaskManager interface {
UpdateTaskAndAddSubTasks(ctx context.Context, task *proto.Task, subtasks []*proto.Subtask, prevState proto.TaskState) (bool, error)
GCSubtasks(ctx context.Context) error
GetAllNodes(ctx context.Context) ([]string, error)
CleanUpMeta(ctx context.Context, nodes []string) error
DeleteDeadNodes(ctx context.Context, nodes []string) error
TransferTasks2History(ctx context.Context, tasks []*proto.Task) error
CancelTask(ctx context.Context, taskID int64) error
// FailTask updates task state to Failed and updates task error.
Expand Down Expand Up @@ -95,7 +94,7 @@ type Extension interface {
// 1. task is pending and entering it's first step.
// 2. subtasks scheduled has all finished with no error.
// when next step is StepDone, it should return nil, nil.
OnNextSubtasksBatch(ctx context.Context, h TaskHandle, task *proto.Task, serverInfo []*infosync.ServerInfo, step proto.Step) (subtaskMetas [][]byte, err error)
OnNextSubtasksBatch(ctx context.Context, h TaskHandle, task *proto.Task, execIDs []string, step proto.Step) (subtaskMetas [][]byte, err error)

// OnDone is called when task is done, either finished successfully or failed
// with error.
Expand All @@ -105,8 +104,10 @@ type Extension interface {

// GetEligibleInstances is used to get the eligible instances for the task.
// on certain condition we may want to use some instances to do the task, such as instances with more disk.
// The bool return value indicates whether filter instances by role.
GetEligibleInstances(ctx context.Context, task *proto.Task) ([]*infosync.ServerInfo, bool, error)
// if returned instances is empty, it means all instances are eligible.
// TODO: run import from server disk using framework makes this logic complicated,
// the instance might not be managed by framework.
GetEligibleInstances(ctx context.Context, task *proto.Task) ([]string, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can remove this interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not now, server disk import requires this


// IsRetryableErr is used to check whether the error occurred in scheduler is retryable.
IsRetryableErr(err error) bool
Expand All @@ -118,7 +119,7 @@ type Extension interface {
}

// schedulerFactoryFn is used to create a scheduler.
type schedulerFactoryFn func(ctx context.Context, taskMgr TaskManager, serverID string, task *proto.Task) Scheduler
type schedulerFactoryFn func(ctx context.Context, taskMgr TaskManager, nodeMgr *NodeManager, task *proto.Task) Scheduler

var schedulerFactoryMap = struct {
syncutil.RWMutex
Expand Down
25 changes: 19 additions & 6 deletions pkg/disttask/framework/scheduler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package scheduler

import (
"context"
"testing"

"github.com/pingcap/tidb/pkg/testkit/testsetup"
Expand All @@ -29,24 +30,36 @@ type SchedulerManagerForTest interface {
}

// GetRunningGTaskCnt implements Scheduler.GetRunningGTaskCnt interface.
func (dm *Manager) GetRunningTaskCnt() int {
return dm.getSchedulerCount()
func (sm *Manager) GetRunningTaskCnt() int {
return sm.getSchedulerCount()
}

// DelRunningGTask implements Scheduler.DelRunningGTask interface.
func (dm *Manager) DelRunningTask(id int64) {
dm.delScheduler(id)
func (sm *Manager) DelRunningTask(id int64) {
sm.delScheduler(id)
}

// DoCleanUpRoutine implements Scheduler.DoCleanUpRoutine interface.
func (dm *Manager) DoCleanUpRoutine() {
dm.doCleanUpRoutine()
func (sm *Manager) DoCleanUpRoutine() {
sm.doCleanupTask()
}

// OnNextStage exports onNextStage for testing.
func (s *BaseScheduler) OnNextStage() (err error) {
	return s.onNextStage()
}

// DoBalanceSubtasks exports doBalanceSubtasks for testing.
func (s *BaseScheduler) DoBalanceSubtasks(eligibleNodes []string) error {
	return s.doBalanceSubtasks(eligibleNodes)
}

func NewNodeManager() *NodeManager {
okJiang marked this conversation as resolved.
Show resolved Hide resolved
return newNodeManager()
}

func (nm *NodeManager) RefreshManagedNodes(ctx context.Context, taskMgr TaskManager) {
okJiang marked this conversation as resolved.
Show resolved Hide resolved
nm.refreshManagedNodes(ctx, taskMgr)
}

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()

Expand Down
1 change: 0 additions & 1 deletion pkg/disttask/framework/scheduler/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_library(
deps = [
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/domain/infosync",
"@org_uber_go_mock//gomock",
],
)
12 changes: 5 additions & 7 deletions pkg/disttask/framework/scheduler/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading