diff --git a/pkg/ddl/backfilling_dist_scheduler_test.go b/pkg/ddl/backfilling_dist_scheduler_test.go index 12384bda39de5..57f91e6b5af3e 100644 --- a/pkg/ddl/backfilling_dist_scheduler_test.go +++ b/pkg/ddl/backfilling_dist_scheduler_test.go @@ -144,8 +144,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) { ctx = util.WithInternalSourceType(ctx, "handle") mgr := storage.NewTaskManager(pool) storage.SetTaskManager(mgr) - schManager, err := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port") - require.NoError(t, err) + schManager := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port") tk.MustExec("use test") tk.MustExec("create table t1(id bigint auto_random primary key)") diff --git a/pkg/disttask/framework/scheduler/BUILD.bazel b/pkg/disttask/framework/scheduler/BUILD.bazel index 0c678390d6283..ffb84e2d9f131 100644 --- a/pkg/disttask/framework/scheduler/BUILD.bazel +++ b/pkg/disttask/framework/scheduler/BUILD.bazel @@ -21,8 +21,6 @@ go_library( "//pkg/domain/infosync", "//pkg/kv", "//pkg/metrics", - "//pkg/resourcemanager/pool/spool", - "//pkg/resourcemanager/util", "//pkg/sessionctx", "//pkg/util", "//pkg/util/backoff", diff --git a/pkg/disttask/framework/scheduler/balancer_test.go b/pkg/disttask/framework/scheduler/balancer_test.go index 06f87c223ef73..49669ffaba142 100644 --- a/pkg/disttask/framework/scheduler/balancer_test.go +++ b/pkg/disttask/framework/scheduler/balancer_test.go @@ -368,8 +368,7 @@ func TestBalanceMultipleTasks(t *testing.T) { } ctx := context.Background() - manager, err := NewManager(ctx, mockTaskMgr, "1") - require.NoError(t, err) + manager := NewManager(ctx, mockTaskMgr, "1") manager.slotMgr.updateCapacity(16) manager.nodeMgr.managedNodes.Store(&[]string{"tidb1", "tidb2", "tidb3"}) b := newBalancer(Param{ diff --git a/pkg/disttask/framework/scheduler/scheduler_manager.go b/pkg/disttask/framework/scheduler/scheduler_manager.go index eb823e9d51208..5ea47621e708f 100644 --- a/pkg/disttask/framework/scheduler/scheduler_manager.go +++ b/pkg/disttask/framework/scheduler/scheduler_manager.go @@ -24,8 +24,6 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/handle" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/resourcemanager/pool/spool" - "github.com/pingcap/tidb/pkg/resourcemanager/util" tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/syncutil" @@ -103,7 +101,7 @@ type Manager struct { cancel context.CancelFunc taskMgr TaskManager wg tidbutil.WaitGroupWrapper - gPool *spool.Pool + schedulerWG tidbutil.WaitGroupWrapper slotMgr *SlotManager nodeMgr *NodeManager balancer *balancer @@ -122,18 +120,13 @@ type Manager struct { } // NewManager creates a scheduler struct. -func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) (*Manager, error) { +func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Manager { schedulerManager := &Manager{ taskMgr: taskMgr, serverID: serverID, slotMgr: newSlotManager(), nodeMgr: newNodeManager(), } - gPool, err := spool.NewPool("scheduler_pool", int32(proto.MaxConcurrentTask), util.DistTask, spool.WithBlocking(true)) - if err != nil { - return nil, err - } - schedulerManager.gPool = gPool schedulerManager.ctx, schedulerManager.cancel = context.WithCancel(ctx) schedulerManager.mu.schedulerMap = make(map[int64]Scheduler) schedulerManager.finishCh = make(chan struct{}, proto.MaxConcurrentTask) @@ -143,7 +136,7 @@ func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) (*Man slotMgr: schedulerManager.slotMgr, }) - return schedulerManager, nil + return schedulerManager } // Start the schedulerManager, start the scheduleTaskLoop to start multiple schedulers. @@ -172,7 +165,7 @@ func (sm *Manager) Start() { // Stop the schedulerManager. func (sm *Manager) Stop() { sm.cancel() - sm.gPool.ReleaseAndWait() + sm.schedulerWG.Wait() sm.wg.Wait() sm.clearSchedulers() sm.initialized = false @@ -310,7 +303,7 @@ func (sm *Manager) startScheduler(basicTask *proto.Task, reservedExecID string) sm.addScheduler(task.ID, scheduler) sm.slotMgr.reserve(basicTask, reservedExecID) // Using the pool with block, so it wouldn't return an error. - _ = sm.gPool.Run(func() { + sm.schedulerWG.RunWithLog(func() { defer func() { scheduler.Close() sm.delScheduler(task.ID) diff --git a/pkg/disttask/framework/scheduler/scheduler_nokit_test.go b/pkg/disttask/framework/scheduler/scheduler_nokit_test.go index 2b2558b24dcb4..80a1cd8f12014 100644 --- a/pkg/disttask/framework/scheduler/scheduler_nokit_test.go +++ b/pkg/disttask/framework/scheduler/scheduler_nokit_test.go @@ -141,8 +141,7 @@ func TestDispatcherOnNextStage(t *testing.T) { func TestManagerSchedulersOrdered(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mgr, err := NewManager(context.Background(), nil, "1") - require.NoError(t, err) + mgr := NewManager(context.Background(), nil, "1") for i := 1; i <= 5; i++ { task := &proto.Task{ ID: int64(i * 10), @@ -227,8 +226,7 @@ func TestSchedulerCleanupTask(t *testing.T) { defer ctrl.Finish() taskMgr := mock.NewMockTaskManager(ctrl) ctx := context.Background() - mgr, err := NewManager(ctx, taskMgr, "1") - require.NoError(t, err) + mgr := NewManager(ctx, taskMgr, "1") // normal tasks := []*proto.Task{ diff --git a/pkg/disttask/framework/scheduler/scheduler_test.go b/pkg/disttask/framework/scheduler/scheduler_test.go index c263c2bbb3775..a2b434db05d5b 100644 --- a/pkg/disttask/framework/scheduler/scheduler_test.go +++ b/pkg/disttask/framework/scheduler/scheduler_test.go @@ -116,8 +116,7 @@ func MockSchedulerManager(t *testing.T, ctrl *gomock.Controller, pool *pools.Res ctx := context.WithValue(context.Background(), "etcd", true) mgr := storage.NewTaskManager(pool) storage.SetTaskManager(mgr) - sch, err := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port") - require.NoError(t, err) + sch := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port") scheduler.RegisterSchedulerFactory(proto.TaskTypeExample, func(ctx context.Context, task *proto.Task, param scheduler.Param) scheduler.Scheduler { mockScheduler := sch.MockScheduler(task) diff --git a/pkg/disttask/framework/taskexecutor/BUILD.bazel b/pkg/disttask/framework/taskexecutor/BUILD.bazel index 84de85776bd74..4e0c25b32d211 100644 --- a/pkg/disttask/framework/taskexecutor/BUILD.bazel +++ b/pkg/disttask/framework/taskexecutor/BUILD.bazel @@ -22,8 +22,6 @@ go_library( "//pkg/disttask/framework/taskexecutor/execute", "//pkg/domain/infosync", "//pkg/metrics", - "//pkg/resourcemanager/pool/spool", - "//pkg/resourcemanager/util", "//pkg/util", "//pkg/util/backoff", "//pkg/util/cpu", @@ -59,8 +57,6 @@ go_test( "//pkg/disttask/framework/storage", "//pkg/disttask/framework/testutil", "//pkg/kv", - "//pkg/resourcemanager/pool/spool", - "//pkg/resourcemanager/util", "//pkg/testkit", "//pkg/testkit/testsetup", "//pkg/util/logutil", diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go index 5388e5831f1b2..c01e71e47f884 100644 --- a/pkg/disttask/framework/taskexecutor/manager.go +++ b/pkg/disttask/framework/taskexecutor/manager.go @@ -27,8 +27,6 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/resourcemanager/pool/spool" - "github.com/pingcap/tidb/pkg/resourcemanager/util" tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/cpu" "github.com/pingcap/tidb/pkg/util/logutil" @@ -37,7 +35,6 @@ import ( ) var ( - executorPoolSize int32 = 4 // same as scheduler checkTime = 300 * time.Millisecond recoverMetaInterval = 90 * time.Second @@ -49,30 +46,10 @@ var ( } ) -// ManagerBuilder is used to build a Manager. -type ManagerBuilder struct { - newPool func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error) -} - -// NewManagerBuilder creates a new ManagerBuilder. -func NewManagerBuilder() *ManagerBuilder { - return &ManagerBuilder{ - newPool: func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error) { - return spool.NewPool(name, size, component, options...) - }, - } -} - -// setPoolFactory sets the poolFactory to mock the Pool in unit test. -func (b *ManagerBuilder) setPoolFactory(poolFactory func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error)) { - b.newPool = poolFactory -} - // Manager monitors the task table and manages the taskExecutors. type Manager struct { - taskTable TaskTable - executorPool Pool - mu struct { + taskTable TaskTable + mu struct { sync.RWMutex // taskID -> CancelCauseFunc. // CancelCauseFunc is used to fast cancel the executor.Run. @@ -81,18 +58,18 @@ type Manager struct { // id, it's the same as server id now, i.e. host:port. id string wg tidbutil.WaitGroupWrapper + executorWG tidbutil.WaitGroupWrapper ctx context.Context cancel context.CancelFunc logger *zap.Logger - newPool func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error) slotManager *slotManager totalCPU int totalMem int64 } -// BuildManager builds a Manager. -func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable TaskTable) (*Manager, error) { +// NewManager creates a new task executor Manager. +func NewManager(ctx context.Context, id string, taskTable TaskTable) (*Manager, error) { totalMem, err := memory.MemTotal() if err != nil { // should not happen normally, as in main function of tidb-server, we assert @@ -109,7 +86,6 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable id: id, taskTable: taskTable, logger: logutil.BgLogger(), - newPool: b.newPool, slotManager: &slotManager{ taskID2Index: make(map[int64]int), executorTasks: make([]*proto.Task, 0), @@ -121,12 +97,6 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable m.ctx, m.cancel = context.WithCancel(ctx) m.mu.handlingTasks = make(map[int64]context.CancelCauseFunc) - executorPool, err := m.newPool("executor_pool", executorPoolSize, util.DistTask) - if err != nil { - return nil, err - } - m.executorPool = executorPool - return m, nil } @@ -184,7 +154,7 @@ func (m *Manager) Start() error { // Stop stops the Manager. func (m *Manager) Stop() { m.cancel() - m.executorPool.ReleaseAndWait() + m.executorWG.Wait() m.wg.Wait() } @@ -270,18 +240,11 @@ func (m *Manager) handleExecutableTasks(tasks []*proto.Task) { m.addHandlingTask(task.ID) m.slotManager.alloc(task) t := task - err = m.executorPool.Run(func() { + m.executorWG.RunWithLog(func() { defer m.slotManager.free(t.ID) m.handleExecutableTask(t) m.removeHandlingTask(t.ID) }) - // pool closed. - if err != nil { - m.slotManager.free(t.ID) - m.removeHandlingTask(task.ID) - m.logErr(err) - return - } } } diff --git a/pkg/disttask/framework/taskexecutor/manager_test.go b/pkg/disttask/framework/taskexecutor/manager_test.go index 4c149484cda0d..b156735e51553 100644 --- a/pkg/disttask/framework/taskexecutor/manager_test.go +++ b/pkg/disttask/framework/taskexecutor/manager_test.go @@ -23,29 +23,14 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/mock" "github.com/pingcap/tidb/pkg/disttask/framework/proto" - "github.com/pingcap/tidb/pkg/resourcemanager/pool/spool" - "github.com/pingcap/tidb/pkg/resourcemanager/util" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" ) -func getPoolRunFn() (*sync.WaitGroup, func(f func()) error) { - wg := &sync.WaitGroup{} - return wg, func(f func()) error { - wg.Add(1) - go func() { - defer wg.Done() - f() - }() - return nil - } -} - func TestBuildManager(t *testing.T) { - b := NewManagerBuilder() - m, err := b.BuildManager(context.Background(), "test", nil) + m, err := NewManager(context.Background(), "test", nil) require.NoError(t, err) require.NotNil(t, m) @@ -56,13 +41,13 @@ func TestBuildManager(t *testing.T) { memory.MemTotal = func() (uint64, error) { return 0, errors.New("mock error") } - _, err = b.BuildManager(context.Background(), "test", nil) + _, err = NewManager(context.Background(), "test", nil) require.ErrorContains(t, err, "mock error") memory.MemTotal = func() (uint64, error) { return 0, nil } - _, err = b.BuildManager(context.Background(), "test", nil) + _, err = NewManager(context.Background(), "test", nil) require.ErrorContains(t, err, "invalid cpu or memory") } @@ -70,9 +55,8 @@ func TestManageTask(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - b := NewManagerBuilder() mockTaskTable := mock.NewMockTaskTable(ctrl) - m, err := b.BuildManager(context.Background(), "test", mockTaskTable) + m, err := NewManager(context.Background(), "test", mockTaskTable) require.NoError(t, err) m.addHandlingTask(1) @@ -114,18 +98,13 @@ func TestHandleExecutableTasks(t *testing.T) { defer ctrl.Finish() mockTaskTable := mock.NewMockTaskTable(ctrl) mockInternalExecutor := mock.NewMockTaskExecutor(ctrl) - mockPool := mock.NewMockPool(ctrl) ctx := context.Background() - b := NewManagerBuilder() - b.setPoolFactory(func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error) { - return mockPool, nil - }) id := "test" taskID := int64(1) task := &proto.Task{ID: taskID, State: proto.TaskStateRunning, Step: proto.StepOne, Type: "type"} - m, err := b.BuildManager(ctx, id, mockTaskTable) + m, err := NewManager(ctx, id, mockTaskTable) require.NoError(t, err) // no task @@ -168,17 +147,9 @@ func TestHandleExecutableTasks(t *testing.T) { unfinishedSubtaskStates).Return(false, nil) m.handleExecutableTasks([]*proto.Task{task}) - // pool error - mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID, proto.StepOne, - unfinishedSubtaskStates).Return(true, nil) - mockPool.EXPECT().Run(gomock.Any()).Return(errors.New("pool error")) - m.handleExecutableTasks([]*proto.Task{task}) - // StepOne succeed - wg, runFn := getPoolRunFn() mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID, proto.StepOne, unfinishedSubtaskStates).Return(true, nil) - mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn) mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID).Return(task, nil) mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID, proto.StepOne, unfinishedSubtaskStates).Return(true, nil) @@ -202,7 +173,7 @@ func TestHandleExecutableTasks(t *testing.T) { m.handleExecutableTasks([]*proto.Task{task}) - wg.Wait() + m.executorWG.Wait() } func TestManager(t *testing.T) { @@ -210,18 +181,13 @@ func TestManager(t *testing.T) { defer ctrl.Finish() mockTaskTable := mock.NewMockTaskTable(ctrl) mockInternalExecutor := mock.NewMockTaskExecutor(ctrl) - mockPool := mock.NewMockPool(ctrl) - b := NewManagerBuilder() - b.setPoolFactory(func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error) { - return mockPool, nil - }) RegisterTaskType("type", func(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) TaskExecutor { return mockInternalExecutor }) id := "test" - m, err := b.BuildManager(context.Background(), id, mockTaskTable) + m, err := NewManager(context.Background(), id, mockTaskTable) require.NoError(t, err) taskID1 := int64(1) @@ -239,8 +205,6 @@ func TestManager(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne, unfinishedSubtaskStates). Return(true, nil) - wg, runFn := getPoolRunFn() - mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn) mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil).AnyTimes() mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne, unfinishedSubtaskStates). @@ -250,12 +214,10 @@ func TestManager(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne, unfinishedSubtaskStates). Return(false, nil).AnyTimes() - mockInternalExecutor.EXPECT().Close() // task2 mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne, unfinishedSubtaskStates). Return(true, nil) - mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn) mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil).AnyTimes() mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne, unfinishedSubtaskStates). @@ -265,18 +227,15 @@ func TestManager(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne, unfinishedSubtaskStates). Return(false, nil).AnyTimes() - mockInternalExecutor.EXPECT().Close() // task3 mockTaskTable.EXPECT().PauseSubtasks(m.ctx, id, taskID3).Return(nil).AnyTimes() - // for taskExecutor pool - mockPool.EXPECT().ReleaseAndWait().Do(func() { - wg.Wait() - }) - require.NoError(t, m.InitMeta()) require.NoError(t, m.Start()) - time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + return ctrl.Satisfied() + }, 5*time.Second, 100*time.Millisecond) + mockInternalExecutor.EXPECT().Close().Times(2) m.Stop() } @@ -285,18 +244,13 @@ func TestManagerHandleTasks(t *testing.T) { defer ctrl.Finish() mockTaskTable := mock.NewMockTaskTable(ctrl) mockInternalExecutor := mock.NewMockTaskExecutor(ctrl) - mockPool := mock.NewMockPool(ctrl) - b := NewManagerBuilder() - b.setPoolFactory(func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error) { - return mockPool, nil - }) RegisterTaskType("type", func(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) TaskExecutor { return mockInternalExecutor }) id := "test" - m, err := b.BuildManager(context.Background(), id, mockTaskTable) + m, err := NewManager(context.Background(), id, mockTaskTable) require.NoError(t, err) m.slotManager.available = 16 @@ -317,7 +271,6 @@ func TestManagerHandleTasks(t *testing.T) { ch := make(chan error) defer close(ch) - wg, runFn := getPoolRunFn() task1 := &proto.Task{ID: 1, State: proto.TaskStateRunning, Step: proto.StepOne, Type: "type", Concurrency: 1} // handle pending tasks @@ -327,7 +280,6 @@ func TestManagerHandleTasks(t *testing.T) { Return([]*proto.Task{task1}, nil) mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, task1.ID, proto.StepOne, unfinishedSubtaskStates).Return(true, nil) - mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn) mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockTaskTable.EXPECT().GetTaskByID(m.ctx, task1.ID).Return(task1, nil) mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, task1.ID, proto.StepOne, @@ -372,7 +324,7 @@ func TestManagerHandleTasks(t *testing.T) { }, 5*time.Second, 100*time.Millisecond) require.False(t, m.isExecutorStarted(task1.ID)) - wg.Wait() + m.executorWG.Wait() } func TestSlotManagerInManager(t *testing.T) { @@ -380,18 +332,13 @@ func TestSlotManagerInManager(t *testing.T) { defer ctrl.Finish() mockTaskTable := mock.NewMockTaskTable(ctrl) mockInternalExecutor := mock.NewMockTaskExecutor(ctrl) - mockPool := mock.NewMockPool(ctrl) - b := NewManagerBuilder() - b.setPoolFactory(func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error) { - return mockPool, nil - }) RegisterTaskType("type", func(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) TaskExecutor { return mockInternalExecutor }) id := "test" - m, err := b.BuildManager(context.Background(), id, mockTaskTable) + m, err := NewManager(context.Background(), id, mockTaskTable) require.NoError(t, err) m.slotManager.available = 10 @@ -417,7 +364,6 @@ func TestSlotManagerInManager(t *testing.T) { ch := make(chan error) defer close(ch) - wg, runFn := getPoolRunFn() // ******** Test task1 alloc success ******** // 1. task1 alloc success @@ -426,7 +372,6 @@ func TestSlotManagerInManager(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne, unfinishedSubtaskStates). Return(true, nil) - mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn) mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne, unfinishedSubtaskStates). Return(true, nil) @@ -457,7 +402,7 @@ func TestSlotManagerInManager(t *testing.T) { task1.State = proto.TaskStateSucceed mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil) mockInternalExecutor.EXPECT().Close() - wg.Wait() + m.executorWG.Wait() require.Equal(t, 10, m.slotManager.available) require.Equal(t, 0, len(m.slotManager.executorTasks)) require.True(t, ctrl.Satisfied()) @@ -479,7 +424,6 @@ func TestSlotManagerInManager(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne, unfinishedSubtaskStates). Return(true, nil) - mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn) mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne, unfinishedSubtaskStates). Return(true, nil) @@ -525,7 +469,7 @@ func TestSlotManagerInManager(t *testing.T) { // 4. task1 is released, task3 alloc success, start to run ch <- context.Canceled - wg.Wait() + m.executorWG.Wait() require.Equal(t, 10, m.slotManager.available) require.Len(t, m.slotManager.executorTasks, 0) require.True(t, ctrl.Satisfied()) @@ -533,14 +477,12 @@ func TestSlotManagerInManager(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID3, proto.StepOne, unfinishedSubtaskStates). Return(true, nil) - mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn) mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne, unfinishedSubtaskStates). Return(true, nil) mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne, unfinishedSubtaskStates). Return(true, nil) - mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn) // mock inside handleExecutableTask mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil) @@ -580,7 +522,7 @@ func TestSlotManagerInManager(t *testing.T) { mockInternalExecutor.EXPECT().Close() ch <- nil ch <- nil - wg.Wait() + m.executorWG.Wait() require.Equal(t, 10, m.slotManager.available) require.Equal(t, 0, len(m.slotManager.executorTasks)) require.True(t, ctrl.Satisfied()) diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index f23a2425fb025..1fdb4b0c487a0 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -327,7 +327,7 @@ func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute. var wg util.WaitGroupWrapper checkCtx, checkCancel := context.WithCancel(ctx) - wg.Go(func() { + wg.RunWithLog(func() { e.checkBalanceSubtask(checkCtx) }) defer func() { diff --git a/pkg/disttask/importinto/scheduler_testkit_test.go b/pkg/disttask/importinto/scheduler_testkit_test.go index 7fc90c05633cd..ff2e3a717c4f4 100644 --- a/pkg/disttask/importinto/scheduler_testkit_test.go +++ b/pkg/disttask/importinto/scheduler_testkit_test.go @@ -48,8 +48,7 @@ func TestSchedulerExtLocalSort(t *testing.T) { ctx = util.WithInternalSourceType(ctx, "taskManager") mgr := storage.NewTaskManager(pool) storage.SetTaskManager(mgr) - sch, err := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port") - require.NoError(t, err) + sch := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port") // create job conn := tk.Session().(sqlexec.SQLExecutor) @@ -184,8 +183,7 @@ func TestSchedulerExtGlobalSort(t *testing.T) { ctx = util.WithInternalSourceType(ctx, "taskManager") mgr := storage.NewTaskManager(pool) storage.SetTaskManager(mgr) - sch, err := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port") - require.NoError(t, err) + sch := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port") // create job conn := tk.Session().(sqlexec.SQLExecutor) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 0b126071da3b6..1a27a531b9da4 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1484,7 +1484,7 @@ func (do *Domain) InitDistTaskLoop() error { } managerCtx, cancel := context.WithCancel(ctx) do.cancelFns = append(do.cancelFns, cancel) - executorManager, err := taskexecutor.NewManagerBuilder().BuildManager(managerCtx, serverID, taskManager) + executorManager, err := taskexecutor.NewManager(managerCtx, serverID, taskManager) if err != nil { return err } @@ -1522,12 +1522,7 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag if schedulerManager != nil && schedulerManager.Initialized() { return } - var err error - schedulerManager, err = scheduler.NewManager(ctx, taskManager, serverID) - if err != nil { - logutil.BgLogger().Error("failed to create a dist task scheduler manager", zap.Error(err)) - return - } + schedulerManager = scheduler.NewManager(ctx, taskManager, serverID) schedulerManager.Start() } stopSchedulerMgrIfNeeded := func() { diff --git a/pkg/util/wait_group_wrapper.go b/pkg/util/wait_group_wrapper.go index d9428004faa05..ae51b5c21ec92 100644 --- a/pkg/util/wait_group_wrapper.go +++ b/pkg/util/wait_group_wrapper.go @@ -158,8 +158,8 @@ func (w *WaitGroupWrapper) Run(exec func()) { }() } -// Go works like Run, but it also logs on panic. -func (w *WaitGroupWrapper) Go(exec func()) { +// RunWithLog works like Run, but it also logs on panic. +func (w *WaitGroupWrapper) RunWithLog(exec func()) { w.Add(1) go func() { defer w.Done() diff --git a/pkg/util/wait_group_wrapper_test.go b/pkg/util/wait_group_wrapper_test.go index 7c18a437fbc70..021a74d4a388d 100644 --- a/pkg/util/wait_group_wrapper_test.go +++ b/pkg/util/wait_group_wrapper_test.go @@ -97,7 +97,7 @@ func TestWaitGroupWrapperCheck(t *testing.T) { func TestWaitGroupWrapperGo(t *testing.T) { file, fileName := prepareStdoutLogger(t) var wg WaitGroupWrapper - wg.Go(func() { + wg.RunWithLog(func() { middleF() }) wg.Wait()