Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

addindex/disttask: adjust add index task concurrency & add check when submit task #49403

Merged
merged 3 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ go_library(
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/collate",
"//pkg/util/cpu",
"//pkg/util/dbterror",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/domainutil",
Expand Down
8 changes: 7 additions & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/logutil"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
Expand Down Expand Up @@ -2115,6 +2116,11 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {
}

job := reorgInfo.Job
workerCntLimit := int(variable.GetDDLReorgWorkerCounter())
concurrency := min(workerCntLimit, cpu.GetCPUCount())
logutil.BgLogger().Info("adjusted add-index task concurrency",
zap.Int("worker-cnt", workerCntLimit), zap.Int("task-concurrency", concurrency),
zap.String("task-key", taskKey))
taskMeta := &BackfillTaskMeta{
Job: *reorgInfo.Job.Clone(),
EleIDs: elemIDs,
Expand All @@ -2129,7 +2135,7 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {

g.Go(func() error {
defer close(done)
err := handle.SubmitAndWaitTask(ctx, taskKey, taskType, distPhysicalTableConcurrency, metaData)
err := handle.SubmitAndWaitTask(ctx, taskKey, taskType, concurrency, metaData)
failpoint.Inject("pauseAfterDistTaskFinished", func() {
MockDMLExecutionOnTaskFinished()
})
Expand Down
32 changes: 15 additions & 17 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,19 @@ import (
// ┌────────┐
// ┌───────────│resuming│◄────────┐
// │ └────────┘ │
// │ ┌───────┐ ┌──┴───┐
// │ ┌────────►│pausing├──────►│paused│
// │ │ └───────┘ └──────┘
// ▼ │
// ┌──────┐ ┌───┴───┐ ┌────────┐
// ┌──────┐ │ ┌───────┐ ┌──┴───┐
// │failed│ │ ┌────────►│pausing├──────►│paused│
// └──────┘ │ │ └───────┘ └──────┘
// ▼ │
// ┌──────┐ ┌───┴───┐ ┌────────┐
// │pending├────►│running├────►│succeed │
// └──┬────┘ └───┬───┘ └────────┘
// ▼ │ ┌──────────┐
// ┌──────┐ ├────────►│cancelling│
// │failed│ │ └────┬─────┘
// └──────┘ │ ▼
// │ ┌─────────┐ ┌────────┐
// └────────►│reverting├────►│reverted│
// └────┬────┘ └────────┘
// │ ┌─────────────┐
// └─────────►│revert_failed│
// └─────────────┘
// └──┬────┘ └──┬┬───┘ └────────┘
Comment on lines +27 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will the task state transform from pending to failed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invalid task type or dispatcher init failed

// │ ││ ┌─────────┐ ┌────────┐
// │ │└────────►│reverting├────►│reverted│
// │ ▼ └────┬────┘ └────────┘
// │ ┌──────────┐ ▲ │ ┌─────────────┐
// └─────────►│cancelling├────┘ └─────────►│revert_failed│
// └──────────┘ └─────────────┘
// 1. succeed: pending -> running -> succeed
// 2. failed: pending -> running -> reverting -> reverted/revert_failed, pending -> failed
// 3. canceled: pending -> running -> cancelling -> reverting -> reverted/revert_failed
Expand Down Expand Up @@ -141,7 +137,9 @@ type Task struct {
Step Step
// Priority is the priority of task, the smaller value means the higher priority.
// valid range is [1, 1024], default is NormalPriority.
Priority int
Priority int
// Concurrency controls the max resource usage of the task, i.e. the max number
// of slots the task can use on each node.
Concurrency int
CreateTime time.Time

Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/parser/terror",
"//pkg/sessionctx",
"//pkg/util/chunk",
"//pkg/util/cpu",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/sqlescape",
Expand Down
3 changes: 3 additions & 0 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func TestMain(m *testing.M) {
func TestTaskTable(t *testing.T) {
gm, ctx := testutil.InitTableTest(t)

_, err := gm.CreateTask(ctx, "key1", "test", 999, []byte("test"))
require.ErrorContains(t, err, "task concurrency(999) larger than cpu count")

timeBeforeCreate := time.Unix(time.Now().Unix(), 0)
id, err := gm.CreateTask(ctx, "key1", "test", 4, []byte("test"))
require.NoError(t, err)
Expand Down
9 changes: 9 additions & 0 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlescape"
Expand Down Expand Up @@ -215,6 +216,14 @@ func (stm *TaskManager) CreateTask(ctx context.Context, key string, tp proto.Tas

// CreateTaskWithSession adds a new task to task table with session.
func (*TaskManager) CreateTaskWithSession(ctx context.Context, se sessionctx.Context, key string, tp proto.TaskType, concurrency int, meta []byte) (taskID int64, err error) {
cpuCount := cpu.GetCPUCount()
if concurrency > cpuCount {
// current resource control cannot schedule tasks with concurrency larger
// than cpu count
// TODO: if we are submitting a task on a node that is not managed by
// disttask framework, the checked cpu-count might not right.
return 0, errors.Errorf("task concurrency(%d) larger than cpu count(%d)", concurrency, cpuCount)
}
_, err = ExecSQL(ctx, se, `
insert into mysql.tidb_global_task(`+InsertTaskColumns+`)
values (%?, %?, %?, %?, %?, %?, %?, CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())`,
Expand Down
3 changes: 3 additions & 0 deletions tests/realtikvtest/addindextest1/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ go_test(
"//pkg/ddl/util/callback",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/errno",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/sessionctx/variable",
"//pkg/store/helper",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/types",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
],
)
24 changes: 24 additions & 0 deletions tests/realtikvtest/addindextest1/disttask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package addindextest

import (
"context"
"fmt"
"testing"

"github.com/pingcap/failpoint"
Expand All @@ -23,15 +25,18 @@ import (
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
)

func init() {
Expand All @@ -41,6 +46,13 @@ func init() {
}

func TestAddIndexDistBasic(t *testing.T) {
// mock that we only have 1 cpu, add-index task can be scheduled as usual
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", `return(1)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/storage/testSetLastTaskID", `return(true)`))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/storage/testSetLastTaskID"))
})
store := realtikvtest.CreateMockStoreAndSetup(t)
if store.Name() != "TiKV" {
t.Skip("TiKV store only")
Expand All @@ -52,6 +64,9 @@ func TestAddIndexDistBasic(t *testing.T) {
tk.MustExec("use test;")
tk.MustExec(`set global tidb_enable_dist_task=1;`)

bak := variable.GetDDLReorgWorkerCounter()
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 111")
require.Equal(t, int32(111), variable.GetDDLReorgWorkerCounter())
tk.MustExec("create table t(a bigint auto_random primary key) partition by hash(a) partitions 20;")
tk.MustExec("insert into t values (), (), (), (), (), ()")
tk.MustExec("insert into t values (), (), (), (), (), ()")
Expand All @@ -61,6 +76,15 @@ func TestAddIndexDistBasic(t *testing.T) {
tk.MustExec("split table t between (3) and (8646911284551352360) regions 50;")
tk.MustExec("alter table t add index idx(a);")
tk.MustExec("admin check index t idx;")
taskMgr, err := storage.GetTaskManager()
require.NoError(t, err)
ctx := util.WithInternalSourceType(context.Background(), "dispatcher")
task, err := taskMgr.GetTaskByIDWithHistory(ctx, storage.TestLastTaskID.Load())
require.NoError(t, err)
require.Equal(t, 1, task.Concurrency)

tk.MustExec(fmt.Sprintf("set global tidb_ddl_reorg_worker_cnt = %d", bak))
require.Equal(t, bak, variable.GetDDLReorgWorkerCounter())

tk.MustExec("create table t1(a bigint auto_random primary key);")
tk.MustExec("insert into t1 values (), (), (), (), (), ()")
Expand Down