Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into merge-loop
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter committed Jan 9, 2024
2 parents ad89207 + d5a21e2 commit dd3c7ec
Show file tree
Hide file tree
Showing 25 changed files with 7,973 additions and 7,593 deletions.
3 changes: 2 additions & 1 deletion pkg/disttask/framework/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ go_test(
"framework_err_handling_test.go",
"framework_ha_test.go",
"framework_pause_and_resume_test.go",
"framework_role_test.go",
"framework_rollback_test.go",
"framework_test.go",
],
flaky = True,
race = "off",
shard_count = 25,
shard_count = 26,
deps = [
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
Expand Down
96 changes: 96 additions & 0 deletions pkg/disttask/framework/framework_role_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package framework_test

import (
"context"
"slices"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)

func checkSubtaskOnNodes(ctx context.Context, t *testing.T, taskID int64, expectedNodes []string) {
mgr, err := storage.GetTaskManager()
require.NoError(t, err)
nodes, err := storage.GetSubtaskNodesForTest(ctx, mgr, taskID)
require.NoError(t, err)
slices.Sort(nodes)
slices.Sort(expectedNodes)
require.EqualValues(t, expectedNodes, nodes)
}

func TestRoleBasic(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()

testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
tk := testkit.NewTestKit(t, distContext.Store)

// 1. all "" role.
submitTaskAndCheckSuccessForBasic(ctx, t, "😁", testContext)

checkSubtaskOnNodes(ctx, t, 1, []string{":4000", ":4001", ":4002"})
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows(""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows(""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows(""))

// 2. one "background" role.
tk.MustExec("set global tidb_service_scope=background")
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
tk.MustQuery("select @@tidb_service_scope").Check(testkit.Rows("background"))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()"))
<-scheduler.TestRefreshedChan
submitTaskAndCheckSuccessForBasic(ctx, t, "😊", testContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))

tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows(""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows(""))

checkSubtaskOnNodes(ctx, t, 2, []string{":4000"})

// 3. 2 "background" role.
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
time.Sleep(5 * time.Second)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()"))
<-scheduler.TestRefreshedChan
submitTaskAndCheckSuccessForBasic(ctx, t, "😆", testContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))

checkSubtaskOnNodes(ctx, t, 3, []string{":4000", ":4001"})
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows(""))

distContext.Close()
}

func TestSetRole(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
// 1. set wrong sys var.
tk.MustMatchErrMsg("set global tidb_service_scope=wrong", `incorrect value: .*. tidb_service_scope options: "", background`)
// 2. set keyspace id.
tk.MustExec("update mysql.dist_framework_meta set keyspace_id = 16777216 where host = \":4000\"")
tk.MustQuery("select keyspace_id from mysql.dist_framework_meta where host = \":4000\"").Check(testkit.Rows("16777216"))
}
30 changes: 0 additions & 30 deletions pkg/disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,36 +231,6 @@ func TestTaskExecutorDownManyNodes(t *testing.T) {
distContext.Close()
}

func TestFrameworkSetLabel(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()

testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
tk := testkit.NewTestKit(t, distContext.Store)

// 1. all "" role.
submitTaskAndCheckSuccessForBasic(ctx, t, "😁", testContext)

// 2. one "background" role.
tk.MustExec("set global tidb_service_scope=background")
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
tk.MustQuery("select @@tidb_service_scope").Check(testkit.Rows("background"))
submitTaskAndCheckSuccessForBasic(ctx, t, "😊", testContext)

// 3. 2 "background" role.
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
submitTaskAndCheckSuccessForBasic(ctx, t, "😆", testContext)

// 4. set wrong sys var.
tk.MustMatchErrMsg("set global tidb_service_scope=wrong", `incorrect value: .*. tidb_service_scope options: "", background`)

// 5. set keyspace id.
tk.MustExec("update mysql.dist_framework_meta set keyspace_id = 16777216 where host = \":4001\"")
tk.MustQuery("select keyspace_id from mysql.dist_framework_meta where host = \":4001\"").Check(testkit.Rows("16777216"))

distContext.Close()
}

func TestGC(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()
Expand Down
8 changes: 8 additions & 0 deletions pkg/disttask/framework/scheduler/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/log"
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -123,6 +124,9 @@ func (nm *NodeManager) refreshManagedNodesLoop(ctx context.Context, taskMgr Task
}
}

// TestRefreshedChan is used to sync the test.
var TestRefreshedChan = make(chan struct{})

// refreshManagedNodes maintains the nodes managed by the framework.
func (nm *NodeManager) refreshManagedNodes(ctx context.Context, taskMgr TaskManager, slotMgr *slotManager) {
newNodes, err := taskMgr.GetManagedNodes(ctx)
Expand All @@ -140,6 +144,10 @@ func (nm *NodeManager) refreshManagedNodes(ctx context.Context, taskMgr TaskMana
}
slotMgr.updateCapacity(cpuCount)
nm.managedNodes.Store(&nodeIDs)

failpoint.Inject("syncRefresh", func() {
TestRefreshedChan <- struct{}{}
})
}

// GetManagedNodes returns the nodes managed by the framework.
Expand Down
28 changes: 14 additions & 14 deletions pkg/disttask/framework/storage/task_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
)

// CancelTask cancels task.
func (stm *TaskManager) CancelTask(ctx context.Context, taskID int64) error {
_, err := stm.executeSQLWithNewSession(ctx,
func (mgr *TaskManager) CancelTask(ctx context.Context, taskID int64) error {
_, err := mgr.executeSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP()
Expand All @@ -46,8 +46,8 @@ func (*TaskManager) CancelTaskByKeySession(ctx context.Context, se sessionctx.Co
}

// FailTask implements the scheduler.TaskManager interface.
func (stm *TaskManager) FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error {
_, err := stm.executeSQLWithNewSession(ctx,
func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error {
_, err := mgr.executeSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
error = %?,
Expand All @@ -60,8 +60,8 @@ func (stm *TaskManager) FailTask(ctx context.Context, taskID int64, currentState
}

// RevertedTask implements the scheduler.TaskManager interface.
func (stm *TaskManager) RevertedTask(ctx context.Context, taskID int64) error {
_, err := stm.executeSQLWithNewSession(ctx,
func (mgr *TaskManager) RevertedTask(ctx context.Context, taskID int64) error {
_, err := mgr.executeSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP(),
Expand All @@ -73,9 +73,9 @@ func (stm *TaskManager) RevertedTask(ctx context.Context, taskID int64) error {
}

// PauseTask pauses the task.
func (stm *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, error) {
func (mgr *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, error) {
found := false
err := stm.WithNewSession(func(se sessionctx.Context) error {
err := mgr.WithNewSession(func(se sessionctx.Context) error {
_, err := sqlexec.ExecSQL(ctx, se,
`update mysql.tidb_global_task
set state = %?,
Expand All @@ -98,8 +98,8 @@ func (stm *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, er
}

// PausedTask update the task state from pausing to paused.
func (stm *TaskManager) PausedTask(ctx context.Context, taskID int64) error {
_, err := stm.executeSQLWithNewSession(ctx,
func (mgr *TaskManager) PausedTask(ctx context.Context, taskID int64) error {
_, err := mgr.executeSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP(),
Expand All @@ -111,9 +111,9 @@ func (stm *TaskManager) PausedTask(ctx context.Context, taskID int64) error {
}

// ResumeTask resumes the task.
func (stm *TaskManager) ResumeTask(ctx context.Context, taskKey string) (bool, error) {
func (mgr *TaskManager) ResumeTask(ctx context.Context, taskKey string) (bool, error) {
found := false
err := stm.WithNewSession(func(se sessionctx.Context) error {
err := mgr.WithNewSession(func(se sessionctx.Context) error {
_, err := sqlexec.ExecSQL(ctx, se,
`update mysql.tidb_global_task
set state = %?,
Expand All @@ -136,8 +136,8 @@ func (stm *TaskManager) ResumeTask(ctx context.Context, taskKey string) (bool, e
}

// SucceedTask update task state from running to succeed.
func (stm *TaskManager) SucceedTask(ctx context.Context, taskID int64) error {
return stm.WithNewSession(func(se sessionctx.Context) error {
func (mgr *TaskManager) SucceedTask(ctx context.Context, taskID int64) error {
return mgr.WithNewSession(func(se sessionctx.Context) error {
_, err := sqlexec.ExecSQL(ctx, se, `
update mysql.tidb_global_task
set state = %?,
Expand Down
Loading

0 comments on commit dd3c7ec

Please sign in to comment.