Skip to content

Commit

Permalink
*: Exchange partition, fix LIST COLUMNs validation as well as NULL va…
Browse files Browse the repository at this point in the history
…lidation (#46533) (#46766)

close #46492
  • Loading branch information
YangKeao authored Sep 12, 2023
1 parent 71e6696 commit 8381ca7
Show file tree
Hide file tree
Showing 17 changed files with 687 additions and 164 deletions.
183 changes: 182 additions & 1 deletion ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,7 +1544,7 @@ func TestAlterTableTruncatePartitionPreSplitRegion(t *testing.T) {

tk.MustExec("drop table if exists t1;")
tk.MustExec(`CREATE TABLE t1 (id int, c varchar(128), key c(c)) partition by range (id) (
partition p0 values less than (10),
partition p0 values less than (10),
partition p1 values less than MAXVALUE)`)
re := tk.MustQuery("show table t1 regions")
rows := re.Rows()
Expand Down Expand Up @@ -2467,6 +2467,71 @@ func TestExchangePartitionTableCompatiable(t *testing.T) {
require.NoError(t, err)
}

func TestExchangePartitionValidation(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

dbName := "ExchangeValidation"
tk.MustExec(`create schema ` + dbName)
tk.MustExec(`use ` + dbName)
tk.MustExec(`CREATE TABLE t1 (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name))`)

tk.MustExec(`CREATE TABLE t1p (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name)
)
PARTITION BY RANGE COLUMNS(d)
(PARTITION p202307 VALUES LESS THAN ('2023-08-01'),
PARTITION p202308 VALUES LESS THAN ('2023-09-01'),
PARTITION p202309 VALUES LESS THAN ('2023-10-01'),
PARTITION p202310 VALUES LESS THAN ('2023-11-01'),
PARTITION p202311 VALUES LESS THAN ('2023-12-01'),
PARTITION p202312 VALUES LESS THAN ('2024-01-01'),
PARTITION pfuture VALUES LESS THAN (MAXVALUE))`)

tk.MustExec(`insert into t1 values ("2023-08-06","0000")`)
tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1 with validation`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`insert into t1 values ("2023-08-06","0001")`)
}

func TestExchangePartitionPlacementPolicy(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec(`create schema ExchangePartWithPolicy`)
tk.MustExec(`use ExchangePartWithPolicy`)
tk.MustExec(`CREATE PLACEMENT POLICY rule1 FOLLOWERS=1`)
tk.MustExec(`CREATE PLACEMENT POLICY rule2 FOLLOWERS=2`)
tk.MustExec(`CREATE TABLE t1 (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name)
) PLACEMENT POLICY="rule1"`)

tk.MustExec(`CREATE TABLE t1p (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name)
) PLACEMENT POLICY="rule2"
PARTITION BY RANGE COLUMNS(d)
(PARTITION p202307 VALUES LESS THAN ('2023-08-01'),
PARTITION p202308 VALUES LESS THAN ('2023-09-01'),
PARTITION p202309 VALUES LESS THAN ('2023-10-01'),
PARTITION p202310 VALUES LESS THAN ('2023-11-01'),
PARTITION p202311 VALUES LESS THAN ('2023-12-01'),
PARTITION p202312 VALUES LESS THAN ('2024-01-01'),
PARTITION pfuture VALUES LESS THAN (MAXVALUE))`)

tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1`,
"[ddl:1736]Tables have different definitions")
tk.MustExec(`insert into t1 values ("2023-08-06","0000")`)
}

func TestExchangePartitionHook(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -4647,3 +4712,119 @@ func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) {
tk.MustExec(`create table t (a int, b char) partition by hash (a) partitions 3`)
tk.MustContainErrMsg(`alter table t change a c int`, "[planner:1054]Unknown column 'a' in 'expression'")
}

func TestListExchangeValidate(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database ListExchange")
tk.MustExec("use ListExchange")
tk.MustExec("create table lcp (id int, create_ts datetime, name varchar(10))\n" +
"partition by list columns (create_ts)\n" +
"(partition p20230829 values in ('2023-08-29'),partition p20230830 values in ('2023-08-30'))")
tk.MustExec(`insert into lcp values (1,'2023-08-29','a')`)
tk.MustExec(`insert into lcp values (2,'2023-08-30','b')`)
tk.MustContainErrMsg(`insert into lcp values (3,'2023-08-31','c')`,
"[table:1526]Table has no partition for value from column_list")

tk.MustExec(`create table t (id int, create_ts datetime, name varchar(10))`)
tk.MustExec(`insert into t values (3,'2023-08-31','c')`)

tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p20230829 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lcp add partition
(partition p202302 values in ('2023-02-01','2023-02-28',null),
partition p202303 values in ('2023-03-01','2023-03-02','2023-03-31'))`)
tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202302 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202303 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`truncate table t`)
tk.MustExec(`insert into t values (4,'2023-02-01','d'), (5,'2023-02-28','e'), (6, null, 'f')`)
tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202303 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lcp EXCHANGE PARTITION p202302 WITH TABLE t`)
tk.MustExec(`insert into t values (4,'2023-03-01','d'), (5,'2023-03-02','e'), (6,'2023-03-31','f')`)
tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202302 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lcp EXCHANGE PARTITION p202303 WITH TABLE t`)

tk.MustExec(`drop table t`)
tk.MustExec(`CREATE TABLE lmcp (d date, name varchar(10), data varchar(255))
PARTITION BY LIST COLUMNS(d,name)
(partition p3 values IN (('2021-01-01','a'),('2021-01-02','b'),('2021-01-03','c')),
partition p4 values IN (('2021-01-01','b'),(null,'a'),('2021-01-01',null),(null,null)),
partition p2 values IN (('2021-01-01','c'),('2021-01-02','a')),
partition p1 values IN (('2021-01-02','c')))`)
tk.MustExec(`CREATE TABLE t (d date, name varchar(10), data varchar(255))`)

tk.MustExec(`insert into t values ('2021-01-02', 'c', "OK")`)
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`)

tk.MustExec(`insert into t values ('2021-01-01', 'c', "OK"), ('2021-01-02', 'a', "OK")`)
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`)

tk.MustExec(`insert into t values ('2021-01-01', 'a', "OK"), ('2021-01-02','b', "OK"), ('2021-01-03','c', "OK")`)
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`)

tk.MustExec(`insert into t values ('2021-01-01', 'b', "OK"), ('2021-01-01',null, "OK"), (null,'a', "OK"), (null,null,"OK")`)
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`)

tk.MustExec(`create table lp (a int, data varchar(255)) partition by list (a) (partition p0 values in (0,4), partition pNull values in (null))`)
tk.MustExec(`create table np (a int, data varchar(255))`)
tk.MustExec(`insert into np values (0,"OK"), (4,"OK")`)
tk.MustContainErrMsg(`alter table lp EXCHANGE PARTITION pNull WITH TABLE np`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lp EXCHANGE PARTITION p0 WITH TABLE np`)
tk.MustExec(`insert into np values (null,"OK")`)
tk.MustContainErrMsg(`alter table lp EXCHANGE PARTITION p0 WITH TABLE np`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lp EXCHANGE PARTITION pNull WITH TABLE np`)
// TODO: Check EXCHANGE with DEFAULT partition!!
}

func TestRangeExchangeValidate(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database RangeExchange")
tk.MustExec("use RangeExchange")
tk.MustExec(`CREATE TABLE t (d date, name varchar(10), data varchar(255))`)
tk.MustExec("create table rcp (d date, name varchar(10), data varchar(255))\n" +
"partition by range columns (d)\n" +
"(partition p20230829 values less than ('2023-08-30'),partition p20230830 values less than ('2023-08-31'))")
tk.MustExec(`insert into rcp values ('2023-08-29', 'a', "OK")`)
tk.MustExec(`insert into rcp values ('2023-08-30', 'b', "OK")`)
tk.MustContainErrMsg(`insert into rcp values ('2023-08-31', 'c', "FAIL")`,
"[table:1526]Table has no partition for value from column_list")
tk.MustExec(`insert into t values ('2023-08-31', 'c', "FAIL")`)
tk.MustContainErrMsg(`alter table rcp EXCHANGE PARTITION p20230829 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
// TODO: Add test with a RANGE single partition (both normal AND maxvalue!)
// TODO: add test with maxvalue (1, 2, and more partitions)
// TODO: add test not in first partition (both last without maxvalue and also not last with/without maxvalue)
}

// TODO: check EXCHANGE how it handles null (for all types of partitioning!!!)
5 changes: 5 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,7 @@ func (d *ddl) SwitchMDL(enable bool) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

logutil.BgLogger().Info("Switch EnableMDL 1", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load()))
// Disable MDL for test.
if enable && !variable.DefTiDBEnableConcurrentDDL {
sql := fmt.Sprintf("UPDATE HIGH_PRIORITY %[1]s.%[2]s SET VARIABLE_VALUE = %[4]d WHERE VARIABLE_NAME = '%[3]s'",
Expand All @@ -1247,6 +1248,7 @@ func (d *ddl) SwitchMDL(enable bool) error {
return nil
}

logutil.BgLogger().Info("Switch EnableMDL 2", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load()))
isEnableBefore := variable.EnableMDL.Load()
if isEnableBefore == enable {
return nil
Expand All @@ -1268,7 +1270,10 @@ func (d *ddl) SwitchMDL(enable bool) error {
return errors.New("please wait for all jobs done")
}

logutil.BgLogger().Info("Switch EnableMDL 3", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load()))
variable.EnableMDL.Store(enable)
logutil.BgLogger().Info("Switch EnableMDL 4", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load()))

err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), d.store, true, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
oldEnable, _, err := m.GetMetadataLock()
Expand Down
3 changes: 1 addition & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4195,7 +4195,6 @@ func checkExchangePartition(pt *model.TableInfo, nt *model.TableInfo) error {
return errors.Trace(dbterror.ErrPartitionExchangeForeignKey.GenWithStackByArgs(nt.Name))
}

// NOTE: if nt is temporary table, it should be checked
return nil
}

Expand Down Expand Up @@ -6878,7 +6877,7 @@ func checkAndGetColumnsTypeAndValuesMatch(ctx sessionctx.Context, colTypes []typ
switch colType.GetType() {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeDuration:
switch vkind {
case types.KindString, types.KindBytes:
case types.KindString, types.KindBytes, types.KindNull:
default:
return nil, dbterror.ErrWrongTypeColumnValue.GenWithStackByArgs()
}
Expand Down
36 changes: 20 additions & 16 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1553,24 +1553,28 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
diff.OldSchemaID = oldSchemaIDs[0]
diff.AffectedOpts = affects
case model.ActionExchangeTablePartition:
var (
ptSchemaID int64
ptTableID int64
partName string
withValidation bool
)
err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
diff.OldTableID = job.TableID
affects := make([]*model.AffectedOption, 1)
affects[0] = &model.AffectedOption{
SchemaID: ptSchemaID,
TableID: ptTableID,
OldTableID: ptTableID,
diff.OldSchemaID = job.SchemaID
if job.SchemaState != model.StatePublic {
diff.TableID = job.TableID
diff.SchemaID = job.SchemaID
} else {
// Update the partitioned table (it is only done in the last state)
var (
ptSchemaID int64
ptTableID int64
ptDefID int64 // Not needed, will reload the whole table
partName string // Not used
withValidation bool // Not used
)
// See ddl.ExchangeTablePartition
err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
diff.SchemaID = ptSchemaID
diff.TableID = ptTableID
}
diff.AffectedOpts = affects
case model.ActionTruncateTablePartition:
diff.TableID = job.TableID
if len(job.CtxVars) > 0 {
Expand Down
2 changes: 0 additions & 2 deletions ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ func TestHalfwayCancelOperations(t *testing.T) {
tk.MustExec("insert into pt values(1), (3), (5)")
tk.MustExec("create table nt(a int)")
tk.MustExec("insert into nt values(7)")
tk.MustExec("set @@tidb_enable_exchange_partition=1")
defer tk.MustExec("set @@tidb_enable_exchange_partition=0")
err = tk.ExecToErr("alter table pt exchange partition p1 with table nt")
require.Error(t, err)

Expand Down
3 changes: 3 additions & 0 deletions ddl/metadatalocktest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ go_test(
"//ddl",
"//errno",
"//server",
"//sessionctx/variable",
"//testkit",
"//testkit/testsetup",
"//util/logutil",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],
)
Loading

0 comments on commit 8381ca7

Please sign in to comment.