Skip to content

Commit

Permalink
*: Exchange partition, fix LIST COLUMNs validation as well as NULL va…
Browse files Browse the repository at this point in the history
…lidation (#46533)

close #46492
  • Loading branch information
mjonss authored Sep 6, 2023
1 parent abec0c6 commit e8b590c
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 34 deletions.
2 changes: 1 addition & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7817,7 +7817,7 @@ func checkAndGetColumnsTypeAndValuesMatch(ctx sessionctx.Context, colTypes []typ
switch colType.GetType() {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeDuration:
switch vkind {
case types.KindString, types.KindBytes:
case types.KindString, types.KindBytes, types.KindNull:
default:
return nil, dbterror.ErrWrongTypeColumnValue.GenWithStackByArgs()
}
Expand Down
83 changes: 53 additions & 30 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3421,6 +3421,8 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde
if rowCount != 0 {
return errors.Trace(dbterror.ErrRowDoesNotMatchPartition)
}
// Check warnings!
// Is it possible to check how many rows where checked as well?
return nil
}

Expand Down Expand Up @@ -3461,73 +3463,94 @@ func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, sche
buf.WriteString("select 1 from %n.%n where ")
buf.WriteString(pi.Expr)
buf.WriteString(" >= %? limit 1")
paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index].LessThan[0]))
paramList = append(paramList, schemaName.L, tableName.L, driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0]))
return buf.String(), paramList
} else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) {
buf.WriteString("select 1 from %n.%n where ")
buf.WriteString(pi.Expr)
buf.WriteString(" < %? limit 1")
paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index-1].LessThan[0]))
paramList = append(paramList, schemaName.L, tableName.L, driver.UnwrapFromSingleQuotes(pi.Definitions[index-1].LessThan[0]))
return buf.String(), paramList
} else {
buf.WriteString("select 1 from %n.%n where ")
buf.WriteString(pi.Expr)
buf.WriteString(" < %? or ")
buf.WriteString(pi.Expr)
buf.WriteString(" >= %? limit 1")
paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index-1].LessThan[0]), trimQuotation(pi.Definitions[index].LessThan[0]))
paramList = append(paramList, schemaName.L, tableName.L, driver.UnwrapFromSingleQuotes(pi.Definitions[index-1].LessThan[0]), driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0]))
return buf.String(), paramList
}
}

func trimQuotation(str string) string {
return strings.Trim(str, "'")
}

func buildCheckSQLForRangeColumnsPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) {
paramList := make([]interface{}, 0, 6)
colName := pi.Columns[0].L
if index == 0 {
paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index].LessThan[0]))
paramList = append(paramList, schemaName.L, tableName.L, colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0]))
return "select 1 from %n.%n where %n >= %? limit 1", paramList
} else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) {
paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index-1].LessThan[0]))
paramList = append(paramList, schemaName.L, tableName.L, colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index-1].LessThan[0]))
return "select 1 from %n.%n where %n < %? limit 1", paramList
} else {
paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index-1].LessThan[0]), colName, trimQuotation(pi.Definitions[index].LessThan[0]))
paramList = append(paramList, schemaName.L, tableName.L, colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index-1].LessThan[0]), colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0]))
return "select 1 from %n.%n where %n < %? or %n >= %? limit 1", paramList
}
}

func buildCheckSQLForListPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) {
var buf strings.Builder
buf.WriteString("select 1 from %n.%n where ")
buf.WriteString(pi.Expr)
buf.WriteString(" not in (%?) limit 1")
inValues := getInValues(pi, index)

paramList := make([]interface{}, 0, 3)
paramList = append(paramList, schemaName.L, tableName.L, inValues)
buf.WriteString(" not (")
for i, inValue := range pi.Definitions[index].InValues {
if i != 0 {
buf.WriteString(" OR ")
}
// AND has higher priority than OR, so no need for parentheses
for j, val := range inValue {
if j != 0 {
// Should never happen, since there should be no multi-columns, only a single expression :)
buf.WriteString(" AND ")
}
// null-safe compare '<=>'
buf.WriteString(fmt.Sprintf("(%s) <=> %s", pi.Expr, val))
}
}
buf.WriteString(") limit 1")
paramList := make([]interface{}, 0, 2)
paramList = append(paramList, schemaName.L, tableName.L)
return buf.String(), paramList
}

func buildCheckSQLForListColumnsPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) {
colName := pi.Columns[0].L
var buf strings.Builder
buf.WriteString("select 1 from %n.%n where %n not in (%?) limit 1")
inValues := getInValues(pi, index)

paramList := make([]interface{}, 0, 4)
paramList = append(paramList, schemaName.L, tableName.L, colName, inValues)
return buf.String(), paramList
}

func getInValues(pi *model.PartitionInfo, index int) []string {
inValues := make([]string, 0, len(pi.Definitions[index].InValues))
for _, inValue := range pi.Definitions[index].InValues {
inValues = append(inValues, inValue...)
// How to find a match?
// (row <=> vals1) OR (row <=> vals2)
// How to find a non-matching row:
// NOT ( (row <=> vals1) OR (row <=> vals2) ... )
buf.WriteString("select 1 from %n.%n where not (")
colNames := make([]string, 0, len(pi.Columns))
for i := range pi.Columns {
// TODO: check if there are no proper quoting function for this?
n := "`" + strings.ReplaceAll(pi.Columns[i].O, "`", "``") + "`"
colNames = append(colNames, n)
}
for i, colValues := range pi.Definitions[index].InValues {
if i != 0 {
buf.WriteString(" OR ")
}
// AND has higher priority than OR, so no need for parentheses
for j, val := range colValues {
if j != 0 {
buf.WriteString(" AND ")
}
// null-safe compare '<=>'
buf.WriteString(fmt.Sprintf("%s <=> %s", colNames[j], val))
}
}
return inValues
buf.WriteString(") limit 1")
paramList := make([]interface{}, 0, 2)
paramList = append(paramList, schemaName.L, tableName.L)
return buf.String(), paramList
}

func checkAddPartitionTooManyPartitions(piDefs uint64) error {
Expand Down
116 changes: 116 additions & 0 deletions ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6459,3 +6459,119 @@ func TestListDefinitionError(t *testing.T) {
tk.MustContainErrMsg(`alter table t add partition (partition p2 values less than (2))`, "[ddl:1480]Only RANGE PARTITIONING can use VALUES LESS THAN in partition definition")
tk.MustContainErrMsg(`alter table t add partition (partition p2)`, "[ddl:1479]Syntax : LIST PARTITIONING requires definition of VALUES IN for each partition")
}

func TestListExchangeValidate(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database ListExchange")
tk.MustExec("use ListExchange")
tk.MustExec("create table lcp (id int, create_ts datetime, name varchar(10))\n" +
"partition by list columns (create_ts)\n" +
"(partition p20230829 values in ('2023-08-29'),partition p20230830 values in ('2023-08-30'))")
tk.MustExec(`insert into lcp values (1,'2023-08-29','a')`)
tk.MustExec(`insert into lcp values (2,'2023-08-30','b')`)
tk.MustContainErrMsg(`insert into lcp values (3,'2023-08-31','c')`,
"[table:1526]Table has no partition for value from column_list")

tk.MustExec(`create table t (id int, create_ts datetime, name varchar(10))`)
tk.MustExec(`insert into t values (3,'2023-08-31','c')`)

tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p20230829 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lcp add partition
(partition p202302 values in ('2023-02-01','2023-02-28',null),
partition p202303 values in ('2023-03-01','2023-03-02','2023-03-31'))`)
tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202302 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202303 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`truncate table t`)
tk.MustExec(`insert into t values (4,'2023-02-01','d'), (5,'2023-02-28','e'), (6, null, 'f')`)
tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202303 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lcp EXCHANGE PARTITION p202302 WITH TABLE t`)
tk.MustExec(`insert into t values (4,'2023-03-01','d'), (5,'2023-03-02','e'), (6,'2023-03-31','f')`)
tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202302 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lcp EXCHANGE PARTITION p202303 WITH TABLE t`)

tk.MustExec(`drop table t`)
tk.MustExec(`CREATE TABLE lmcp (d date, name varchar(10), data varchar(255))
PARTITION BY LIST COLUMNS(d,name)
(partition p3 values IN (('2021-01-01','a'),('2021-01-02','b'),('2021-01-03','c')),
partition p4 values IN (('2021-01-01','b'),(null,'a'),('2021-01-01',null),(null,null)),
partition p2 values IN (('2021-01-01','c'),('2021-01-02','a')),
partition p1 values IN (('2021-01-02','c')))`)
tk.MustExec(`CREATE TABLE t (d date, name varchar(10), data varchar(255))`)

tk.MustExec(`insert into t values ('2021-01-02', 'c', "OK")`)
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`)

tk.MustExec(`insert into t values ('2021-01-01', 'c', "OK"), ('2021-01-02', 'a', "OK")`)
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`)

tk.MustExec(`insert into t values ('2021-01-01', 'a', "OK"), ('2021-01-02','b', "OK"), ('2021-01-03','c', "OK")`)
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`)

tk.MustExec(`insert into t values ('2021-01-01', 'b', "OK"), ('2021-01-01',null, "OK"), (null,'a', "OK"), (null,null,"OK")`)
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`)

tk.MustExec(`create table lp (a int, data varchar(255)) partition by list (a) (partition p0 values in (0,4), partition pNull values in (null))`)
tk.MustExec(`create table np (a int, data varchar(255))`)
tk.MustExec(`insert into np values (0,"OK"), (4,"OK")`)
tk.MustContainErrMsg(`alter table lp EXCHANGE PARTITION pNull WITH TABLE np`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lp EXCHANGE PARTITION p0 WITH TABLE np`)
tk.MustExec(`insert into np values (null,"OK")`)
tk.MustContainErrMsg(`alter table lp EXCHANGE PARTITION p0 WITH TABLE np`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`alter table lp EXCHANGE PARTITION pNull WITH TABLE np`)
// TODO: Check EXCHANGE with DEFAULT partition!!
}

func TestRangeExchangeValidate(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database RangeExchange")
tk.MustExec("use RangeExchange")
tk.MustExec(`CREATE TABLE t (d date, name varchar(10), data varchar(255))`)
tk.MustExec("create table rcp (d date, name varchar(10), data varchar(255))\n" +
"partition by range columns (d)\n" +
"(partition p20230829 values less than ('2023-08-30'),partition p20230830 values less than ('2023-08-31'))")
tk.MustExec(`insert into rcp values ('2023-08-29', 'a', "OK")`)
tk.MustExec(`insert into rcp values ('2023-08-30', 'b', "OK")`)
tk.MustContainErrMsg(`insert into rcp values ('2023-08-31', 'c', "FAIL")`,
"[table:1526]Table has no partition for value from column_list")
tk.MustExec(`insert into t values ('2023-08-31', 'c', "FAIL")`)
tk.MustContainErrMsg(`alter table rcp EXCHANGE PARTITION p20230829 WITH TABLE t`,
"[ddl:1737]Found a row that does not match the partition")
// TODO: Add test with a RANGE single partition (both normal AND maxvalue!)
// TODO: add test with maxvalue (1, 2, and more partitions)
// TODO: add test not in first partition (both last without maxvalue and also not last with/without maxvalue)
}

// TODO: check EXCHANGE how it handles null (for all types of partitioning!!!)
2 changes: 1 addition & 1 deletion planner/core/integration_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ func TestRangeColumnsMultiColumn(t *testing.T) {

tk.MustGetErrCode(`create table t (a int, b datetime, c varchar(255)) partition by range columns (a,b,c)`+
`(partition p0 values less than (NULL,NULL,NULL))`,
errno.ErrWrongTypeColumnValue)
errno.ErrParse)
tk.MustGetErrCode(`create table t (a int, b datetime, c varchar(255)) partition by range columns (a,b,c)`+
`(partition p1 values less than (`+strconv.FormatInt(math.MinInt32-1, 10)+`,'0000-00-00',""))`,
errno.ErrWrongTypeColumnValue)
Expand Down
4 changes: 2 additions & 2 deletions types/parser_driver/value_expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,15 @@ func (n *ValueExpr) Format(w io.Writer) {
}

// WrapInSingleQuotes escapes single quotes and backslashs
// and adds single quotes arond the string
// and adds single quotes around the string
func WrapInSingleQuotes(inStr string) string {
s := strings.ReplaceAll(inStr, "\\", "\\\\")
s = strings.ReplaceAll(s, `'`, `''`)
return fmt.Sprintf("'%s'", s)
}

// UnwrapFromSingleQuotes the reverse of WrapInSingleQuotes
// but also allows non single quoted strings
// but also allows non-single quoted strings
func UnwrapFromSingleQuotes(inStr string) string {
if len(inStr) < 2 || inStr[:1] != "'" || inStr[len(inStr)-1:] != "'" {
return inStr
Expand Down

0 comments on commit e8b590c

Please sign in to comment.