Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: fix can't reset service_scope #50108

Merged
merged 11 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ go_test(
embed = [":storage"],
flaky = True,
race = "on",
shard_count = 15,
shard_count = 16,
deps = [
"//pkg/config",
"//pkg/disttask/framework/proto",
Expand Down
29 changes: 29 additions & 0 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,3 +1060,32 @@ func TestTaskNotFound(t *testing.T) {
require.Error(t, err, storage.ErrTaskNotFound)
require.Nil(t, task)
}

func TestStartManager(t *testing.T) {
store, sm, ctx := testutil.InitTableTest(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, sm.StartManager(ctx, "tidb1", ""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows(""))
require.NoError(t, sm.StartManager(ctx, "tidb1", "background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows("background"))
tk.MustExec(`set global tidb_service_scope=""`)
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows(""))

// 1. delete then recover.
require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"}))
require.NoError(t, sm.StartManager(ctx, "tidb1", ""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows(""))

require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"}))
require.NoError(t, sm.StartManager(ctx, "tidb1", "background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host="tidb1"`).Check(testkit.Rows("background"))

// 2. delete then set.
require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"}))
tk.MustExec(`set global tidb_service_scope=""`)
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows(""))

require.NoError(t, sm.DeleteDeadNodes(ctx, []string{"tidb1"}))
tk.MustExec(`set global tidb_service_scope="background"`)
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
}
7 changes: 4 additions & 3 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,14 +690,15 @@ func (stm *TaskManager) StartManager(ctx context.Context, tidbID string, role st
}

// StartManagerSession insert the manager information into dist_framework_meta.
// if the record exists, update the cpu_count.
// if the record exists, update the cpu_count and role.
func (*TaskManager) StartManagerSession(ctx context.Context, se sessionctx.Context, execID string, role string) error {
cpuCount := cpu.GetCPUCount()
_, err := sqlexec.ExecSQL(ctx, se, `
insert into mysql.dist_framework_meta(host, role, cpu_count, keyspace_id)
values (%?, %?, %?, -1)
on duplicate key update cpu_count = %?`,
execID, role, cpuCount, cpuCount)
on duplicate key
update cpu_count = %?, role = %?`,
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
execID, role, cpuCount, cpuCount, role)
return err
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/disttask/framework/taskexecutor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
)

// TaskTable defines the interface to access task table.
// TaskTable defines the interface to access the task table.
type TaskTable interface {
GetTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
GetSubtasksByStepAndStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...proto.SubtaskState) ([]*proto.Subtask, error)
GetFirstSubtaskInStates(ctx context.Context, instanceID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (*proto.Subtask, error)
// StartManagerSession insert the manager information into dist_framework_meta.
// Call it when starting task executor or in recover meta loop.
StartManager(ctx context.Context, tidbID string, role string) error
// StartSubtask try to update the subtask's state to running if the subtask is owned by execID.
// If the update success, it means the execID's related task executor own the subtask.
Expand Down
7 changes: 6 additions & 1 deletion pkg/executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,12 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
logutil.BgLogger().Info("set global var", zap.Uint64("conn", sessionVars.ConnectionID), zap.String("name", name), zap.String("val", showValStr))
if name == variable.TiDBServiceScope {
dom := domain.GetDomain(e.Ctx())
config.GetGlobalConfig().Instance.TiDBServiceScope = valStr
oldConfig := config.GetGlobalConfig()
if oldConfig.Instance.TiDBServiceScope != valStr {
newConfig := *oldConfig
newConfig.Instance.TiDBServiceScope = valStr
config.StoreGlobalConfig(&newConfig)
}
serverID := disttaskutil.GenerateSubtaskExecID(ctx, dom.DDL().GetID())
taskMgr, err := storage.GetTaskManager()
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2959,7 +2959,14 @@ var defaultSysVars = []*SysVar{
return normalizedValue, nil
},
SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
ServiceScope.Store(strings.ToLower(s))
newValue := strings.ToLower(s)
ServiceScope.Store(newValue)
oldConfig := config.GetGlobalConfig()
if oldConfig.Instance.TiDBServiceScope != newValue {
newConfig := *oldConfig
newConfig.Instance.TiDBServiceScope = newValue
config.StoreGlobalConfig(&newConfig)
}
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return ServiceScope.Load(), nil
Expand Down