diff --git a/pkg/disttask/framework/scheduler/scheduler_manager.go b/pkg/disttask/framework/scheduler/scheduler_manager.go index 52163ab8faef6..5ea47621e708f 100644 --- a/pkg/disttask/framework/scheduler/scheduler_manager.go +++ b/pkg/disttask/framework/scheduler/scheduler_manager.go @@ -303,7 +303,7 @@ func (sm *Manager) startScheduler(basicTask *proto.Task, reservedExecID string) sm.addScheduler(task.ID, scheduler) sm.slotMgr.reserve(basicTask, reservedExecID) // Using the pool with block, so it wouldn't return an error. - sm.schedulerWG.Go(func() { + sm.schedulerWG.RunWithLog(func() { defer func() { scheduler.Close() sm.delScheduler(task.ID) diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go index 076bb2c9dcece..c01e71e47f884 100644 --- a/pkg/disttask/framework/taskexecutor/manager.go +++ b/pkg/disttask/framework/taskexecutor/manager.go @@ -240,7 +240,7 @@ func (m *Manager) handleExecutableTasks(tasks []*proto.Task) { m.addHandlingTask(task.ID) m.slotManager.alloc(task) t := task - m.executorWG.Go(func() { + m.executorWG.RunWithLog(func() { defer m.slotManager.free(t.ID) m.handleExecutableTask(t) m.removeHandlingTask(t.ID) diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index f23a2425fb025..1fdb4b0c487a0 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -327,7 +327,7 @@ func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute. var wg util.WaitGroupWrapper checkCtx, checkCancel := context.WithCancel(ctx) - wg.Go(func() { + wg.RunWithLog(func() { e.checkBalanceSubtask(checkCtx) }) defer func() { diff --git a/pkg/util/wait_group_wrapper.go b/pkg/util/wait_group_wrapper.go index d9428004faa05..ae51b5c21ec92 100644 --- a/pkg/util/wait_group_wrapper.go +++ b/pkg/util/wait_group_wrapper.go @@ -158,8 +158,8 @@ func (w *WaitGroupWrapper) Run(exec func()) { }() } -// Go works like Run, but it also logs on panic. -func (w *WaitGroupWrapper) Go(exec func()) { +// RunWithLog works like Run, but it also logs on panic. +func (w *WaitGroupWrapper) RunWithLog(exec func()) { w.Add(1) go func() { defer w.Done() diff --git a/pkg/util/wait_group_wrapper_test.go b/pkg/util/wait_group_wrapper_test.go index 7c18a437fbc70..021a74d4a388d 100644 --- a/pkg/util/wait_group_wrapper_test.go +++ b/pkg/util/wait_group_wrapper_test.go @@ -97,7 +97,7 @@ func TestWaitGroupWrapperCheck(t *testing.T) { func TestWaitGroupWrapperGo(t *testing.T) { file, fileName := prepareStdoutLogger(t) var wg WaitGroupWrapper - wg.Go(func() { + wg.RunWithLog(func() { middleF() }) wg.Wait()