disttask: fix can't reset service_scope #50108

Merged · 11 commits · Jan 8, 2024
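In short: `TaskManager.StartManager` is renamed to `InitMeta`, and its upsert now updates `role` as well as `cpu_count`, so that `set global tidb_service_scope` can actually reset a node's scope. A new `RecoverMeta`, which refreshes `cpu_count` only, takes over the task executor's periodic meta recovery, so the recovery loop can no longer race with (and silently undo) a scope change.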
26 changes: 20 additions & 6 deletions pkg/disttask/framework/mock/task_executor_mock.go

Some generated files are not rendered by default.
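Since the `TaskTable` interface (see `interface.go` below) drops `StartManager` and gains `InitMeta` and `RecoverMeta`, the regenerated mock gains matching methods. A rough sketch of what the unrendered file contains, assuming the usual gomock layout and a `MockTaskTable` type (names inferred, not taken from the rendered diff):

```go
// InitMeta mocks base method.
func (m *MockTaskTable) InitMeta(ctx context.Context, tidbID, role string) error {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "InitMeta", ctx, tidbID, role)
	ret0, _ := ret[0].(error)
	return ret0
}

// RecoverMeta mocks base method.
func (m *MockTaskTable) RecoverMeta(ctx context.Context, tidbID, role string) error {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "RecoverMeta", ctx, tidbID, role)
	ret0, _ := ret[0].(error)
	return ret0
}
```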

2 changes: 1 addition & 1 deletion pkg/disttask/framework/scheduler/scheduler_test.go
@@ -284,7 +284,7 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
}
}()

require.NoError(t, mgr.StartManager(ctx, ":4000", "background"))
require.NoError(t, mgr.InitMeta(ctx, ":4000", "background"))

// 3s
cnt := 60
2 changes: 1 addition & 1 deletion pkg/disttask/framework/storage/BUILD.bazel
@@ -40,7 +40,7 @@ go_test(
embed = [":storage"],
flaky = True,
race = "on",
shard_count = 16,
shard_count = 17,
deps = [
"//pkg/config",
"//pkg/disttask/framework/proto",
74 changes: 59 additions & 15 deletions pkg/disttask/framework/storage/table_test.go
@@ -52,7 +52,7 @@ func checkTaskStateStep(t *testing.T, task *proto.Task, state proto.TaskState, s
func TestTaskTable(t *testing.T) {
_, gm, ctx := testutil.InitTableTest(t)

require.NoError(t, gm.StartManager(ctx, ":4000", ""))
require.NoError(t, gm.InitMeta(ctx, ":4000", ""))

_, err := gm.CreateTask(ctx, "key1", "test", 999, []byte("test"))
require.ErrorContains(t, err, "task concurrency(999) larger than cpu count")
@@ -232,7 +232,7 @@ func TestSwitchTaskStep(t *testing.T) {
store, tm, ctx := testutil.InitTableTest(t)
tk := testkit.NewTestKit(t, store)

require.NoError(t, tm.StartManager(ctx, ":4000", ""))
require.NoError(t, tm.InitMeta(ctx, ":4000", ""))
taskID, err := tm.CreateTask(ctx, "key1", "test", 4, []byte("test"))
require.NoError(t, err)
task, err := tm.GetTaskByID(ctx, taskID)
@@ -282,7 +282,7 @@ func TestSwitchTaskStepInBatch(t *testing.T) {
store, tm, ctx := testutil.InitTableTest(t)
tk := testkit.NewTestKit(t, store)

require.NoError(t, tm.StartManager(ctx, ":4000", ""))
require.NoError(t, tm.InitMeta(ctx, ":4000", ""))
// normal flow
prepare := func(taskKey string) (*proto.Task, []*proto.Subtask) {
taskID, err := tm.CreateTask(ctx, taskKey, "test", 4, []byte("test"))
@@ -357,7 +357,7 @@ func TestSwitchTaskStepInBatch(t *testing.T) {
func TestGetTopUnfinishedTasks(t *testing.T) {
_, gm, ctx := testutil.InitTableTest(t)

require.NoError(t, gm.StartManager(ctx, ":4000", ""))
require.NoError(t, gm.InitMeta(ctx, ":4000", ""))
taskStates := []proto.TaskState{
proto.TaskStateSucceed,
proto.TaskStatePending,
@@ -440,7 +440,7 @@ func TestGetUsedSlotsOnNodes(t *testing.T) {
func TestSubTaskTable(t *testing.T) {
_, sm, ctx := testutil.InitTableTest(t)
timeBeforeCreate := time.Unix(time.Now().Unix(), 0)
require.NoError(t, sm.StartManager(ctx, ":4000", ""))
require.NoError(t, sm.InitMeta(ctx, ":4000", ""))
id, err := sm.CreateTask(ctx, "key1", "test", 4, []byte("test"))
require.NoError(t, err)
require.Equal(t, int64(1), id)
@@ -689,7 +689,7 @@ func TestSubTaskTable(t *testing.T) {

func TestBothTaskAndSubTaskTable(t *testing.T) {
_, sm, ctx := testutil.InitTableTest(t)
require.NoError(t, sm.StartManager(ctx, ":4000", ""))
require.NoError(t, sm.InitMeta(ctx, ":4000", ""))
id, err := sm.CreateTask(ctx, "key1", "test", 4, []byte("test"))
require.NoError(t, err)
require.Equal(t, int64(1), id)
@@ -830,16 +830,16 @@ func TestDistFrameworkMeta(t *testing.T) {
_, err := storage.GetCPUCountOfManagedNodes(ctx, sm)
require.ErrorContains(t, err, "no managed nodes")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(0)"))
require.NoError(t, sm.StartManager(ctx, ":4000", "background"))
require.NoError(t, sm.InitMeta(ctx, ":4000", "background"))
cpuCount, err := storage.GetCPUCountOfManagedNodes(ctx, sm)
require.NoError(t, err)
require.Equal(t, 0, cpuCount)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(100)"))
require.NoError(t, sm.StartManager(ctx, ":4000", "background"))
require.NoError(t, sm.InitMeta(ctx, ":4000", "background"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)"))
require.NoError(t, sm.StartManager(ctx, ":4001", ""))
require.NoError(t, sm.StartManager(ctx, ":4002", "background"))
require.NoError(t, sm.InitMeta(ctx, ":4001", ""))
require.NoError(t, sm.InitMeta(ctx, ":4002", "background"))
nodes, err := sm.GetAllNodes(ctx)
require.NoError(t, err)
require.Equal(t, []proto.ManagedNode{
@@ -848,23 +848,24 @@ func TestDistFrameworkMeta(t *testing.T) {
{ID: ":4002", Role: "background", CPUCount: 8},
}, nodes)

// won't be replaced by below one, but cpu count will be updated
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(100)"))
require.NoError(t, sm.StartManager(ctx, ":4002", ""))
require.NoError(t, sm.StartManager(ctx, ":4003", "background"))
require.NoError(t, sm.InitMeta(ctx, ":4002", ""))
require.NoError(t, sm.InitMeta(ctx, ":4003", "background"))

nodes, err = sm.GetAllNodes(ctx)
require.NoError(t, err)
require.Equal(t, []proto.ManagedNode{
{ID: ":4000", Role: "background", CPUCount: 100},
{ID: ":4001", Role: "", CPUCount: 8},
{ID: ":4002", Role: "background", CPUCount: 100},
{ID: ":4002", Role: "", CPUCount: 100},
{ID: ":4003", Role: "background", CPUCount: 100},
}, nodes)
cpuCount, err = storage.GetCPUCountOfManagedNodes(ctx, sm)
require.NoError(t, err)
require.Equal(t, 100, cpuCount)

require.NoError(t, sm.InitMeta(ctx, ":4002", "background"))

require.NoError(t, sm.DeleteDeadNodes(ctx, []string{":4000"}))
nodes, err = sm.GetManagedNodes(ctx)
require.NoError(t, err)
@@ -889,6 +890,20 @@ func TestDistFrameworkMeta(t *testing.T) {
cpuCount, err = storage.GetCPUCountOfManagedNodes(ctx, sm)
require.NoError(t, err)
require.Equal(t, 8, cpuCount)

require.NoError(t, sm.RecoverMeta(ctx, ":4002", "background"))
nodes, err = sm.GetManagedNodes(ctx)
require.NoError(t, err)
require.Equal(t, []proto.ManagedNode{
{ID: ":4002", Role: "background", CPUCount: 100},
}, nodes)
// should not reset role
require.NoError(t, sm.RecoverMeta(ctx, ":4002", ""))
nodes, err = sm.GetManagedNodes(ctx)
require.NoError(t, err)
require.Equal(t, []proto.ManagedNode{
{ID: ":4002", Role: "background", CPUCount: 100},
}, nodes)
}

func TestSubtaskHistoryTable(t *testing.T) {
@@ -959,7 +974,7 @@ func TestSubtaskHistoryTable(t *testing.T) {
func TestTaskHistoryTable(t *testing.T) {
_, gm, ctx := testutil.InitTableTest(t)

require.NoError(t, gm.StartManager(ctx, ":4000", ""))
require.NoError(t, gm.InitMeta(ctx, ":4000", ""))
_, err := gm.CreateTask(ctx, "1", proto.TaskTypeExample, 1, nil)
require.NoError(t, err)
taskID, err := gm.CreateTask(ctx, "2", proto.TaskTypeExample, 1, nil)
@@ -1071,3 +1086,32 @@ func TestTaskNotFound(t *testing.T) {
require.Error(t, err, storage.ErrTaskNotFound)
require.Nil(t, task)
}

func TestInitMeta(t *testing.T) {
store, sm, ctx := testutil.InitTableTest(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, sm.InitMeta(ctx, "tidb1", ""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows(""))
require.NoError(t, sm.InitMeta(ctx, "tidb1", "background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows("background"))
tk.MustExec(`set global tidb_service_scope=""`)
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows(""))

// 1. delete then start.
require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"}))
require.NoError(t, sm.InitMeta(ctx, "tidb1", ""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows(""))

require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"}))
require.NoError(t, sm.InitMeta(ctx, "tidb1", "background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows("background"))

// 2. delete then set.
require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"}))
tk.MustExec(`set global tidb_service_scope=""`)
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows(""))

require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"}))
tk.MustExec(`set global tidb_service_scope="background"`)
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
}
2 changes: 1 addition & 1 deletion pkg/disttask/framework/storage/task_state_test.go
@@ -29,7 +29,7 @@ import (
func TestTaskState(t *testing.T) {
_, gm, ctx := testutil.InitTableTest(t)

require.NoError(t, gm.StartManager(ctx, ":4000", ""))
require.NoError(t, gm.InitMeta(ctx, ":4000", ""))

// 1. cancel task
id, err := gm.CreateTask(ctx, "key1", "test", 4, []byte("test"))
30 changes: 23 additions & 7 deletions pkg/disttask/framework/storage/task_table.go
@@ -682,21 +682,37 @@ func (stm *TaskManager) StartSubtask(ctx context.Context, subtaskID int64, execI
return err
}

// StartManager insert the manager information into dist_framework_meta.
func (stm *TaskManager) StartManager(ctx context.Context, tidbID string, role string) error {
// InitMeta inserts the manager information into dist_framework_meta.
func (stm *TaskManager) InitMeta(ctx context.Context, tidbID string, role string) error {
return stm.WithNewSession(func(se sessionctx.Context) error {
return stm.StartManagerSession(ctx, se, tidbID, role)
return stm.InitMetaSession(ctx, se, tidbID, role)
})
}

// StartManagerSession insert the manager information into dist_framework_meta.
// if the record exists, update the cpu_count.
func (*TaskManager) StartManagerSession(ctx context.Context, se sessionctx.Context, execID string, role string) error {
// InitMetaSession inserts the manager information into dist_framework_meta.
// If the record exists, it updates cpu_count and role.
func (*TaskManager) InitMetaSession(ctx context.Context, se sessionctx.Context, execID string, role string) error {
cpuCount := cpu.GetCPUCount()
_, err := sqlexec.ExecSQL(ctx, se, `
insert into mysql.dist_framework_meta(host, role, cpu_count, keyspace_id)
values (%?, %?, %?, -1)
on duplicate key update cpu_count = %?`,
on duplicate key
update cpu_count = %?, role = %?`,
execID, role, cpuCount, cpuCount, role)
return err
}

// RecoverMeta inserts the manager information into dist_framework_meta.
// If the record exists, it only updates cpu_count.
// It does not update role, since role is only changed via
// `set global tidb_service_scope`; updating it here too could cause a data race.
func (stm *TaskManager) RecoverMeta(ctx context.Context, execID string, role string) error {
cpuCount := cpu.GetCPUCount()
_, err := stm.executeSQLWithNewSession(ctx, `
insert into mysql.dist_framework_meta(host, role, cpu_count, keyspace_id)
values (%?, %?, %?, -1)
on duplicate key
update cpu_count = %?`,
execID, role, cpuCount, cpuCount)
return err
}
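The difference between the two upserts above is the whole fix. A minimal SQL sketch of what each statement does to a pre-existing row (host `:4002`; the concrete values are hypothetical):

```sql
-- InitMeta: called at startup and from `set global tidb_service_scope`,
-- so overwriting role is exactly what we want.
INSERT INTO mysql.dist_framework_meta(host, role, cpu_count, keyspace_id)
VALUES (':4002', '', 16, -1)
ON DUPLICATE KEY UPDATE cpu_count = 16, role = '';
-- role is now '', even if the row previously said 'background'.

-- RecoverMeta: called periodically by the recovery loop, so it must not
-- overwrite a role that `set global tidb_service_scope` changed meanwhile.
INSERT INTO mysql.dist_framework_meta(host, role, cpu_count, keyspace_id)
VALUES (':4002', 'background', 16, -1)
ON DUPLICATE KEY UPDATE cpu_count = 16;
-- the existing role is kept; the role in VALUES only applies on a fresh insert.
```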
9 changes: 7 additions & 2 deletions pkg/disttask/framework/taskexecutor/interface.go
@@ -21,13 +21,18 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
)

// TaskTable defines the interface to access task table.
// TaskTable defines the interface to access the task table.
type TaskTable interface {
GetTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
GetSubtasksByStepAndStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...proto.SubtaskState) ([]*proto.Subtask, error)
GetFirstSubtaskInStates(ctx context.Context, instanceID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (*proto.Subtask, error)
StartManager(ctx context.Context, tidbID string, role string) error
// InitMeta inserts the manager information into dist_framework_meta.
// Call it when the task executor starts, or when `tidb_service_scope` is set.
InitMeta(ctx context.Context, tidbID string, role string) error
// RecoverMeta restores the manager information in dist_framework_meta.
// Call it periodically to recover meta deleted by the scheduler.
RecoverMeta(ctx context.Context, tidbID string, role string) error
// StartSubtask try to update the subtask's state to running if the subtask is owned by execID.
// If the update success, it means the execID's related task executor own the subtask.
StartSubtask(ctx context.Context, subtaskID int64, execID string) error
33 changes: 25 additions & 8 deletions pkg/disttask/framework/taskexecutor/manager.go
@@ -112,9 +112,12 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
return m, nil
}

func (m *Manager) initMeta() (err error) {
// InitMeta initializes the meta of the Manager.
// It is not required to succeed before the manager starts;
// the manager will try to recover the meta periodically.
func (m *Manager) InitMeta() (err error) {
for i := 0; i < retrySQLTimes; i++ {
err = m.taskTable.StartManager(m.ctx, m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
err = m.taskTable.InitMeta(m.ctx, m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
if err == nil {
break
}
@@ -132,10 +135,24 @@ func (m *Manager) initMeta() (err error) {
return err
}

// InitMeta initializes the meta of the Manager.
// not a must-success step before start manager, manager will try to init meta periodically.
func (m *Manager) InitMeta() error {
return m.initMeta()
func (m *Manager) recoverMeta() (err error) {
for i := 0; i < retrySQLTimes; i++ {
err = m.taskTable.RecoverMeta(m.ctx, m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
if err == nil {
break
}
if err1 := m.ctx.Err(); err1 != nil {
return err1
}
if i%10 == 0 {
logutil.Logger(m.logCtx).Warn("recover meta failed",
zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
zap.Int("retry times", i),
zap.Error(err))
}
time.Sleep(retrySQLInterval)
}
return err
}

// Start starts the Manager.
@@ -292,7 +309,7 @@ func (m *Manager) onPausingTasks(tasks []*proto.Task) error {
return nil
}

// recoverMetaLoop inits and recovers dist_framework_meta for the tidb node running the taskExecutor manager.
// recoverMetaLoop recovers dist_framework_meta for the tidb node running the taskExecutor manager.
// This is necessary when the TiDB node experiences a prolonged network partition
// and the scheduler deletes `dist_framework_meta`.
// When the TiDB node recovers from the network partition,
@@ -306,7 +323,7 @@ func (m *Manager) recoverMetaLoop() {
logutil.Logger(m.logCtx).Info("recoverMetaLoop done")
return
case <-ticker.C:
if err := m.initMeta(); err != nil {
if err := m.recoverMeta(); err != nil {
m.logErr(err)
continue
}
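The hunk above only shows the body of the select; the ticker setup sits outside the diff context. A minimal sketch of the loop's full shape, with `recoverMetaInterval` as a stand-in name for whatever interval constant the file actually uses:

```go
// Sketch of the periodic recovery loop around the hunk above.
func (m *Manager) recoverMetaLoop() {
	ticker := time.NewTicker(recoverMetaInterval) // interval not shown in the diff
	defer ticker.Stop()
	for {
		select {
		case <-m.ctx.Done():
			logutil.Logger(m.logCtx).Info("recoverMetaLoop done")
			return
		case <-ticker.C:
			// recoverMeta retries RecoverMeta, which re-inserts the row if the
			// scheduler deleted it, and on duplicate key leaves role alone
			// (see task_table.go above).
			if err := m.recoverMeta(); err != nil {
				m.logErr(err)
				continue
			}
		}
	}
}
```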