Skip to content

Commit

Permalink
dxf: systable and scheduler change for modify task at runtime (#57557)
Browse files Browse the repository at this point in the history
ref #57497
  • Loading branch information
D3Hunter authored Nov 25, 2024
1 parent cc22c0b commit 9ff83eb
Show file tree
Hide file tree
Showing 15 changed files with 274 additions and 32 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(218), session.CurrentBootstrapVersion)
require.Equal(t, int64(239), session.CurrentBootstrapVersion)
}
1 change: 1 addition & 0 deletions pkg/disttask/framework/proto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "proto",
srcs = [
"modify.go",
"node.go",
"step.go",
"subtask.go",
Expand Down
40 changes: 40 additions & 0 deletions pkg/disttask/framework/proto/modify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proto

// ModificationType names a kind of change that can be applied to a task.
// Its underlying type is string.
type ModificationType string

// String returns the modification type as a plain string, so that
// ModificationType satisfies fmt.Stringer.
func (mt ModificationType) String() string {
	name := string(mt)
	return name
}

const (
// ModifyConcurrency is the type for modifying task concurrency.
// Its string value is "modify_concurrency".
ModifyConcurrency ModificationType = "modify_concurrency"
)

// ModifyParam is the parameter for task modification.
// Its fields carry JSON tags, so it can be encoded with encoding/json.
type ModifyParam struct {
// PrevState is the state the task is expected to be in before it is
// moved to the modifying state.
// NOTE(review): presumably also the state to go back to once the
// modification is done — confirm against the scheduler implementation.
PrevState TaskState `json:"prev_state"`
// Modifications lists the individual modifications requested for the task.
Modifications []Modification `json:"modifications"`
}

// Modification is one modification for task.
type Modification struct {
// Type identifies what is being modified, e.g. ModifyConcurrency.
Type ModificationType `json:"type"`
// To is presumably the new value for the modified property (for
// ModifyConcurrency, the new concurrency) — TODO confirm against the
// code that applies the modification.
To int64 `json:"to"`
}
52 changes: 42 additions & 10 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import (
// The `failed` state is used to mean the framework cannot run the task, such as
// invalid task type, scheduler init error(fatal), etc.
//
// ┌────────┐
// ┌───────────│resuming│◄────────┐
// │ └────────┘ │
// ┌──────┐ │ ┌───────┐ ┌──┴───┐
// │failed│ │ ┌────────►│pausing├──────►│paused│
// └──────┘ │ │ └───────┘ └──────┘
// ▲ ▼ │
// ┌──┴────┐ ┌───┴───┐ ┌────────┐
// normal execution state transition:
//
// ┌──────┐
// │failed│
// └──────┘
// ▲
// ┌──┴────┐ ┌───────┐ ┌────────┐
// │pending├────►│running├────►│succeed │
// └──┬────┘ └──┬┬───┘ └────────┘
// │ ││ ┌─────────┐ ┌────────┐
Expand All @@ -40,6 +39,32 @@ import (
// │ ┌──────────┐ ▲
// └─────────►│cancelling├────┘
// └──────────┘
//
// pause/resume state transition:
// since we don't know the state of the task before `paused`, the state after
// `resuming` is always `running`.
//
// ┌───────┐
// │pending├──┐
// └───────┘ │ ┌───────┐ ┌──────┐
// ├────►│pausing├──────►│paused│
// ┌───────┐ │ └───────┘ └───┬──┘
// │running├──┘ │
// └───▲───┘ ┌────────┐ │
// └────────────┤resuming│◄─────────┘
// └────────┘
//
// modifying state transition:
//
// ┌───────┐
// │pending├──┐
// └───────┘ │
// ┌───────┐ │ ┌─────────┐
// │running├──┼────►│modifying├────► original state
// └───────┘ │ └─────────┘
// ┌───────┐ │
// │paused ├──┘
// └───────┘
const (
TaskStatePending TaskState = "pending"
TaskStateRunning TaskState = "running"
Expand All @@ -51,6 +76,7 @@ const (
TaskStatePausing TaskState = "pausing"
TaskStatePaused TaskState = "paused"
TaskStateResuming TaskState = "resuming"
TaskStateModifying TaskState = "modifying"
)

type (
Expand All @@ -68,6 +94,11 @@ func (s TaskState) String() string {
return string(s)
}

// CanMoveToModifying reports whether a task in state s is allowed to enter
// the 'modifying' state. Only pending, running and paused tasks qualify.
func (s TaskState) CanMoveToModifying() bool {
	switch s {
	case TaskStatePending, TaskStateRunning, TaskStatePaused:
		return true
	default:
		return false
	}
}

const (
// TaskIDLabelName is the label name of task id.
TaskIDLabelName = "task_id"
Expand Down Expand Up @@ -154,8 +185,9 @@ type Task struct {
// changed in below case, and framework will update the task meta in the storage.
// - task switches to next step in Scheduler.OnNextSubtasksBatch
// - on task cleanup, we might do some redaction on the meta.
Meta []byte
Error error
Meta []byte
Error error
ModifyParam ModifyParam
}

var (
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/scheduler/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (nm *NodeManager) refreshNodes(ctx context.Context, taskMgr TaskManager, sl
for _, node := range newNodes {
if node.CPUCount > 0 {
cpuCount = node.CPUCount
break
}
}
slotMgr.updateCapacity(cpuCount)
Expand Down
32 changes: 27 additions & 5 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *BaseScheduler) refreshTaskIfNeeded() error {
if err != nil {
return err
}
// state might be changed by user to pausing/resuming/cancelling, or
// state might be changed by user to pausing/resuming/cancelling/modifying, or
// in case of network partition, state/step/meta might be changed by other scheduler,
// in both cases we refresh the whole task object.
if newTaskBase.State != task.State || newTaskBase.Step != task.Step {
Expand Down Expand Up @@ -206,8 +206,7 @@ func (s *BaseScheduler) scheduleTask() {
return
}
case proto.TaskStateResuming:
// Case with 2 nodes.
// Here is the timeline
// need to check allocatedSlots for the following case:
// 1. task in pausing state.
// 2. node1 and node2 start schedulers with task in pausing state without allocatedSlots.
// 3. node1's scheduler transfer the node from pausing to paused state.
Expand All @@ -221,10 +220,20 @@ func (s *BaseScheduler) scheduleTask() {
case proto.TaskStateReverting:
err = s.onReverting()
case proto.TaskStatePending:
// need to check allocatedSlots for the following case:
// 1. task is in modifying state, node A and B start schedulers with
// the task in modifying state without allocatedSlots.
// 2. node A's scheduler finishes modifying, and transfers the task
// from modifying to pending state.
// 3. node B's scheduler calls refreshTask and gets the task with pending
// state, but this scheduler has not allocated slots.
if !s.allocatedSlots {
s.logger.Info("scheduler exit since not allocated slots", zap.Stringer("state", task.State))
return
}
err = s.onPending()
case proto.TaskStateRunning:
// Case with 2 nodes.
// Here is the timeline
// need to check allocatedSlots for the following case:
// 1. task in pausing state.
// 2. node1 and node2 start schedulers with task in pausing state without allocatedSlots.
// 3. node1's scheduler transfer the node from pausing to paused state.
Expand All @@ -236,6 +245,12 @@ func (s *BaseScheduler) scheduleTask() {
return
}
err = s.onRunning()
case proto.TaskStateModifying:
var recreateScheduler bool
recreateScheduler, err = s.onModifying()
if err == nil && recreateScheduler {
return
}
case proto.TaskStateSucceed, proto.TaskStateReverted, proto.TaskStateFailed:
s.onFinished()
return
Expand Down Expand Up @@ -385,6 +400,13 @@ func (s *BaseScheduler) onRunning() error {
return nil
}

// onModifying is called when task is in modifying state.
// the first return value indicates whether the scheduler should be recreated.
//
// NOTE(review): this is currently a placeholder. It panics unconditionally
// with "implement me" and never returns, so any caller reaching it will
// panic until the real implementation lands.
func (*BaseScheduler) onModifying() (bool, error) {
// TODO: implement me
panic("implement me")
}

func (s *BaseScheduler) onFinished() {
task := s.GetTask()
s.logger.Debug("schedule task, task is finished", zap.Stringer("state", task.State))
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (sm *Manager) startSchedulers(schedulableTasks []*proto.TaskBase) error {
// task of lower rank might be able to be scheduled.
continue
}
// reverting/cancelling/pausing
// reverting/cancelling/pausing/modifying, we don't allocate slots for them.
default:
allocateSlots = false
sm.logger.Info("start scheduler without allocating slots",
Expand Down
3 changes: 2 additions & 1 deletion pkg/disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ go_test(
embed = [":storage"],
flaky = True,
race = "on",
shard_count = 22,
shard_count = 23,
deps = [
"//pkg/config",
"//pkg/disttask/framework/proto",
Expand All @@ -53,6 +53,7 @@ go_test(
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"//pkg/util",
"//pkg/util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
7 changes: 7 additions & 0 deletions pkg/disttask/framework/storage/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
"encoding/json"
"strconv"
"time"

Expand Down Expand Up @@ -66,6 +67,12 @@ func Row2Task(r chunk.Row) *proto.Task {
task.Error = stdErr
}
}
if !r.IsNull(14) {
str := r.GetJSON(14).String()
if err := json.Unmarshal([]byte(str), &task.ModifyParam); err != nil {
logutil.BgLogger().Error("unmarshal task modify param", zap.Error(err))
}
}
return task
}

Expand Down
22 changes: 16 additions & 6 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func TestGetTopUnfinishedTasks(t *testing.T) {
proto.TaskStatePending,
proto.TaskStatePending,
proto.TaskStatePending,
proto.TaskStateModifying,
}
for i, state := range taskStates {
taskKey := fmt.Sprintf("key/%d", i)
Expand Down Expand Up @@ -403,17 +404,26 @@ func TestGetTopUnfinishedTasks(t *testing.T) {
rs, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `
select count(1) from mysql.tidb_global_task`)
require.Len(t, rs, 1)
require.Equal(t, int64(12), rs[0].GetInt64(0))
require.Equal(t, int64(13), rs[0].GetInt64(0))
return err
}))
getTaskKeys := func(tasks []*proto.TaskBase) []string {
taskKeys := make([]string, 0, len(tasks))
for _, task := range tasks {
taskKeys = append(taskKeys, task.Key)
}
return taskKeys
}
tasks, err := gm.GetTopUnfinishedTasks(ctx)
require.NoError(t, err)
require.Len(t, tasks, 8)
taskKeys := make([]string, 0, len(tasks))
for _, task := range tasks {
taskKeys = append(taskKeys, task.Key)
}
require.Equal(t, []string{"key/6", "key/5", "key/1", "key/2", "key/3", "key/4", "key/8", "key/9"}, taskKeys)
require.Equal(t, []string{"key/6", "key/5", "key/1", "key/2", "key/3", "key/4", "key/8", "key/9"}, getTaskKeys(tasks))

proto.MaxConcurrentTask = 6
tasks, err = gm.GetTopUnfinishedTasks(ctx)
require.NoError(t, err)
require.Len(t, tasks, 11)
require.Equal(t, []string{"key/6", "key/5", "key/1", "key/2", "key/3", "key/4", "key/8", "key/9", "key/10", "key/11", "key/12"}, getTaskKeys(tasks))
}

func TestGetUsedSlotsOnNodes(t *testing.T) {
Expand Down
40 changes: 40 additions & 0 deletions pkg/disttask/framework/storage/task_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package storage

import (
"context"
"encoding/json"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/sqlexec"
Expand Down Expand Up @@ -159,6 +162,43 @@ func (mgr *TaskManager) ResumedTask(ctx context.Context, taskID int64) error {
return err
}

// ModifyTaskByID moves the task identified by taskID into the 'modifying'
// state and stores param, JSON-encoded, in its modify_params column.
//
// It returns ErrTaskStateNotAllow when param.PrevState is not a state that
// can move to 'modifying', and ErrTaskChanged when the task is no longer in
// param.PrevState at the time of the check or of the update.
func (mgr *TaskManager) ModifyTaskByID(ctx context.Context, taskID int64, param *proto.ModifyParam) error {
	if !param.PrevState.CanMoveToModifying() {
		return ErrTaskStateNotAllow
	}
	encoded, marshalErr := json.Marshal(param)
	if marshalErr != nil {
		return errors.Trace(marshalErr)
	}

	// moveToModifying re-checks the task state and performs the update inside
	// the transaction started by WithNewTxn.
	moveToModifying := func(se sessionctx.Context) error {
		current, getErr := mgr.getTaskBaseByID(ctx, se.GetSQLExecutor(), taskID)
		if getErr != nil {
			return getErr
		}
		if current.State != param.PrevState {
			return ErrTaskChanged
		}
		failpoint.InjectCall("beforeMoveToModifying")
		// The update is additionally guarded by the expected previous state.
		_, execErr := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `
update mysql.tidb_global_task
set state = %?, modify_params = %?, state_update_time = CURRENT_TIMESTAMP()
where id = %? and state = %?`,
			proto.TaskStateModifying, json.RawMessage(encoded), taskID, param.PrevState,
		)
		if execErr != nil {
			return execErr
		}
		if se.GetSessionVars().StmtCtx.AffectedRows() == 0 {
			// The txn is pessimistic: another txn may have changed the task
			// state before this one commits without causing a write-conflict,
			// in which case no row matches the guarded update.
			return ErrTaskChanged
		}
		return nil
	}
	return mgr.WithNewTxn(ctx, moveToModifying)
}

// SucceedTask update task state from running to succeed.
func (mgr *TaskManager) SucceedTask(ctx context.Context, taskID int64) error {
return mgr.WithNewSession(func(se sessionctx.Context) error {
Expand Down
Loading

0 comments on commit 9ff83eb

Please sign in to comment.