Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dxf: systable and scheduler change for modify task at runtime #57557

Merged
merged 10 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(218), session.CurrentBootstrapVersion)
require.Equal(t, int64(239), session.CurrentBootstrapVersion)
}
1 change: 1 addition & 0 deletions pkg/disttask/framework/proto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "proto",
srcs = [
"modify.go",
"node.go",
"step.go",
"subtask.go",
Expand Down
40 changes: 40 additions & 0 deletions pkg/disttask/framework/proto/modify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proto

// ModificationType is the type of task modification.
type ModificationType string

// String implements fmt.Stringer interface.
func (t ModificationType) String() string {
return string(t)
}

const (
// ModifyConcurrency is the type for modifying task concurrency.
ModifyConcurrency ModificationType = "modify_concurrency"
)

// ModifyParam is the parameter for task modification.
type ModifyParam struct {
PrevState TaskState `json:"prev_state"`
Modifications []Modification `json:"modifications"`
}

// Modification is one modification for task.
type Modification struct {
Type ModificationType `json:"type"`
To int64 `json:"to"`
}
tangenta marked this conversation as resolved.
Show resolved Hide resolved
52 changes: 42 additions & 10 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import (
// The `failed` state is used to mean the framework cannot run the task, such as
// invalid task type, scheduler init error(fatal), etc.
//
// ┌────────┐
// ┌───────────│resuming│◄────────┐
// │ └────────┘ │
// ┌──────┐ │ ┌───────┐ ┌──┴───┐
// │failed│ │ ┌────────►│pausing├──────►│paused│
// └──────┘ │ │ └───────┘ └──────┘
// ▲ ▼ │
// ┌──┴────┐ ┌───┴───┐ ┌────────┐
// normal execution state transition:
//
// ┌──────┐
// │failed│
// └──────┘
// ▲
// ┌──┴────┐ ┌───────┐ ┌────────┐
// │pending├────►│running├────►│succeed │
// └──┬────┘ └──┬┬───┘ └────────┘
// │ ││ ┌─────────┐ ┌────────┐
Expand All @@ -40,6 +39,32 @@ import (
// │ ┌──────────┐ ▲
// └─────────►│cancelling├────┘
// └──────────┘
//
// pause/resume state transition:
// as we don't know the state of the task before `paused`, so the state after
// `resuming` is always `running`.
//
// ┌───────┐
// │pending├──┐
// └───────┘ │ ┌───────┐ ┌──────┐
// ├────►│pausing├──────►│paused│
// ┌───────┐ │ └───────┘ └───┬──┘
// │running├──┘ │
// └───▲───┘ ┌────────┐ │
// └────────────┤resuming│◄─────────┘
// └────────┘
//
// modifying state transition:
//
// ┌───────┐
// │pending├──┐
// └───────┘ │
// ┌───────┐ │ ┌─────────┐
// │running├──┼────►│modifying├────► original state
// └───────┘ │ └─────────┘
// ┌───────┐ │
// │paused ├──┘
// └───────┘
const (
TaskStatePending TaskState = "pending"
TaskStateRunning TaskState = "running"
Expand All @@ -51,6 +76,7 @@ const (
TaskStatePausing TaskState = "pausing"
TaskStatePaused TaskState = "paused"
TaskStateResuming TaskState = "resuming"
TaskStateModifying TaskState = "modifying"
)

type (
Expand All @@ -68,6 +94,11 @@ func (s TaskState) String() string {
return string(s)
}

// CanMoveToModifying checks if current state can move to 'modifying' state.
func (s TaskState) CanMoveToModifying() bool {
return s == TaskStatePending || s == TaskStateRunning || s == TaskStatePaused
}

const (
// TaskIDLabelName is the label name of task id.
TaskIDLabelName = "task_id"
Expand Down Expand Up @@ -154,8 +185,9 @@ type Task struct {
// changed in below case, and framework will update the task meta in the storage.
// - task switches to next step in Scheduler.OnNextSubtasksBatch
// - on task cleanup, we might do some redaction on the meta.
Meta []byte
Error error
Meta []byte
Error error
ModifyParam ModifyParam
}

var (
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/scheduler/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (nm *NodeManager) refreshNodes(ctx context.Context, taskMgr TaskManager, sl
for _, node := range newNodes {
if node.CPUCount > 0 {
cpuCount = node.CPUCount
break
}
}
slotMgr.updateCapacity(cpuCount)
Expand Down
32 changes: 27 additions & 5 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *BaseScheduler) refreshTaskIfNeeded() error {
if err != nil {
return err
}
// state might be changed by user to pausing/resuming/cancelling, or
// state might be changed by user to pausing/resuming/cancelling/modifying, or
// in case of network partition, state/step/meta might be changed by other scheduler,
// in both cases we refresh the whole task object.
if newTaskBase.State != task.State || newTaskBase.Step != task.Step {
Expand Down Expand Up @@ -206,8 +206,7 @@ func (s *BaseScheduler) scheduleTask() {
return
}
case proto.TaskStateResuming:
// Case with 2 nodes.
// Here is the timeline
// need to check allocatedSlots for the following case:
// 1. task in pausing state.
// 2. node1 and node2 start schedulers with task in pausing state without allocatedSlots.
// 3. node1's scheduler transfer the node from pausing to paused state.
Expand All @@ -221,10 +220,20 @@ func (s *BaseScheduler) scheduleTask() {
case proto.TaskStateReverting:
err = s.onReverting()
case proto.TaskStatePending:
// need to check allocatedSlots for the following case:
// 1. task in modifying state, node A and B start schedulers with
// task in modifying state without allocatedSlots.
// 2. node A's scheduler finished modifying, and transfer the node
// from modifying to pending state.
// 3. node B's scheduler call refreshTask and get task with pending
// state, but this scheduler has not allocated slots.
if !s.allocatedSlots {
s.logger.Info("scheduler exit since not allocated slots", zap.Stringer("state", task.State))
return
}
err = s.onPending()
case proto.TaskStateRunning:
// Case with 2 nodes.
// Here is the timeline
// need to check allocatedSlots for the following case:
// 1. task in pausing state.
// 2. node1 and node2 start schedulers with task in pausing state without allocatedSlots.
// 3. node1's scheduler transfer the node from pausing to paused state.
Expand All @@ -236,6 +245,12 @@ func (s *BaseScheduler) scheduleTask() {
return
}
err = s.onRunning()
case proto.TaskStateModifying:
var recreateScheduler bool
recreateScheduler, err = s.onModifying()
if err == nil && recreateScheduler {
return
}
case proto.TaskStateSucceed, proto.TaskStateReverted, proto.TaskStateFailed:
s.onFinished()
return
Expand Down Expand Up @@ -385,6 +400,13 @@ func (s *BaseScheduler) onRunning() error {
return nil
}

// onModifying is called when task is in modifying state.
// the first return value indicates whether the scheduler should be recreated.
func (*BaseScheduler) onModifying() (bool, error) {
// TODO: implement me
panic("implement me")
}

func (s *BaseScheduler) onFinished() {
task := s.GetTask()
s.logger.Debug("schedule task, task is finished", zap.Stringer("state", task.State))
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (sm *Manager) startSchedulers(schedulableTasks []*proto.TaskBase) error {
// task of lower rank might be able to be scheduled.
continue
}
// reverting/cancelling/pausing
// reverting/cancelling/pausing/modifying, we don't allocate slots for them.
default:
allocateSlots = false
sm.logger.Info("start scheduler without allocating slots",
Expand Down
3 changes: 2 additions & 1 deletion pkg/disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ go_test(
embed = [":storage"],
flaky = True,
race = "on",
shard_count = 22,
shard_count = 23,
deps = [
"//pkg/config",
"//pkg/disttask/framework/proto",
Expand All @@ -53,6 +53,7 @@ go_test(
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"//pkg/util",
"//pkg/util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
7 changes: 7 additions & 0 deletions pkg/disttask/framework/storage/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
"encoding/json"
"strconv"
"time"

Expand Down Expand Up @@ -66,6 +67,12 @@ func Row2Task(r chunk.Row) *proto.Task {
task.Error = stdErr
}
}
if !r.IsNull(14) {
str := r.GetJSON(14).String()
if err := json.Unmarshal([]byte(str), &task.ModifyParam); err != nil {
logutil.BgLogger().Error("unmarshal task modify param", zap.Error(err))
}
}
return task
}

Expand Down
22 changes: 16 additions & 6 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func TestGetTopUnfinishedTasks(t *testing.T) {
proto.TaskStatePending,
proto.TaskStatePending,
proto.TaskStatePending,
proto.TaskStateModifying,
}
for i, state := range taskStates {
taskKey := fmt.Sprintf("key/%d", i)
Expand Down Expand Up @@ -403,17 +404,26 @@ func TestGetTopUnfinishedTasks(t *testing.T) {
rs, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `
select count(1) from mysql.tidb_global_task`)
require.Len(t, rs, 1)
require.Equal(t, int64(12), rs[0].GetInt64(0))
require.Equal(t, int64(13), rs[0].GetInt64(0))
return err
}))
getTaskKeys := func(tasks []*proto.TaskBase) []string {
taskKeys := make([]string, 0, len(tasks))
for _, task := range tasks {
taskKeys = append(taskKeys, task.Key)
}
return taskKeys
}
tasks, err := gm.GetTopUnfinishedTasks(ctx)
require.NoError(t, err)
require.Len(t, tasks, 8)
taskKeys := make([]string, 0, len(tasks))
for _, task := range tasks {
taskKeys = append(taskKeys, task.Key)
}
require.Equal(t, []string{"key/6", "key/5", "key/1", "key/2", "key/3", "key/4", "key/8", "key/9"}, taskKeys)
require.Equal(t, []string{"key/6", "key/5", "key/1", "key/2", "key/3", "key/4", "key/8", "key/9"}, getTaskKeys(tasks))

proto.MaxConcurrentTask = 6
tasks, err = gm.GetTopUnfinishedTasks(ctx)
require.NoError(t, err)
require.Len(t, tasks, 11)
require.Equal(t, []string{"key/6", "key/5", "key/1", "key/2", "key/3", "key/4", "key/8", "key/9", "key/10", "key/11", "key/12"}, getTaskKeys(tasks))
}

func TestGetUsedSlotsOnNodes(t *testing.T) {
Expand Down
40 changes: 40 additions & 0 deletions pkg/disttask/framework/storage/task_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package storage

import (
"context"
"encoding/json"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/sqlexec"
Expand Down Expand Up @@ -159,6 +162,43 @@ func (mgr *TaskManager) ResumedTask(ctx context.Context, taskID int64) error {
return err
}

// ModifyTaskByID modifies the task by the task ID.
func (mgr *TaskManager) ModifyTaskByID(ctx context.Context, taskID int64, param *proto.ModifyParam) error {
if !param.PrevState.CanMoveToModifying() {
return ErrTaskStateNotAllow
}
bytes, err := json.Marshal(param)
if err != nil {
return errors.Trace(err)
}
return mgr.WithNewTxn(ctx, func(se sessionctx.Context) error {
task, err2 := mgr.getTaskBaseByID(ctx, se.GetSQLExecutor(), taskID)
if err2 != nil {
return err2
}
if task.State != param.PrevState {
return ErrTaskChanged
}
failpoint.InjectCall("beforeMoveToModifying")
_, err = sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `
update mysql.tidb_global_task
set state = %?, modify_params = %?, state_update_time = CURRENT_TIMESTAMP()
where id = %? and state = %?`,
proto.TaskStateModifying, json.RawMessage(bytes), taskID, param.PrevState,
)
if err != nil {
return err
}
if se.GetSessionVars().StmtCtx.AffectedRows() == 0 {
// the txn is pessimistic, it's possible that another txn has
// changed the task state before this txn commits and there is no
// write-conflict.
return ErrTaskChanged
}
return nil
})
}

// SucceedTask update task state from running to succeed.
func (mgr *TaskManager) SucceedTask(ctx context.Context, taskID int64) error {
return mgr.WithNewSession(func(se sessionctx.Context) error {
Expand Down
Loading