Skip to content

Commit

Permalink
*: Reorg partition fix delete ranges and handling non-clustered table…
Browse files Browse the repository at this point in the history
…s with concurrent DML (#57114) (#57812)

ref #45133, close #56822, close #57510
  • Loading branch information
ti-chi-bot authored Nov 29, 2024
1 parent bfa61d2 commit 64ce2c0
Show file tree
Hide file tree
Showing 17 changed files with 918 additions and 383 deletions.
17 changes: 14 additions & 3 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,24 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
// always delete the table range, even when it's a partitioned table where
// it may contain global index regions.
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "truncate table: table ID"))
case model.ActionDropTablePartition, model.ActionReorganizePartition,
model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
case model.ActionDropTablePartition:
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "reorganize/drop partition: physical table ID(s)"))
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "drop partition: physical table ID(s)"))
case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
// Delete dropped partitions, as well as replaced global indexes.
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return errors.Trace(err)
}
for _, idx := range args.OldGlobalIndexes {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, idx.TableID, []int64{idx.IndexID}, ea, "reorganize partition, replaced global indexes"); err != nil {
return errors.Trace(err)
}
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "reorganize partition: physical table ID(s)"))
case model.ActionTruncateTablePartition:
args, err := model.GetTruncateTableArgs(job)
if err != nil {
Expand Down
362 changes: 242 additions & 120 deletions pkg/ddl/partition.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,11 @@ func onRollbackReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int6
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if job.SchemaState == model.StatePublic {
// We started to destroy the old indexes, so we can no longer rollback!
job.State = model.JobStateRunning
return ver, nil
}
jobCtx.jobArgs = args

return rollbackReorganizePartitionWithErr(jobCtx, job, dbterror.ErrCancelledDDLJob)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/sanity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (e *executor) checkDeleteRangeCnt(job *model.Job) {
panic(err)
}
if actualCnt != expectedCnt {
panic(fmt.Sprintf("expect delete range count %d, actual count %d", expectedCnt, actualCnt))
panic(fmt.Sprintf("expect delete range count %d, actual count %d for job type '%s'", expectedCnt, actualCnt, job.Type.String()))
}
}

Expand Down Expand Up @@ -110,7 +110,7 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
if err != nil {
return 0, errors.Trace(err)
}
return len(args.OldPhysicalTblIDs), nil
return len(args.OldPhysicalTblIDs) + len(args.OldGlobalIndexes), nil
case model.ActionAddIndex, model.ActionAddPrimaryKey:
args, err := model.GetFinishedModifyIndexArgs(job)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/schema_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func SetSchemaDiffForReorganizePartition(diff *model.SchemaDiff, job *model.Job,
func SetSchemaDiffForPartitionModify(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
diff.TableID = job.TableID
diff.OldTableID = job.TableID
if job.SchemaState == model.StateDeleteReorganization {
if job.SchemaState == model.StateNone {
args := jobCtx.jobArgs.(*model.TablePartitionArgs)
partInfo := args.PartInfo
// Final part, new table id is assigned
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/tests/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_test(
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/store/gcworker",
"//pkg/store/mockstore",
"//pkg/table",
"//pkg/table/tables",
Expand Down
110 changes: 82 additions & 28 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,14 +1336,32 @@ func TestGlobalIndexInsertInDropPartition(t *testing.T) {
partition p3 values less than (30)
);`)
tk.MustExec("alter table test_global add unique index idx_b (b) global")
tk.MustExec("insert into test_global values (1, 1, 1), (8, 8, 8), (11, 11, 11), (12, 12, 12);")
tk.MustExec("insert into test_global values (1, 1, 1), (2, 2, 2), (11, 11, 11), (12, 12, 12)")

doneMap := make(map[model.SchemaState]struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobRunBefore", func(job *model.Job) {
assert.Equal(t, model.ActionDropTablePartition, job.Type)
if job.SchemaState == model.StateDeleteOnly {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("insert into test_global values (9, 9, 9)")
if _, ok := doneMap[job.SchemaState]; ok {
return
}
doneMap[job.SchemaState] = struct{}{}
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
switch job.SchemaState {
case model.StatePublic:
tk2.MustExec("insert into test_global values (3, 3, 3)")
tk2.MustExec("insert into test_global values (13, 13, 13)")
case model.StateWriteOnly:
tk2.MustContainErrMsg("insert into test_global values (4, 4, 4)", "[table:1526]Table has no partition for value matching a partition being dropped, 'p1'")
tk2.MustExec("insert into test_global values (14, 14, 14)")
case model.StateDeleteOnly:
tk2.MustExec("insert into test_global values (5, 5, 5)")
tk2.MustExec("insert into test_global values (15, 15, 15)")
case model.StateDeleteReorganization:
tk2.MustExec("insert into test_global values (6, 6, 6)")
tk2.MustExec("insert into test_global values (16, 16, 16)")
default:
require.Fail(t, "invalid schema state '%s'", job.SchemaState.String())
}
})

Expand All @@ -1352,7 +1370,7 @@ func TestGlobalIndexInsertInDropPartition(t *testing.T) {
tk1.MustExec("alter table test_global drop partition p1")

tk.MustExec("analyze table test_global")
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("9 9 9", "11 11 11", "12 12 12"))
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("5 5 5", "6 6 6", "11 11 11", "12 12 12", "13 13 13", "14 14 14", "15 15 15", "16 16 16"))
}

func TestGlobalIndexUpdateInDropPartition(t *testing.T) {
Expand Down Expand Up @@ -3168,10 +3186,10 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk3.MustExec(`COMMIT`)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "30 29 9"))
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "29 28 9"))
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "23 22 6", "27 26 8", "32 31 10"))
"19 18 4", "23 22 6", "27 26 8", "31 30 10"))

waitFor(4, "t", "write reorganization")
tk3.MustExec(`BEGIN`)
Expand All @@ -3181,30 +3199,66 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk3.MustExec(`insert into t values (null, 23)`)
tk2.MustExec(`COMMIT`)

/*
// Currently there is an duplicate entry issue, so it will rollback in WriteReorganization
// instead of continuing.
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)

tk3.MustExec(`insert into t values (null, 25)`)
tk2.MustExec(`insert into t values (null, 26)`)
*/
tk3.MustExec(`insert into t values (null, 25)`)
tk2.MustExec(`insert into t values (null, 26)`)
tk3.MustExec(`COMMIT`)
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"35 34 22",
"39 38 24",
"43 42 26"))
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))

//waitFor(4, "t", "public")
//tk2.MustExec(`commit`)
// TODO: Investigate and fix, but it is also related to https://github.com/pingcap/tidb/issues/46904
require.ErrorContains(t, <-alterChan, "[kv:1062]Duplicate entry '31' for key 't.PRIMARY'")
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"33 32 21",
"35 34 22",
"37 36 23",
"41 40 25"))

waitFor(4, "t", "public")
tk2.MustExec(`commit`)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"33 32 21",
"35 34 22",
"37 36 23",
"39 38 24",
"41 40 25",
"43 42 26"))
require.NoError(t, <-alterChan)
}

func TestAlterLastIntervalPartition(t *testing.T) {
Expand Down
Loading

0 comments on commit 64ce2c0

Please sign in to comment.