diff --git a/pkg/disttask/framework/mock/task_executor_mock.go b/pkg/disttask/framework/mock/task_executor_mock.go index 21bddbed6f473..ebe3d58a7a3f0 100644 --- a/pkg/disttask/framework/mock/task_executor_mock.go +++ b/pkg/disttask/framework/mock/task_executor_mock.go @@ -149,6 +149,20 @@ func (mr *MockTaskTableMockRecorder) HasSubtasksInStates(arg0, arg1, arg2, arg3 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasSubtasksInStates", reflect.TypeOf((*MockTaskTable)(nil).HasSubtasksInStates), varargs...) } +// InitMeta mocks base method. +func (m *MockTaskTable) InitMeta(arg0 context.Context, arg1, arg2 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InitMeta", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// InitMeta indicates an expected call of InitMeta. +func (mr *MockTaskTableMockRecorder) InitMeta(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitMeta", reflect.TypeOf((*MockTaskTable)(nil).InitMeta), arg0, arg1, arg2) +} + // IsTaskExecutorCanceled mocks base method. func (m *MockTaskTable) IsTaskExecutorCanceled(arg0 context.Context, arg1 string, arg2 int64) (bool, error) { m.ctrl.T.Helper() @@ -178,18 +192,18 @@ func (mr *MockTaskTableMockRecorder) PauseSubtasks(arg0, arg1, arg2 any) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PauseSubtasks", reflect.TypeOf((*MockTaskTable)(nil).PauseSubtasks), arg0, arg1, arg2) } -// StartManager mocks base method. -func (m *MockTaskTable) StartManager(arg0 context.Context, arg1, arg2 string) error { +// RecoverMeta mocks base method. +func (m *MockTaskTable) RecoverMeta(arg0 context.Context, arg1, arg2 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StartManager", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "RecoverMeta", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } -// StartManager indicates an expected call of StartManager. 
-func (mr *MockTaskTableMockRecorder) StartManager(arg0, arg1, arg2 any) *gomock.Call { +// RecoverMeta indicates an expected call of RecoverMeta. +func (mr *MockTaskTableMockRecorder) RecoverMeta(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartManager", reflect.TypeOf((*MockTaskTable)(nil).StartManager), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecoverMeta", reflect.TypeOf((*MockTaskTable)(nil).RecoverMeta), arg0, arg1, arg2) } // StartSubtask mocks base method. diff --git a/pkg/disttask/framework/scheduler/scheduler_test.go b/pkg/disttask/framework/scheduler/scheduler_test.go index b5ec6e3bf9728..15ca02692ce3b 100644 --- a/pkg/disttask/framework/scheduler/scheduler_test.go +++ b/pkg/disttask/framework/scheduler/scheduler_test.go @@ -284,7 +284,7 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel, } }() - require.NoError(t, mgr.StartManager(ctx, ":4000", "background")) + require.NoError(t, mgr.InitMeta(ctx, ":4000", "background")) // 3s cnt := 60 diff --git a/pkg/disttask/framework/storage/BUILD.bazel b/pkg/disttask/framework/storage/BUILD.bazel index b67a77c734094..a00fc6baeb98c 100644 --- a/pkg/disttask/framework/storage/BUILD.bazel +++ b/pkg/disttask/framework/storage/BUILD.bazel @@ -40,7 +40,7 @@ go_test( embed = [":storage"], flaky = True, race = "on", - shard_count = 16, + shard_count = 17, deps = [ "//pkg/config", "//pkg/disttask/framework/proto", diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go index b7e97161c78e9..78b97f690d69b 100644 --- a/pkg/disttask/framework/storage/table_test.go +++ b/pkg/disttask/framework/storage/table_test.go @@ -52,7 +52,7 @@ func checkTaskStateStep(t *testing.T, task *proto.Task, state proto.TaskState, s func TestTaskTable(t *testing.T) { _, gm, ctx := testutil.InitTableTest(t) - require.NoError(t, gm.StartManager(ctx, ":4000", "")) + 
require.NoError(t, gm.InitMeta(ctx, ":4000", "")) _, err := gm.CreateTask(ctx, "key1", "test", 999, []byte("test")) require.ErrorContains(t, err, "task concurrency(999) larger than cpu count") @@ -232,7 +232,7 @@ func TestSwitchTaskStep(t *testing.T) { store, tm, ctx := testutil.InitTableTest(t) tk := testkit.NewTestKit(t, store) - require.NoError(t, tm.StartManager(ctx, ":4000", "")) + require.NoError(t, tm.InitMeta(ctx, ":4000", "")) taskID, err := tm.CreateTask(ctx, "key1", "test", 4, []byte("test")) require.NoError(t, err) task, err := tm.GetTaskByID(ctx, taskID) @@ -282,7 +282,7 @@ func TestSwitchTaskStepInBatch(t *testing.T) { store, tm, ctx := testutil.InitTableTest(t) tk := testkit.NewTestKit(t, store) - require.NoError(t, tm.StartManager(ctx, ":4000", "")) + require.NoError(t, tm.InitMeta(ctx, ":4000", "")) // normal flow prepare := func(taskKey string) (*proto.Task, []*proto.Subtask) { taskID, err := tm.CreateTask(ctx, taskKey, "test", 4, []byte("test")) @@ -357,7 +357,7 @@ func TestSwitchTaskStepInBatch(t *testing.T) { func TestGetTopUnfinishedTasks(t *testing.T) { _, gm, ctx := testutil.InitTableTest(t) - require.NoError(t, gm.StartManager(ctx, ":4000", "")) + require.NoError(t, gm.InitMeta(ctx, ":4000", "")) taskStates := []proto.TaskState{ proto.TaskStateSucceed, proto.TaskStatePending, @@ -440,7 +440,7 @@ func TestGetUsedSlotsOnNodes(t *testing.T) { func TestSubTaskTable(t *testing.T) { _, sm, ctx := testutil.InitTableTest(t) timeBeforeCreate := time.Unix(time.Now().Unix(), 0) - require.NoError(t, sm.StartManager(ctx, ":4000", "")) + require.NoError(t, sm.InitMeta(ctx, ":4000", "")) id, err := sm.CreateTask(ctx, "key1", "test", 4, []byte("test")) require.NoError(t, err) require.Equal(t, int64(1), id) @@ -689,7 +689,7 @@ func TestSubTaskTable(t *testing.T) { func TestBothTaskAndSubTaskTable(t *testing.T) { _, sm, ctx := testutil.InitTableTest(t) - require.NoError(t, sm.StartManager(ctx, ":4000", "")) + require.NoError(t, sm.InitMeta(ctx, ":4000", "")) 
id, err := sm.CreateTask(ctx, "key1", "test", 4, []byte("test")) require.NoError(t, err) require.Equal(t, int64(1), id) @@ -830,16 +830,16 @@ func TestDistFrameworkMeta(t *testing.T) { _, err := storage.GetCPUCountOfManagedNodes(ctx, sm) require.ErrorContains(t, err, "no managed nodes") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(0)")) - require.NoError(t, sm.StartManager(ctx, ":4000", "background")) + require.NoError(t, sm.InitMeta(ctx, ":4000", "background")) cpuCount, err := storage.GetCPUCountOfManagedNodes(ctx, sm) require.NoError(t, err) require.Equal(t, 0, cpuCount) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(100)")) - require.NoError(t, sm.StartManager(ctx, ":4000", "background")) + require.NoError(t, sm.InitMeta(ctx, ":4000", "background")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)")) - require.NoError(t, sm.StartManager(ctx, ":4001", "")) - require.NoError(t, sm.StartManager(ctx, ":4002", "background")) + require.NoError(t, sm.InitMeta(ctx, ":4001", "")) + require.NoError(t, sm.InitMeta(ctx, ":4002", "background")) nodes, err := sm.GetAllNodes(ctx) require.NoError(t, err) require.Equal(t, []proto.ManagedNode{ @@ -848,23 +848,24 @@ func TestDistFrameworkMeta(t *testing.T) { {ID: ":4002", Role: "background", CPUCount: 8}, }, nodes) - // won't be replaced by below one, but cpu count will be updated require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(100)")) - require.NoError(t, sm.StartManager(ctx, ":4002", "")) - require.NoError(t, sm.StartManager(ctx, ":4003", "background")) + require.NoError(t, sm.InitMeta(ctx, ":4002", "")) + require.NoError(t, sm.InitMeta(ctx, ":4003", "background")) nodes, err = sm.GetAllNodes(ctx) require.NoError(t, err) require.Equal(t, []proto.ManagedNode{ {ID: ":4000", Role: "background", CPUCount: 100}, {ID: ":4001", Role: "", 
CPUCount: 8}, - {ID: ":4002", Role: "background", CPUCount: 100}, + {ID: ":4002", Role: "", CPUCount: 100}, {ID: ":4003", Role: "background", CPUCount: 100}, }, nodes) cpuCount, err = storage.GetCPUCountOfManagedNodes(ctx, sm) require.NoError(t, err) require.Equal(t, 100, cpuCount) + require.NoError(t, sm.InitMeta(ctx, ":4002", "background")) + require.NoError(t, sm.DeleteDeadNodes(ctx, []string{":4000"})) nodes, err = sm.GetManagedNodes(ctx) require.NoError(t, err) @@ -889,6 +890,20 @@ func TestDistFrameworkMeta(t *testing.T) { cpuCount, err = storage.GetCPUCountOfManagedNodes(ctx, sm) require.NoError(t, err) require.Equal(t, 8, cpuCount) + + require.NoError(t, sm.RecoverMeta(ctx, ":4002", "background")) + nodes, err = sm.GetManagedNodes(ctx) + require.NoError(t, err) + require.Equal(t, []proto.ManagedNode{ + {ID: ":4002", Role: "background", CPUCount: 100}, + }, nodes) + // should not reset role + require.NoError(t, sm.RecoverMeta(ctx, ":4002", "")) + nodes, err = sm.GetManagedNodes(ctx) + require.NoError(t, err) + require.Equal(t, []proto.ManagedNode{ + {ID: ":4002", Role: "background", CPUCount: 100}, + }, nodes) } func TestSubtaskHistoryTable(t *testing.T) { @@ -959,7 +974,7 @@ func TestSubtaskHistoryTable(t *testing.T) { func TestTaskHistoryTable(t *testing.T) { _, gm, ctx := testutil.InitTableTest(t) - require.NoError(t, gm.StartManager(ctx, ":4000", "")) + require.NoError(t, gm.InitMeta(ctx, ":4000", "")) _, err := gm.CreateTask(ctx, "1", proto.TaskTypeExample, 1, nil) require.NoError(t, err) taskID, err := gm.CreateTask(ctx, "2", proto.TaskTypeExample, 1, nil) @@ -1071,3 +1086,32 @@ func TestTaskNotFound(t *testing.T) { require.Error(t, err, storage.ErrTaskNotFound) require.Nil(t, task) } + +func TestInitMeta(t *testing.T) { + store, sm, ctx := testutil.InitTableTest(t) + tk := testkit.NewTestKit(t, store) + require.NoError(t, sm.InitMeta(ctx, "tidb1", "")) + tk.MustQuery(`select role from mysql.dist_framework_meta where 
host="tidb1"`).Check(testkit.Rows("")) + require.NoError(t, sm.InitMeta(ctx, "tidb1", "background")) + tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows("background")) + tk.MustExec(`set global tidb_service_scope=""`) + tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("")) + + // 1. delete then start. + require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"})) + require.NoError(t, sm.InitMeta(ctx, "tidb1", "")) + tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows("")) + + require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"})) + require.NoError(t, sm.InitMeta(ctx, "tidb1", "background")) + tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows("background")) + + // 2. delete then set. + require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"})) + tk.MustExec(`set global tidb_service_scope=""`) + tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("")) + + require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"})) + tk.MustExec(`set global tidb_service_scope="background"`) + tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background")) +} diff --git a/pkg/disttask/framework/storage/task_state_test.go b/pkg/disttask/framework/storage/task_state_test.go index 1eca809e76195..777d5477d1abe 100644 --- a/pkg/disttask/framework/storage/task_state_test.go +++ b/pkg/disttask/framework/storage/task_state_test.go @@ -29,7 +29,7 @@ import ( func TestTaskState(t *testing.T) { _, gm, ctx := testutil.InitTableTest(t) - require.NoError(t, gm.StartManager(ctx, ":4000", "")) + require.NoError(t, gm.InitMeta(ctx, ":4000", "")) // 1. 
cancel task id, err := gm.CreateTask(ctx, "key1", "test", 4, []byte("test")) diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index 31c64968a9595..f87b0ffda5eee 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -682,21 +682,37 @@ func (stm *TaskManager) StartSubtask(ctx context.Context, subtaskID int64, execI return err } -// StartManager insert the manager information into dist_framework_meta. -func (stm *TaskManager) StartManager(ctx context.Context, tidbID string, role string) error { +// InitMeta insert the manager information into dist_framework_meta. +func (stm *TaskManager) InitMeta(ctx context.Context, tidbID string, role string) error { return stm.WithNewSession(func(se sessionctx.Context) error { - return stm.StartManagerSession(ctx, se, tidbID, role) + return stm.InitMetaSession(ctx, se, tidbID, role) }) } -// StartManagerSession insert the manager information into dist_framework_meta. -// if the record exists, update the cpu_count. -func (*TaskManager) StartManagerSession(ctx context.Context, se sessionctx.Context, execID string, role string) error { +// InitMetaSession insert the manager information into dist_framework_meta. +// if the record exists, update the cpu_count and role. +func (*TaskManager) InitMetaSession(ctx context.Context, se sessionctx.Context, execID string, role string) error { cpuCount := cpu.GetCPUCount() _, err := sqlexec.ExecSQL(ctx, se, ` insert into mysql.dist_framework_meta(host, role, cpu_count, keyspace_id) values (%?, %?, %?, -1) - on duplicate key update cpu_count = %?`, + on duplicate key + update cpu_count = %?, role = %?`, + execID, role, cpuCount, cpuCount, role) + return err +} + +// RecoverMeta insert the manager information into dist_framework_meta. +// if the record exists, update the cpu_count. +// Don't update role for we only update it in `set global tidb_service_scope`. 
+// if not, there might be a data race. +func (stm *TaskManager) RecoverMeta(ctx context.Context, execID string, role string) error { + cpuCount := cpu.GetCPUCount() + _, err := stm.executeSQLWithNewSession(ctx, ` + insert into mysql.dist_framework_meta(host, role, cpu_count, keyspace_id) + values (%?, %?, %?, -1) + on duplicate key + update cpu_count = %?`, execID, role, cpuCount, cpuCount) + return err } diff --git a/pkg/disttask/framework/taskexecutor/interface.go b/pkg/disttask/framework/taskexecutor/interface.go index cb22d0e849b2d..cb77b63bc9387 100644 --- a/pkg/disttask/framework/taskexecutor/interface.go +++ b/pkg/disttask/framework/taskexecutor/interface.go @@ -21,13 +21,18 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" ) -// TaskTable defines the interface to access task table. +// TaskTable defines the interface to access the task table. type TaskTable interface { GetTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error) GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error) GetSubtasksByStepAndStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...proto.SubtaskState) ([]*proto.Subtask, error) GetFirstSubtaskInStates(ctx context.Context, instanceID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (*proto.Subtask, error) - StartManager(ctx context.Context, tidbID string, role string) error + // InitMeta inserts the manager information into dist_framework_meta. + // Call it when starting task executor or in set variable operation. + InitMeta(ctx context.Context, tidbID string, role string) error + // RecoverMeta recovers the manager information into dist_framework_meta. + // Call it periodically to recover deleted meta. + RecoverMeta(ctx context.Context, tidbID string, role string) error // StartSubtask try to update the subtask's state to running if the subtask is owned by execID. 
// If the update success, it means the execID's related task executor own the subtask. StartSubtask(ctx context.Context, subtaskID int64, execID string) error diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go index 0ba153b82b215..69d2b018917a4 100644 --- a/pkg/disttask/framework/taskexecutor/manager.go +++ b/pkg/disttask/framework/taskexecutor/manager.go @@ -112,9 +112,12 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable return m, nil } -func (m *Manager) initMeta() (err error) { +// InitMeta initializes the meta of the Manager. +// not a must-success step before start manager, +// manager will try to recover meta periodically. +func (m *Manager) InitMeta() (err error) { for i := 0; i < retrySQLTimes; i++ { - err = m.taskTable.StartManager(m.ctx, m.id, config.GetGlobalConfig().Instance.TiDBServiceScope) + err = m.taskTable.InitMeta(m.ctx, m.id, config.GetGlobalConfig().Instance.TiDBServiceScope) if err == nil { break } @@ -132,10 +135,24 @@ func (m *Manager) initMeta() (err error) { return err } -// InitMeta initializes the meta of the Manager. -// not a must-success step before start manager, manager will try to init meta periodically. -func (m *Manager) InitMeta() error { - return m.initMeta() +func (m *Manager) recoverMeta() (err error) { + for i := 0; i < retrySQLTimes; i++ { + err = m.taskTable.RecoverMeta(m.ctx, m.id, config.GetGlobalConfig().Instance.TiDBServiceScope) + if err == nil { + break + } + if err1 := m.ctx.Err(); err1 != nil { + return err1 + } + if i%10 == 0 { + logutil.Logger(m.logCtx).Warn("recover meta failed", + zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope), + zap.Int("retry times", i), + zap.Error(err)) + } + time.Sleep(retrySQLInterval) + } + return err } // Start starts the Manager. 
@@ -292,7 +309,7 @@ func (m *Manager) onPausingTasks(tasks []*proto.Task) error { return nil } -// recoverMetaLoop inits and recovers dist_framework_meta for the tidb node running the taskExecutor manager. +// recoverMetaLoop recovers dist_framework_meta for the tidb node running the taskExecutor manager. // This is necessary when the TiDB node experiences a prolonged network partition // and the scheduler deletes `dist_framework_meta`. // When the TiDB node recovers from the network partition, @@ -306,7 +323,7 @@ func (m *Manager) recoverMetaLoop() { logutil.Logger(m.logCtx).Info("recoverMetaLoop done") return case <-ticker.C: - if err := m.initMeta(); err != nil { + if err := m.recoverMeta(); err != nil { m.logErr(err) continue } diff --git a/pkg/disttask/framework/taskexecutor/manager_test.go b/pkg/disttask/framework/taskexecutor/manager_test.go index f159c2c1df244..efafb5ced13bd 100644 --- a/pkg/disttask/framework/taskexecutor/manager_test.go +++ b/pkg/disttask/framework/taskexecutor/manager_test.go @@ -220,7 +220,7 @@ func TestManager(t *testing.T) { task2 := &proto.Task{ID: taskID2, State: proto.TaskStateReverting, Step: proto.StepOne, Type: "type"} task3 := &proto.Task{ID: taskID3, State: proto.TaskStatePausing, Step: proto.StepOne, Type: "type"} - mockTaskTable.EXPECT().StartManager(m.ctx, "test", "").Return(nil).Times(1) + mockTaskTable.EXPECT().InitMeta(m.ctx, "test", "").Return(nil).Times(1) mockTaskTable.EXPECT().GetTasksInStates(m.ctx, proto.TaskStateRunning, proto.TaskStateReverting). Return([]*proto.Task{task1, task2}, nil).AnyTimes() mockTaskTable.EXPECT().GetTasksInStates(m.ctx, proto.TaskStateReverting). 
@@ -496,12 +496,12 @@ func TestManagerInitMeta(t *testing.T) { ctx: ctx, logCtx: ctx, } - mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mockTaskTable.EXPECT().InitMeta(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) require.NoError(t, m.InitMeta()) require.True(t, ctrl.Satisfied()) gomock.InOrder( - mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err")), - mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil), + mockTaskTable.EXPECT().InitMeta(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err")), + mockTaskTable.EXPECT().InitMeta(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil), ) require.NoError(t, m.InitMeta()) require.True(t, ctrl.Satisfied()) @@ -511,12 +511,12 @@ func TestManagerInitMeta(t *testing.T) { retrySQLTimes = bak }) retrySQLTimes = 1 - mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err")) + mockTaskTable.EXPECT().InitMeta(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err")) require.ErrorContains(t, m.InitMeta(), "mock err") require.True(t, ctrl.Satisfied()) cancel() - mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err")) + mockTaskTable.EXPECT().InitMeta(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err")) require.ErrorIs(t, m.InitMeta(), context.Canceled) require.True(t, ctrl.Satisfied()) } diff --git a/pkg/disttask/framework/taskexecutor/task_executor_testkit_test.go b/pkg/disttask/framework/taskexecutor/task_executor_testkit_test.go index 2581f21e43b65..eda9012303f8c 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_testkit_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_testkit_test.go @@ -96,7 +96,7 @@ func TestTaskExecutorBasic(t *testing.T) { } return nil }) - require.NoError(t, mgr.StartManager(ctx, 
":4000", "")) + require.NoError(t, mgr.InitMeta(ctx, ":4000", "")) for i := 0; i < 10; i++ { runOneTask(ctx, t, mgr, "key"+strconv.Itoa(i), i) } diff --git a/pkg/executor/set.go b/pkg/executor/set.go index 9272b863b10f8..c20acce07dff6 100644 --- a/pkg/executor/set.go +++ b/pkg/executor/set.go @@ -170,13 +170,18 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres logutil.BgLogger().Info("set global var", zap.Uint64("conn", sessionVars.ConnectionID), zap.String("name", name), zap.String("val", showValStr)) if name == variable.TiDBServiceScope { dom := domain.GetDomain(e.Ctx()) - config.GetGlobalConfig().Instance.TiDBServiceScope = valStr + oldConfig := config.GetGlobalConfig() + if oldConfig.Instance.TiDBServiceScope != valStr { + newConfig := *oldConfig + newConfig.Instance.TiDBServiceScope = valStr + config.StoreGlobalConfig(&newConfig) + } serverID := disttaskutil.GenerateSubtaskExecID(ctx, dom.DDL().GetID()) taskMgr, err := storage.GetTaskManager() if err != nil { return err } - return taskMgr.StartManagerSession(ctx, e.Ctx(), serverID, valStr) + return taskMgr.InitMetaSession(ctx, e.Ctx(), serverID, valStr) } return err } diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 3cbaa178a7059..b72d96b7f6122 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -2965,7 +2965,14 @@ var defaultSysVars = []*SysVar{ return normalizedValue, nil }, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { - ServiceScope.Store(strings.ToLower(s)) + newValue := strings.ToLower(s) + ServiceScope.Store(newValue) + oldConfig := config.GetGlobalConfig() + if oldConfig.Instance.TiDBServiceScope != newValue { + newConfig := *oldConfig + newConfig.Instance.TiDBServiceScope = newValue + config.StoreGlobalConfig(&newConfig) + } return nil }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { return ServiceScope.Load(), nil