Commit

This is an automated cherry-pick of pingcap#57114
Signed-off-by: ti-chi-bot <[email protected]>
mjonss authored and ti-chi-bot committed Dec 10, 2024
1 parent b298e21 commit 8e8da6d
Showing 10 changed files with 4,681 additions and 67 deletions.
30 changes: 30 additions & 0 deletions pkg/ddl/delete_range.go
@@ -332,6 +332,36 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
return errors.Trace(err)
}
}
<<<<<<< HEAD
=======
// always delete the table range, even when it's a partitioned table where
// it may contain global index regions.
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "truncate table: table ID"))
case model.ActionDropTablePartition:
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "drop partition: physical table ID(s)"))
case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
// Delete dropped partitions, as well as replaced global indexes.
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return errors.Trace(err)
}
for _, idx := range args.OldGlobalIndexes {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, idx.TableID, []int64{idx.IndexID}, ea, "reorganize partition, replaced global indexes"); err != nil {
return errors.Trace(err)
}
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "reorganize partition: physical table ID(s)"))
case model.ActionTruncateTablePartition:
args, err := model.GetTruncateTableArgs(job)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPartitionIDs, ea, "truncate partition: physical table ID(s)"))
>>>>>>> b6025b97877 (*: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114))
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
allIndexIDs := make([]int64, 1)
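The new branches above queue GC delete ranges per DDL action: a reorganized (or removed/re-partitioned) table gets one range for every replaced global index plus one for every dropped physical table, and a truncated partitioned table still gets a range for the table ID itself because global index entries live in that key space. Below is a minimal, self-contained sketch of that bookkeeping pattern; the type and function names are illustrative stand-ins, not TiDB's actual delete-range API.

// Illustrative sketch only; not TiDB's real delete-range code.
package main

import "fmt"

// deleteRange marks one key span (a physical table/partition or a single index)
// for later background GC by the delete-range worker.
type deleteRange struct {
	jobID   int64
	tableID int64
	indexID int64 // 0 means the whole physical table range
	comment string
}

// globalIndex is a simplified stand-in for the {TableID, IndexID} pairs
// carried in args.OldGlobalIndexes.
type globalIndex struct {
	TableID int64
	IndexID int64
}

// queueReorgPartitionRanges mirrors the ActionReorganizePartition branch above:
// first one range per replaced global index, then one per dropped physical table.
func queueReorgPartitionRanges(jobID int64, oldPhysicalTblIDs []int64, oldGlobalIndexes []globalIndex) []deleteRange {
	ranges := make([]deleteRange, 0, len(oldGlobalIndexes)+len(oldPhysicalTblIDs))
	for _, idx := range oldGlobalIndexes {
		ranges = append(ranges, deleteRange{jobID, idx.TableID, idx.IndexID, "reorganize partition, replaced global index"})
	}
	for _, pid := range oldPhysicalTblIDs {
		ranges = append(ranges, deleteRange{jobID, pid, 0, "reorganize partition, dropped physical table"})
	}
	return ranges
}

func main() {
	for _, r := range queueReorgPartitionRanges(101, []int64{201, 202}, []globalIndex{{TableID: 100, IndexID: 1}}) {
		fmt.Printf("job %d: table %d, index %d (%s)\n", r.jobID, r.tableID, r.indexID, r.comment)
	}
}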
386 changes: 344 additions & 42 deletions pkg/ddl/partition.go

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion pkg/ddl/sanity_check.go
@@ -48,7 +48,7 @@ func (d *ddl) checkDeleteRangeCnt(job *model.Job) {
panic(err)
}
if actualCnt != expectedCnt {
panic(fmt.Sprintf("expect delete range count %d, actual count %d", expectedCnt, actualCnt))
panic(fmt.Sprintf("expect delete range count %d, actual count %d for job type '%s'", expectedCnt, actualCnt, job.Type.String()))
}
}

@@ -106,7 +106,21 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
return 0, errors.Trace(err)
}
<<<<<<< HEAD
return len(physicalTableIDs), nil
=======
if job.Type == model.ActionTruncateTable {
return len(args.OldPartitionIDs) + 1, nil
}
return len(args.OldPartitionIDs), nil
case model.ActionDropTablePartition, model.ActionReorganizePartition,
model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return 0, errors.Trace(err)
}
return len(args.OldPhysicalTblIDs) + len(args.OldGlobalIndexes), nil
>>>>>>> b6025b97877 (*: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114))
case model.ActionAddIndex, model.ActionAddPrimaryKey:
indexID := make([]int64, 1)
ifExists := make([]bool, 1)
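The sanity check in this file panics when the number of delete ranges actually written differs from the per-job expectation, and the patch extends that expectation to the partition-reorganizing actions: len(OldPhysicalTblIDs) + len(OldGlobalIndexes) ranges, plus, for ActionTruncateTable, one extra range on top of the dropped partitions to cover the table range itself. A small hedged sketch of that arithmetic, using simplified argument structs rather than the real model.*Args types:

package main

import "fmt"

// Simplified stand-ins for the job argument structs used in the hunk above.
type truncateTableArgs struct {
	OldPartitionIDs []int64
}

type finishedPartitionArgs struct {
	OldPhysicalTblIDs []int64
	OldGlobalIndexes  []struct{ TableID, IndexID int64 }
}

// expectedRangesTruncateTable: one range per dropped partition plus one for the
// table range itself (which may hold global index regions).
func expectedRangesTruncateTable(a truncateTableArgs) int {
	return len(a.OldPartitionIDs) + 1
}

// expectedRangesReorgPartition: one range per dropped physical table plus one
// per replaced global index.
func expectedRangesReorgPartition(a finishedPartitionArgs) int {
	return len(a.OldPhysicalTblIDs) + len(a.OldGlobalIndexes)
}

func main() {
	fmt.Println(expectedRangesTruncateTable(truncateTableArgs{OldPartitionIDs: []int64{11, 12, 13}})) // 4
	fmt.Println(expectedRangesReorgPartition(finishedPartitionArgs{OldPhysicalTblIDs: []int64{21, 22}})) // 2
}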
1 change: 1 addition & 0 deletions pkg/ddl/tests/partition/BUILD.bazel
@@ -25,6 +25,7 @@ go_test(
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/store/gcworker",
"//pkg/store/mockstore",
"//pkg/table",
"//pkg/table/tables",
112 changes: 91 additions & 21 deletions pkg/ddl/tests/partition/db_partition_test.go
@@ -1401,16 +1401,41 @@ func TestGlobalIndexInsertInDropPartition(t *testing.T) {
partition p2 values less than (20),
partition p3 values less than (30)
);`)
<<<<<<< HEAD
tk.MustExec("alter table test_global add unique index idx_b (b);")
tk.MustExec("insert into test_global values (1, 1, 1), (8, 8, 8), (11, 11, 11), (12, 12, 12);")

hook := &callback.TestDDLCallback{Do: dom}
hook.OnJobRunBeforeExported = func(job *model.Job) {
=======
tk.MustExec("alter table test_global add unique index idx_b (b) global")
tk.MustExec("insert into test_global values (1, 1, 1), (2, 2, 2), (11, 11, 11), (12, 12, 12)")

doneMap := make(map[model.SchemaState]struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobRunBefore", func(job *model.Job) {
>>>>>>> b6025b97877 (*: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114))
assert.Equal(t, model.ActionDropTablePartition, job.Type)
if job.SchemaState == model.StateDeleteOnly {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("insert into test_global values (9, 9, 9)")
if _, ok := doneMap[job.SchemaState]; ok {
return
}
doneMap[job.SchemaState] = struct{}{}
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
switch job.SchemaState {
case model.StatePublic:
tk2.MustExec("insert into test_global values (3, 3, 3)")
tk2.MustExec("insert into test_global values (13, 13, 13)")
case model.StateWriteOnly:
tk2.MustContainErrMsg("insert into test_global values (4, 4, 4)", "[table:1526]Table has no partition for value matching a partition being dropped, 'p1'")
tk2.MustExec("insert into test_global values (14, 14, 14)")
case model.StateDeleteOnly:
tk2.MustExec("insert into test_global values (5, 5, 5)")
tk2.MustExec("insert into test_global values (15, 15, 15)")
case model.StateDeleteReorganization:
tk2.MustExec("insert into test_global values (6, 6, 6)")
tk2.MustExec("insert into test_global values (16, 16, 16)")
default:
require.Fail(t, "invalid schema state '%s'", job.SchemaState.String())
}
}
dom.DDL().SetHook(hook)
@@ -1420,7 +1445,7 @@ func TestGlobalIndexInsertInDropPartition(t *testing.T) {
tk1.MustExec("alter table test_global drop partition p1")

tk.MustExec("analyze table test_global")
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("9 9 9", "11 11 11", "12 12 12"))
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("5 5 5", "6 6 6", "11 11 11", "12 12 12", "13 13 13", "14 14 14", "15 15 15", "16 16 16"))
}
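
The rewritten test above runs concurrent DML once per schema state of the DROP PARTITION job and tracks which states it has already exercised in doneMap, since the onJobRunBefore callback can fire many times while the job sits in the same state. A stripped-down sketch of that run-once-per-state dispatch follows; the schema-state names and the callback wiring are placeholders, not the TiDB test harness.

package main

import "fmt"

type schemaState string

const (
	statePublic      schemaState = "public"
	stateWriteOnly   schemaState = "write only"
	stateDeleteOnly  schemaState = "delete only"
	stateDeleteReorg schemaState = "delete reorganization"
)

// makeStateDispatcher ensures the per-state DML runs a single time, mirroring
// the doneMap bookkeeping in the test.
func makeStateDispatcher(run func(state schemaState)) func(schemaState) {
	done := make(map[schemaState]struct{})
	return func(s schemaState) {
		if _, ok := done[s]; ok {
			return // the hook can fire many times per state; act only on the first
		}
		done[s] = struct{}{}
		run(s)
	}
}

func main() {
	dispatch := makeStateDispatcher(func(s schemaState) {
		fmt.Println("running concurrent DML in state:", s)
	})
	// Simulate the DDL hook firing repeatedly while the job advances.
	for _, s := range []schemaState{statePublic, statePublic, stateWriteOnly, stateDeleteOnly, stateDeleteOnly, stateDeleteReorg} {
		dispatch(s)
	}
}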

func TestUpdateGlobalIndex(t *testing.T) {
@@ -3608,10 +3633,10 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk3.MustExec(`COMMIT`)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "30 29 9"))
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "29 28 9"))
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "23 22 6", "27 26 8", "32 31 10"))
"19 18 4", "23 22 6", "27 26 8", "31 30 10"))

waitFor(4, "t", "write reorganization")
tk3.MustExec(`BEGIN`)
@@ -3621,28 +3646,73 @@
tk3.MustExec(`insert into t values (null, 23)`)
tk2.MustExec(`COMMIT`)

<<<<<<< HEAD
/*
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)
=======
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)
>>>>>>> b6025b97877 (*: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114))

tk3.MustExec(`insert into t values (null, 25)`)
tk2.MustExec(`insert into t values (null, 26)`)
*/
tk3.MustExec(`insert into t values (null, 25)`)
tk2.MustExec(`insert into t values (null, 26)`)
tk3.MustExec(`COMMIT`)
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"35 34 22",
"39 38 24",
"43 42 26"))
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))

//waitFor(4, "t", "public")
//tk2.MustExec(`commit`)
// TODO: Investigate and fix, but it is also related to https://github.com/pingcap/tidb/issues/46904
require.ErrorContains(t, <-alterChan, "[kv:1062]Duplicate entry '31' for key 't.PRIMARY'")
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"33 32 21",
"35 34 22",
"37 36 23",
"41 40 25"))

waitFor(4, "t", "public")
tk2.MustExec(`commit`)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"33 32 21",
"35 34 22",
"37 36 23",
"39 38 24",
"41 40 25",
"43 42 26"))
require.NoError(t, <-alterChan)
}
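
The waitFor(4, "t", ...) calls above block until the background ALTER TABLE ... REMOVE PARTITIONING job on table t reports the wanted schema state, with column 4 of ADMIN SHOW DDL JOBS (SCHEMA_STATE) being the value they watch. Here is a rough, self-contained sketch of that polling idea over a plain database/sql connection; the DSN, the driver import, and the column indexes are assumptions for illustration, not the test's actual helper.

package main

import (
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "github.com/go-sql-driver/mysql" // assumed: any MySQL-protocol driver works against TiDB
)

// asString normalizes driver values ([]byte, string, nil) for comparison.
func asString(v any) string {
	if b, ok := v.([]byte); ok {
		return string(b)
	}
	if v == nil {
		return ""
	}
	return fmt.Sprint(v)
}

// waitForSchemaState polls ADMIN SHOW DDL JOBS until a job on the given table
// reports the wanted value in the chosen column (SCHEMA_STATE is assumed to be
// column 4, matching waitFor(4, ...) in the test).
func waitForSchemaState(db *sql.DB, tableName, want string, colIdx int) error {
	for attempt := 0; attempt < 200; attempt++ {
		rows, err := db.Query("ADMIN SHOW DDL JOBS")
		if err != nil {
			return err
		}
		cols, err := rows.Columns()
		if err != nil {
			rows.Close()
			return err
		}
		for rows.Next() {
			vals := make([]any, len(cols))
			ptrs := make([]any, len(cols))
			for i := range vals {
				ptrs[i] = &vals[i]
			}
			if err := rows.Scan(ptrs...); err != nil {
				rows.Close()
				return err
			}
			// Column 2 is assumed to be TABLE_NAME in the ADMIN SHOW DDL JOBS output.
			if asString(vals[2]) == tableName && asString(vals[colIdx]) == want {
				rows.Close()
				return nil
			}
		}
		rows.Close()
		time.Sleep(50 * time.Millisecond)
	}
	return fmt.Errorf("table %q never reached state %q", tableName, want)
}

func main() {
	// Placeholder DSN; point it at a running TiDB instance.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := waitForSchemaState(db, "t", "delete reorganization", 4); err != nil {
		log.Fatal(err)
	}
	fmt.Println("DDL job reached delete reorganization")
}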

func TestAlterLastIntervalPartition(t *testing.T) {