Skip to content

Commit

Permalink
bootstrap: add summary field for system table `tidb_background_subtas…
Browse files Browse the repository at this point in the history
…k` (#46562)

ref #46258
  • Loading branch information
tangenta authored Sep 4, 2023
1 parent 8a8b79b commit a427113
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 18 deletions.
10 changes: 4 additions & 6 deletions ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ const (
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"
// BackgroundSubtaskTable stores the information of backfill jobs.
BackgroundSubtaskTable = "tidb_background_subtask"
// BackgroundSubtaskHistoryTable stores the information of history backfill jobs.
BackgroundSubtaskHistoryTable = "tidb_background_subtask_history"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
Expand All @@ -50,7 +46,7 @@ const (
// HistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_history`.
HistoryTableSQL = "create table " + HistoryTable + "(job_id bigint not null, job_meta longblob, db_name char(64), table_name char(64), schema_ids text(65535), table_ids text(65535), create_time datetime, primary key(job_id))"
// BackgroundSubtaskTableSQL is the CREATE TABLE SQL of `tidb_background_subtask`.
BackgroundSubtaskTableSQL = "create table " + BackgroundSubtaskTable + `(
BackgroundSubtaskTableSQL = `create table tidb_background_subtask (
id bigint not null auto_increment primary key,
step int,
namespace varchar(256),
Expand All @@ -65,9 +61,10 @@ const (
state_update_time bigint,
meta longblob,
error BLOB,
summary json,
key idx_task_key(task_key))`
// BackgroundSubtaskHistoryTableSQL is the CREATE TABLE SQL of `tidb_background_subtask_history`.
BackgroundSubtaskHistoryTableSQL = "create table " + BackgroundSubtaskHistoryTable + `(
BackgroundSubtaskHistoryTableSQL = `create table tidb_background_subtask_history (
id bigint not null auto_increment primary key,
namespace varchar(256),
task_key varchar(256),
Expand All @@ -80,5 +77,6 @@ const (
start_time bigint,
state_update_time bigint,
meta longblob,
summary json,
unique key(namespace, task_key))`
)
1 change: 1 addition & 0 deletions disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type Subtask struct {
// it's 0 if it hasn't started yet.
UpdateTime time.Time
Meta []byte
Summary string
}

// NewSubtask create a new subtask.
Expand Down
13 changes: 7 additions & 6 deletions disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func row2SubTask(r chunk.Row) *proto.Subtask {
SchedulerID: r.GetString(6),
State: r.GetString(8),
Meta: r.GetBytes(12),
Summary: r.GetString(14),
StartTime: startTime,
UpdateTime: updateTime,
}
Expand All @@ -310,9 +311,9 @@ func (stm *TaskManager) AddNewSubTask(globalTaskID int64, step int64, designated
}

_, err := stm.executeSQLWithNewSession(stm.ctx, `insert into mysql.tidb_background_subtask
(task_key, step, exec_id, meta, state, type, checkpoint)
values (%?, %?, %?, %?, %?, %?, %?)`,
globalTaskID, step, designatedTiDBID, meta, st, proto.Type2Int(tp), []byte{})
(task_key, step, exec_id, meta, state, type, checkpoint, summary)
values (%?, %?, %?, %?, %?, %?, %?, %?)`,
globalTaskID, step, designatedTiDBID, meta, st, proto.Type2Int(tp), []byte{}, "{}")
if err != nil {
return err
}
Expand Down Expand Up @@ -539,9 +540,9 @@ func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtas
for _, subtask := range subtasks {
// TODO: insert subtasks in batch
_, err = ExecSQL(stm.ctx, se, `insert into mysql.tidb_background_subtask
(step, task_key, exec_id, meta, state, type, checkpoint)
values (%?, %?, %?, %?, %?, %?, %?)`,
gTask.Step, gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{})
(step, task_key, exec_id, meta, state, type, checkpoint, summary)
values (%?, %?, %?, %?, %?, %?, %?, %?)`,
gTask.Step, gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{}, "{}")
if err != nil {
return err
}
Expand Down
17 changes: 13 additions & 4 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -974,11 +973,13 @@ const (
// create table `mysql.tidb_runaway_watch` and table `mysql.tidb_runaway_watch_done`
// to persist runaway watch and deletion of runaway watch at 7.3.
version172 = 172
// version 173 add column `summary` to `mysql.tidb_background_subtask`.
version173 = 173
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version172
var currentBootstrapVersion int64 = version173

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1119,6 +1120,7 @@ var (
upgradeToVer170,
upgradeToVer171,
upgradeToVer172,
upgradeToVer173,
}
)

Expand Down Expand Up @@ -2557,8 +2559,8 @@ func upgradeToVer136(s Session, ver int64) {
return
}
mustExecute(s, CreateGlobalTask)
doReentrantDDL(s, fmt.Sprintf("ALTER TABLE mysql.%s DROP INDEX namespace", ddl.BackgroundSubtaskTable), dbterror.ErrCantDropFieldOrKey)
doReentrantDDL(s, fmt.Sprintf("ALTER TABLE mysql.%s ADD INDEX idx_task_key(task_key)", ddl.BackgroundSubtaskTable), dbterror.ErrDupKeyName)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_background_subtask DROP INDEX namespace", dbterror.ErrCantDropFieldOrKey)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_background_subtask ADD INDEX idx_task_key(task_key)", dbterror.ErrDupKeyName)
}

func upgradeToVer137(_ Session, _ int64) {
Expand Down Expand Up @@ -2712,6 +2714,13 @@ func upgradeToVer172(s Session, ver int64) {
mustExecute(s, CreateDoneRunawayWatchTable)
}

func upgradeToVer173(s Session, ver int64) {
if ver >= version173 {
return
}
doReentrantDDL(s, "ALTER TABLE mysql.tidb_background_subtask ADD COLUMN `summary` JSON", infoschema.ErrColumnExists)
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down
2 changes: 1 addition & 1 deletion session/bootstraptest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 11,
shard_count = 12,
deps = [
"//config",
"//ddl",
Expand Down
21 changes: 20 additions & 1 deletion session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func TestUpgradeVersionForUpgradeHTTPOp(t *testing.T) {
seLatestV := session.CreateSessionAndSetID(t, store)
ver, err = session.GetBootstrapVersion(seLatestV)
require.NoError(t, err)
require.Equal(t, session.SupportUpgradeHTTPOpVer+1, ver)
require.Equal(t, session.CurrentBootstrapVersion+1, ver)
// Current cluster state is upgrading.
isUpgrading, err = session.IsUpgradingClusterState(seLatestV)
require.NoError(t, err)
Expand Down Expand Up @@ -826,3 +826,22 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
" PARTITION `p3` VALUES LESS THAN (4096),\n" +
" PARTITION `p4` VALUES LESS THAN (7096))"))
}

func TestDDLBackgroundSubtaskTableSummary(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
ver, err := session.GetBootstrapVersion(tk.Session())
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion, ver)

tk.MustExec("use mysql")
for i := 1; i <= 10; i++ {
tk.MustExec(`insert into tidb_background_subtask(id, state, checkpoint, summary) values (?, 0, "", "{}");`, i)
}
for i := 2; i <= 10; i++ {
tk.MustExec(`update tidb_background_subtask set summary = json_set(summary, "$.row_count", ?) where id = ?;`, i, i)
}
r := tk.MustQuery("select sum(json_extract(summary, '$.row_count')) from tidb_background_subtask;")
r.Check(testkit.Rows("54"))
}

0 comments on commit a427113

Please sign in to comment.