From 262ba3ecc2b52c2a91f82a06450c8662c5c0ed47 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 17 Oct 2022 15:03:20 +0200 Subject: [PATCH 1/5] Table partition double write during Reorganize partition (part 2) --- ddl/db_partition_test.go | 56 ++++ ddl/reorg.go | 6 +- executor/builder.go | 14 +- planner/core/point_get_plan.go | 12 +- planner/core/rule_partition_processor.go | 13 +- session/bootstrap.go | 9 +- table/tables/partition.go | 366 ++++++++++++++++------- table/tables/partition_test.go | 5 +- 8 files changed, 334 insertions(+), 147 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 93ed264deb6e8..803d1ae5f55fb 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -4656,3 +4656,59 @@ func TestAlterModifyColumnOnPartitionedTable(t *testing.T) { "46 46", "57 57")) } + +func TestReorgPartitionConcurrent(t *testing.T) { + t.Skip("Needs PR 38460 as well") + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + schemaName := "ReorgPartConcurrent" + tk.MustExec("create database " + schemaName) + tk.MustExec("use " + schemaName) + tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` + + ` partition by range (a) ` + + `(partition p0 values less than (10),` + + ` partition p1 values less than (20),` + + ` partition pMax values less than (MAXVALUE))`) + tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`) + dom := domain.GetDomain(tk.Session()) + originHook := dom.DDL().GetHook() + defer dom.DDL().SetHook(originHook) + hook := &ddl.TestDDLCallback{Do: dom} + dom.DDL().SetHook(hook) + + wait := make(chan bool) + defer close(wait) + + injected := false + hook.OnJobRunBeforeExported = func(job *model.Job) { + if /* TODO: uncomment!! job.Type == model.ActionReorganizePartition && */ job.SchemaState == model.StateWriteReorganization && !injected { + injected = true + <-wait + <-wait + } + } + alterErr := make(chan error, 1) + go backgroundExec(store /* TODO: uncomment!! 
schemaName, */, "alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))", alterErr) + wait <- true + tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`) + wait <- true + require.NoError(t, <-alterErr) + tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+ + "12 12 21", + "14 14 14", + "15 15 15")) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1a` VALUES LESS THAN (15),\n" + + " PARTITION `p1b` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) +} diff --git a/ddl/reorg.go b/ddl/reorg.go index 2c7508d24b38f..ff62e1687268a 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -559,10 +559,12 @@ func getTableRange(ctx *JobContext, d *ddlCtx, tbl table.PhysicalTable, snapshot endHandleKey = tablecodec.EncodeRecordKey(tbl.RecordPrefix(), maxHandle) } if isEmptyTable || endHandleKey.Cmp(startHandleKey) < 0 { - logutil.BgLogger().Info("[ddl] get table range, endHandle < startHandle", zap.String("table", fmt.Sprintf("%v", tbl.Meta())), + logutil.BgLogger().Info("[ddl] get table range, endHandle < startHandle", zap.Int64("table/partition ID", tbl.GetPhysicalID()), + zap.Bool("isEmptyTable", isEmptyTable), zap.String("endHandle", tryDecodeToHandleString(endHandleKey)), - zap.String("startHandle", tryDecodeToHandleString(startHandleKey))) + zap.String("startHandle", tryDecodeToHandleString(startHandleKey)), + zap.String("table", fmt.Sprintf("%v", tbl.Meta()))) endHandleKey = startHandleKey } return diff --git a/executor/builder.go b/executor/builder.go index 007fe3a557b16..741b36562fcc5 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3466,12 +3466,9 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table // check whether can runtime prune. 
type partitionExpr interface { - PartitionExpr() (*tables.PartitionExpr, error) - } - pe, err := tbl.(partitionExpr).PartitionExpr() - if err != nil { - return nil, false, nil, err + PartitionExpr() *tables.PartitionExpr } + pe := tbl.(partitionExpr).PartitionExpr() // recalculate key column offsets if len(lookUpContent) == 0 { @@ -4073,12 +4070,9 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte } tbl, _ := builder.is.TableByID(tbInfo.ID) pt := tbl.(table.PartitionedTable) - pe, err := tbl.(interface { - PartitionExpr() (*tables.PartitionExpr, error) + pe := tbl.(interface { + PartitionExpr() *tables.PartitionExpr }).PartitionExpr() - if err != nil { - return nil, err - } partitionInfo := &v.PartitionInfo usedPartitionList, err := builder.partitionPruning(pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames) if err != nil { diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index 96e17291e421c..81ad9977f5bd1 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -1851,12 +1851,7 @@ func getPartitionExpr(ctx sessionctx.Context, tbl *model.TableInfo) *tables.Part } // PartitionExpr don't need columns and names for hash partition. - partitionExpr, err := partTable.PartitionExpr() - if err != nil { - return nil - } - - return partitionExpr + return partTable.PartitionExpr() } func getHashPartitionColumnName(ctx sessionctx.Context, tbl *model.TableInfo) *ast.ColumnName { @@ -1873,10 +1868,7 @@ func getHashPartitionColumnName(ctx sessionctx.Context, tbl *model.TableInfo) *a return nil } // PartitionExpr don't need columns and names for hash partition. - partitionExpr, err := table.(partitionTable).PartitionExpr() - if err != nil { - return nil - } + partitionExpr := table.(partitionTable).PartitionExpr() expr := partitionExpr.OrigExpr col, ok := expr.(*ast.ColumnNameExpr) if !ok { diff --git a/planner/core/rule_partition_processor.go b/planner/core/rule_partition_processor.go index d188854686491..746964806c888 100644 --- a/planner/core/rule_partition_processor.go +++ b/planner/core/rule_partition_processor.go @@ -110,7 +110,7 @@ func (s *partitionProcessor) rewriteDataSource(lp LogicalPlan, opt *logicalOptim // partitionTable is for those tables which implement partition. 
type partitionTable interface { - PartitionExpr() (*tables.PartitionExpr, error) + PartitionExpr() *tables.PartitionExpr } func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, columns []*expression.Column, names types.NameSlice) (expression.Expression, error) { @@ -594,13 +594,11 @@ func (l *listPartitionPruner) findUsedListPartitions(conds []expression.Expressi func (s *partitionProcessor) findUsedListPartitions(ctx sessionctx.Context, tbl table.Table, partitionNames []model.CIStr, conds []expression.Expression) ([]int, error) { pi := tbl.Meta().Partition - partExpr, err := tbl.(partitionTable).PartitionExpr() - if err != nil { - return nil, err - } + partExpr := tbl.(partitionTable).PartitionExpr() listPruner := newListPartitionPruner(ctx, tbl, partitionNames, s, conds, partExpr.ForListPruning) var used map[int]struct{} + var err error if partExpr.ForListPruning.ColPrunes == nil { used, err = listPruner.findUsedListPartitions(conds) } else { @@ -825,10 +823,7 @@ func intersectionRange(start, end, newStart, newEnd int) (int, int) { func (s *partitionProcessor) pruneRangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, tbl table.PartitionedTable, conds []expression.Expression, columns []*expression.Column, names types.NameSlice) (partitionRangeOR, error) { - partExpr, err := tbl.(partitionTable).PartitionExpr() - if err != nil { - return nil, err - } + partExpr := tbl.(partitionTable).PartitionExpr() // Partition by range columns. if len(pi.Columns) > 0 { diff --git a/session/bootstrap.go b/session/bootstrap.go index 1a9c79170615f..fd3a2b7a0978e 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -2161,7 +2161,7 @@ func oldPasswordUpgrade(pass string) (string, error) { // rebuildAllPartitionValueMapAndSorted rebuilds all value map and sorted info for list column partitions with InfoSchema. 
func rebuildAllPartitionValueMapAndSorted(s *session) { type partitionExpr interface { - PartitionExpr() (*tables.PartitionExpr, error) + PartitionExpr() *tables.PartitionExpr } p := parser.New() @@ -2173,12 +2173,9 @@ func rebuildAllPartitionValueMapAndSorted(s *session) { continue } - pe, err := t.(partitionExpr).PartitionExpr() - if err != nil { - panic("partition table gets partition expression failed") - } + pe := t.(partitionExpr).PartitionExpr() for _, cp := range pe.ColPrunes { - if err = cp.RebuildPartitionValueMapAndSorted(p); err != nil { + if err := cp.RebuildPartitionValueMapAndSorted(p, pi.Definitions); err != nil { logutil.BgLogger().Warn("build list column partition value map and sorted failed") break } diff --git a/table/tables/partition.go b/table/tables/partition.go index 6a0b315b856e9..372a39d847449 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -82,11 +82,18 @@ type partitionedTable struct { partitions map[int64]*partition evalBufferTypes []*types.FieldType evalBufferPool sync.Pool + // Only used during Reorganize partition + reorgPartitions map[int64]interface{} + reorgPartitionExpr *PartitionExpr } func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Table, error) { + pi := tblInfo.GetPartitionInfo() + if pi == nil || len(pi.Definitions) == 0 { + return nil, table.ErrUnknownPartition + } ret := &partitionedTable{TableCommon: *tbl} - partitionExpr, err := newPartitionExpr(tblInfo) + partitionExpr, err := newPartitionExpr(tblInfo, pi.Definitions) if err != nil { return nil, errors.Trace(err) } @@ -100,7 +107,6 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Tabl if err := initTableIndices(&ret.TableCommon); err != nil { return nil, errors.Trace(err) } - pi := tblInfo.GetPartitionInfo() partitions := make(map[int64]*partition, len(pi.Definitions)) for _, p := range pi.Definitions { var t partition @@ -111,10 +117,20 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Tabl partitions[p.ID] = &t } ret.partitions = partitions + if len(pi.DroppingDefinitions) > 0 && len(pi.AddingDefinitions) > 0 { + ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.AddingDefinitions) + if err != nil { + return nil, errors.Trace(err) + } + ret.reorgPartitions = make(map[int64]interface{}, len(pi.DroppingDefinitions)) + for _, def := range pi.DroppingDefinitions { + ret.reorgPartitions[def.ID] = nil + } + } return ret, nil } -func newPartitionExpr(tblInfo *model.TableInfo) (*PartitionExpr, error) { +func newPartitionExpr(tblInfo *model.TableInfo, defs []model.PartitionDefinition) (*PartitionExpr, error) { // a partitioned table cannot rely on session context/sql modes, so use a default one! ctx := mock.NewContext() dbName := model.NewCIStr(ctx.GetSessionVars().CurrentDB) @@ -125,11 +141,11 @@ func newPartitionExpr(tblInfo *model.TableInfo) (*PartitionExpr, error) { pi := tblInfo.GetPartitionInfo() switch pi.Type { case model.PartitionTypeRange: - return generateRangePartitionExpr(ctx, pi, columns, names) + return generateRangePartitionExpr(ctx, pi, defs, columns, names) case model.PartitionTypeHash: return generateHashPartitionExpr(ctx, pi, columns, names) case model.PartitionTypeList: - return generateListPartitionExpr(ctx, tblInfo, columns, names) + return generateListPartitionExpr(ctx, tblInfo, defs, columns, names) } panic("cannot reach here") } @@ -148,8 +164,6 @@ type PartitionExpr struct { *ForRangeColumnsPruning // ColOffset is the offsets of partition columns. 
ColumnOffset []int - // InValues: x in (1,2); x in (3,4); x in (5,6), used for list partition. - InValues []expression.Expression *ForListPruning } @@ -182,19 +196,19 @@ type ForRangeColumnsPruning struct { LessThan [][]*expression.Expression } -func dataForRangeColumnsPruning(ctx sessionctx.Context, pi *model.PartitionInfo, schema *expression.Schema, names []*types.FieldName, p *parser.Parser) (*ForRangeColumnsPruning, error) { +func dataForRangeColumnsPruning(ctx sessionctx.Context, defs []model.PartitionDefinition, schema *expression.Schema, names []*types.FieldName, p *parser.Parser) (*ForRangeColumnsPruning, error) { var res ForRangeColumnsPruning - res.LessThan = make([][]*expression.Expression, 0, len(pi.Definitions)) - for i := 0; i < len(pi.Definitions); i++ { - lessThanCols := make([]*expression.Expression, 0, len(pi.Columns)) - for j := range pi.Definitions[i].LessThan { - if strings.EqualFold(pi.Definitions[i].LessThan[j], "MAXVALUE") { + res.LessThan = make([][]*expression.Expression, 0, len(defs)) + for i := 0; i < len(defs); i++ { + lessThanCols := make([]*expression.Expression, 0, len(defs[i].LessThan)) + for j := range defs[i].LessThan { + if strings.EqualFold(defs[i].LessThan[j], "MAXVALUE") { // Use a nil pointer instead of math.MaxInt64 to avoid the corner cases. lessThanCols = append(lessThanCols, nil) // No column after MAXVALUE matters break } - tmp, err := parseSimpleExprWithNames(p, ctx, pi.Definitions[i].LessThan[j], schema, names) + tmp, err := parseSimpleExprWithNames(p, ctx, defs[i].LessThan[j], schema, names) if err != nil { return nil, err } @@ -426,29 +440,29 @@ type ForRangePruning struct { Unsigned bool } -// dataForRangePruning extracts the less than parts from 'partition p0 less than xx ... partitoin p1 less than ...' -func dataForRangePruning(sctx sessionctx.Context, pi *model.PartitionInfo) (*ForRangePruning, error) { +// dataForRangePruning extracts the less than parts from 'partition p0 less than xx ... partition p1 less than ...' +func dataForRangePruning(sctx sessionctx.Context, defs []model.PartitionDefinition) (*ForRangePruning, error) { var maxValue bool var unsigned bool - lessThan := make([]int64, len(pi.Definitions)) - for i := 0; i < len(pi.Definitions); i++ { - if strings.EqualFold(pi.Definitions[i].LessThan[0], "MAXVALUE") { + lessThan := make([]int64, len(defs)) + for i := 0; i < len(defs); i++ { + if strings.EqualFold(defs[i].LessThan[0], "MAXVALUE") { // Use a bool flag instead of math.MaxInt64 to avoid the corner cases. 
maxValue = true } else { var err error - lessThan[i], err = strconv.ParseInt(pi.Definitions[i].LessThan[0], 10, 64) + lessThan[i], err = strconv.ParseInt(defs[i].LessThan[0], 10, 64) var numErr *strconv.NumError if stderr.As(err, &numErr) && numErr.Err == strconv.ErrRange { var tmp uint64 - tmp, err = strconv.ParseUint(pi.Definitions[i].LessThan[0], 10, 64) + tmp, err = strconv.ParseUint(defs[i].LessThan[0], 10, 64) lessThan[i] = int64(tmp) unsigned = true } if err != nil { - val, ok := fixOldVersionPartitionInfo(sctx, pi.Definitions[i].LessThan[0]) + val, ok := fixOldVersionPartitionInfo(sctx, defs[i].LessThan[0]) if !ok { - logutil.BgLogger().Error("wrong partition definition", zap.String("less than", pi.Definitions[i].LessThan[0])) + logutil.BgLogger().Error("wrong partition definition", zap.String("less than", defs[i].LessThan[0])) return nil, errors.WithStack(err) } lessThan[i] = val @@ -490,40 +504,14 @@ func rangePartitionExprStrings(pi *model.PartitionInfo) []string { } func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, - columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { + defs []model.PartitionDefinition, columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { // The caller should assure partition info is not nil. - locateExprs := make([]expression.Expression, 0, len(pi.Definitions)) - var buf bytes.Buffer p := parser.New() schema := expression.NewSchema(columns...) partStrs := rangePartitionExprStrings(pi) - for i := 0; i < len(pi.Definitions); i++ { - if strings.EqualFold(pi.Definitions[i].LessThan[0], "MAXVALUE") { - // Expr less than maxvalue is always true. - fmt.Fprintf(&buf, "true") - } else { - maxValueFound := false - for j := range partStrs[1:] { - if strings.EqualFold(pi.Definitions[i].LessThan[j+1], "MAXVALUE") { - // if any column will be less than MAXVALUE, so change < to <= of the previous prefix of columns - fmt.Fprintf(&buf, "((%s) <= (%s))", strings.Join(partStrs[:j+1], ","), strings.Join(pi.Definitions[i].LessThan[:j+1], ",")) - maxValueFound = true - break - } - } - if !maxValueFound { - fmt.Fprintf(&buf, "((%s) < (%s))", strings.Join(partStrs, ","), strings.Join(pi.Definitions[i].LessThan, ",")) - } - } - - expr, err := parseSimpleExprWithNames(p, ctx, buf.String(), schema, names) - if err != nil { - // If it got an error here, ddl may hang forever, so this error log is important. 
- logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", buf.String()), zap.Error(err)) - return nil, errors.Trace(err) - } - locateExprs = append(locateExprs, expr) - buf.Reset() + locateExprs, err := getRangeLocateExprs(ctx, p, defs, partStrs, schema, names) + if err != nil { + return nil, errors.Trace(err) } ret := &PartitionExpr{ UpperBounds: locateExprs, @@ -536,14 +524,14 @@ func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, ret.ColumnOffset = offset if len(pi.Columns) < 1 { - tmp, err := dataForRangePruning(ctx, pi) + tmp, err := dataForRangePruning(ctx, defs) if err != nil { return nil, errors.Trace(err) } ret.Expr = partExpr ret.ForRangePruning = tmp } else { - tmp, err := dataForRangeColumnsPruning(ctx, pi, schema, names, p) + tmp, err := dataForRangeColumnsPruning(ctx, defs, schema, names, p) if err != nil { return nil, errors.Trace(err) } @@ -552,6 +540,40 @@ func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, return ret, nil } +func getRangeLocateExprs(ctx sessionctx.Context, p *parser.Parser, defs []model.PartitionDefinition, partStrs []string, schema *expression.Schema, names types.NameSlice) ([]expression.Expression, error) { + var buf bytes.Buffer + locateExprs := make([]expression.Expression, 0, len(defs)) + for i := 0; i < len(defs); i++ { + if strings.EqualFold(defs[i].LessThan[0], "MAXVALUE") { + // Expr less than maxvalue is always true. + fmt.Fprintf(&buf, "true") + } else { + maxValueFound := false + for j := range partStrs[1:] { + if strings.EqualFold(defs[i].LessThan[j+1], "MAXVALUE") { + // if any column will be less than MAXVALUE, so change < to <= of the previous prefix of columns + fmt.Fprintf(&buf, "((%s) <= (%s))", strings.Join(partStrs[:j+1], ","), strings.Join(defs[i].LessThan[:j+1], ",")) + maxValueFound = true + break + } + } + if !maxValueFound { + fmt.Fprintf(&buf, "((%s) < (%s))", strings.Join(partStrs, ","), strings.Join(defs[i].LessThan, ",")) + } + } + + expr, err := parseSimpleExprWithNames(p, ctx, buf.String(), schema, names) + if err != nil { + // If it got an error here, ddl may hang forever, so this error log is important. + logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", buf.String()), zap.Error(err)) + return nil, errors.Trace(err) + } + locateExprs = append(locateExprs, expr) + buf.Reset() + } + return locateExprs, nil +} + func getColumnsOffset(cols, columns []*expression.Column) []int { colsOffset := make([]int, len(cols)) for i, col := range columns { @@ -603,7 +625,7 @@ func extractPartitionExprColumns(ctx sessionctx.Context, pi *model.PartitionInfo } func generateListPartitionExpr(ctx sessionctx.Context, tblInfo *model.TableInfo, - columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { + defs []model.PartitionDefinition, columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { // The caller should assure partition info is not nil. 
pi := tblInfo.GetPartitionInfo() partExpr, exprCols, offset, err := extractPartitionExprColumns(ctx, pi, columns, names) @@ -612,9 +634,9 @@ func generateListPartitionExpr(ctx sessionctx.Context, tblInfo *model.TableInfo, } listPrune := &ForListPruning{} if len(pi.Columns) == 0 { - err = listPrune.buildListPruner(ctx, tblInfo, exprCols, columns, names) + err = listPrune.buildListPruner(ctx, tblInfo, defs, exprCols, columns, names) } else { - err = listPrune.buildListColumnsPruner(ctx, tblInfo, columns, names) + err = listPrune.buildListColumnsPruner(ctx, tblInfo, defs, columns, names) } if err != nil { return nil, err @@ -627,7 +649,7 @@ func generateListPartitionExpr(ctx sessionctx.Context, tblInfo *model.TableInfo, return ret, nil } -func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model.TableInfo, exprCols []*expression.Column, +func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model.TableInfo, defs []model.PartitionDefinition, exprCols []*expression.Column, columns []*expression.Column, names types.NameSlice) error { pi := tblInfo.GetPartitionInfo() schema := expression.NewSchema(columns...) @@ -638,7 +660,7 @@ func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", pi.Expr), zap.Error(err)) return errors.Trace(err) } - // Since need to change the column index of the expresion, clone the expression first. + // Since need to change the column index of the expression, clone the expression first. lp.LocateExpr = expr.Clone() lp.PruneExprCols = exprCols lp.PruneExpr = expr.Clone() @@ -650,14 +672,15 @@ func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model } c.Index = idx } - err = lp.buildListPartitionValueMap(ctx, tblInfo, schema, names, p) + err = lp.buildListPartitionValueMap(ctx, defs, schema, names, p) if err != nil { return err } return nil } -func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, tblInfo *model.TableInfo, +func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, + tblInfo *model.TableInfo, defs []model.PartitionDefinition, columns []*expression.Column, names types.NameSlice) error { pi := tblInfo.GetPartitionInfo() schema := expression.NewSchema(columns...) @@ -683,7 +706,7 @@ func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, tblInfo valueMap: make(map[string]ListPartitionLocation), sorted: btree.NewG[*btreeListColumnItem](btreeDegree, lessBtreeListColumnItem), } - err := colPrune.buildPartitionValueMapAndSorted(p) + err := colPrune.buildPartitionValueMapAndSorted(p, defs) if err != nil { return err } @@ -696,12 +719,11 @@ func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, tblInfo // buildListPartitionValueMap builds list partition value map. // The map is column value -> partition index. // colIdx is the column index in the list columns. 
-func (lp *ForListPruning) buildListPartitionValueMap(ctx sessionctx.Context, tblInfo *model.TableInfo, +func (lp *ForListPruning) buildListPartitionValueMap(ctx sessionctx.Context, defs []model.PartitionDefinition, schema *expression.Schema, names types.NameSlice, p *parser.Parser) error { - pi := tblInfo.GetPartitionInfo() lp.valueMap = map[int64]int{} lp.nullPartitionIdx = -1 - for partitionIdx, def := range pi.Definitions { + for partitionIdx, def := range defs { for _, vs := range def.InValues { expr, err := parseSimpleExprWithNames(p, ctx, vs[0], schema, names) if err != nil { @@ -770,26 +792,27 @@ func (lp *ForListPruning) locateListColumnsPartitionByRow(ctx sessionctx.Context // buildPartitionValueMapAndSorted builds list columns partition value map for the specified column. // It also builds list columns partition value btree for the specified column. // colIdx is the specified column index in the list columns. -func (lp *ForListColumnPruning) buildPartitionValueMapAndSorted(p *parser.Parser) error { +func (lp *ForListColumnPruning) buildPartitionValueMapAndSorted(p *parser.Parser, + defs []model.PartitionDefinition) error { l := len(lp.valueMap) if l != 0 { return nil } - return lp.buildListPartitionValueMapAndSorted(p) + return lp.buildListPartitionValueMapAndSorted(p, defs) } // RebuildPartitionValueMapAndSorted rebuilds list columns partition value map for the specified column. -func (lp *ForListColumnPruning) RebuildPartitionValueMapAndSorted(p *parser.Parser) error { +func (lp *ForListColumnPruning) RebuildPartitionValueMapAndSorted(p *parser.Parser, + defs []model.PartitionDefinition) error { lp.valueMap = make(map[string]ListPartitionLocation, len(lp.valueMap)) lp.sorted.Clear(false) - return lp.buildListPartitionValueMapAndSorted(p) + return lp.buildListPartitionValueMapAndSorted(p, defs) } -func (lp *ForListColumnPruning) buildListPartitionValueMapAndSorted(p *parser.Parser) error { - pi := lp.tblInfo.GetPartitionInfo() +func (lp *ForListColumnPruning) buildListPartitionValueMapAndSorted(p *parser.Parser, defs []model.PartitionDefinition) error { sc := lp.ctx.GetSessionVars().StmtCtx - for partitionIdx, def := range pi.Definitions { + for partitionIdx, def := range defs { for groupIdx, vs := range def.InValues { keyBytes, err := lp.genConstExprKey(lp.ctx, sc, vs[lp.colIdx], lp.schema, lp.names, p) if err != nil { @@ -935,8 +958,8 @@ func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, } // PartitionExpr returns the partition expression. -func (t *partitionedTable) PartitionExpr() (*PartitionExpr, error) { - return t.partitionExpr, nil +func (t *partitionedTable) PartitionExpr() *PartitionExpr { + return t.partitionExpr } func (t *partitionedTable) GetPartitionColumnNames() []model.CIStr { @@ -969,7 +992,7 @@ func PartitionRecordKey(pid int64, handle int64) kv.Key { } func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error { - defID, err := t.locatePartition(ctx, pi, r) + defID, err := t.locatePartition(ctx, r) if err != nil { return err } @@ -979,36 +1002,56 @@ func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi return nil } -// locatePartition returns the partition ID of the input record. -func (t *partitionedTable) locatePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int64, error) { +// locatePartitionCommon returns the partition idx of the input record. 
+func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, pi *model.PartitionInfo, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { var err error var idx int switch t.meta.Partition.Type { case model.PartitionTypeRange: if len(pi.Columns) == 0 { - idx, err = t.locateRangePartition(ctx, pi, r) + idx, err = t.locateRangePartition(ctx, partitionExpr, r) } else { - idx, err = t.locateRangeColumnPartition(ctx, pi, r) + idx, err = t.locateRangeColumnPartition(ctx, partitionExpr, r) } case model.PartitionTypeHash: + // Note that only LIST and RANGE supports REORGANIZE PARTITION + // TODO: Add support for ADD PARTITION and COALESCE PARTITION for HASH idx, err = t.locateHashPartition(ctx, pi, r) case model.PartitionTypeList: - idx, err = t.locateListPartition(ctx, pi, r) + idx, err = t.locateListPartition(ctx, partitionExpr, r) + } + if err != nil { + return 0, errors.Trace(err) } + return idx, nil +} + +func (t *partitionedTable) locatePartition(ctx sessionctx.Context, r []types.Datum) (int64, error) { + pi := t.Meta().GetPartitionInfo() + idx, err := t.locatePartitionCommon(ctx, pi, t.partitionExpr, r) if err != nil { return 0, errors.Trace(err) } return pi.Definitions[idx].ID, nil } -func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { +func (t *partitionedTable) locateReorgPartition(ctx sessionctx.Context, r []types.Datum) (int64, error) { + pi := t.Meta().GetPartitionInfo() + idx, err := t.locatePartitionCommon(ctx, pi, t.reorgPartitionExpr, r) + if err != nil { + return 0, errors.Trace(err) + } + return pi.AddingDefinitions[idx].ID, nil +} + +func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { + upperBounds := partitionExpr.UpperBounds var lastError error - partitionExprs := t.partitionExpr.UpperBounds evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow) defer t.evalBufferPool.Put(evalBuffer) - idx := sort.Search(len(partitionExprs), func(i int) bool { + idx := sort.Search(len(upperBounds), func(i int) bool { evalBuffer.SetDatums(r...) - ret, isNull, err := partitionExprs[i].EvalInt(ctx, evalBuffer.ToRow()) + ret, isNull, err := upperBounds[i].EvalInt(ctx, evalBuffer.ToRow()) if err != nil { lastError = err return true // Does not matter, will propagate the last error anyway. @@ -1023,11 +1066,11 @@ func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, pi if lastError != nil { return 0, errors.Trace(lastError) } - if idx >= len(partitionExprs) { + if idx >= len(upperBounds) { // The data does not belong to any of the partition returns `table has no partition for value %s`. 
var valueMsg string - if pi.Expr != "" { - e, err := expression.ParseSimpleExprWithTableInfo(ctx, pi.Expr, t.meta) + if t.meta.Partition.Expr != "" { + e, err := expression.ParseSimpleExprWithTableInfo(ctx, t.meta.Partition.Expr, t.meta) if err == nil { val, _, err := e.EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow()) if err == nil { @@ -1043,15 +1086,15 @@ func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, pi return idx, nil } -func (t *partitionedTable) locateListPartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { - lp := t.partitionExpr.ForListPruning +func (t *partitionedTable) locateListPartition(ctx sessionctx.Context, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { + lp := partitionExpr.ForListPruning if len(lp.ColPrunes) == 0 { return lp.locateListPartitionByRow(ctx, r) } return lp.locateListColumnsPartitionByRow(ctx, r) } -func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { +func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { var ( ret int64 val int64 @@ -1074,7 +1117,7 @@ func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *mode ret = val } unsigned := mysql.HasUnsignedFlag(t.partitionExpr.Expr.GetType().GetFlag()) - ranges := t.partitionExpr.ForRangePruning + ranges := partitionExpr.ForRangePruning length := len(ranges.LessThan) pos := sort.Search(length, func(i int) bool { if isNull { @@ -1088,8 +1131,8 @@ func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *mode if pos < 0 || pos >= length { // The data does not belong to any of the partition returns `table has no partition for value %s`. var valueMsg string - if pi.Expr != "" { - e, err := expression.ParseSimpleExprWithTableInfo(ctx, pi.Expr, t.meta) + if t.meta.Partition.Expr != "" { + e, err := expression.ParseSimpleExprWithTableInfo(ctx, t.meta.Partition.Expr, t.meta) if err == nil { val, _, err := e.EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow()) if err == nil { @@ -1156,7 +1199,7 @@ func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable { // GetPartitionByRow returns a Table, which is actually a Partition. func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) { - pid, err := t.locatePartition(ctx, t.Meta().GetPartitionInfo(), r) + pid, err := t.locatePartition(ctx, r) if err != nil { return nil, errors.Trace(err) } @@ -1165,7 +1208,7 @@ func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.D // GetPartitionByRow returns a Table, which is actually a Partition. 
func (t *partitionTableWithGivenSets) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) { - pid, err := t.locatePartition(ctx, t.Meta().GetPartitionInfo(), r) + pid, err := t.locatePartition(ctx, r) if err != nil { return nil, errors.Trace(err) } @@ -1181,8 +1224,7 @@ func (t *partitionedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, op } func partitionedTableAddRecord(ctx sessionctx.Context, t *partitionedTable, r []types.Datum, partitionSelection map[int64]struct{}, opts []table.AddRecordOption) (recordID kv.Handle, err error) { - partitionInfo := t.meta.GetPartitionInfo() - pid, err := t.locatePartition(ctx, partitionInfo, r) + pid, err := t.locatePartition(ctx, r) if err != nil { return nil, errors.Trace(err) } @@ -1193,7 +1235,23 @@ func partitionedTableAddRecord(ctx sessionctx.Context, t *partitionedTable, r [] } } tbl := t.GetPartition(pid) - return tbl.AddRecord(ctx, r, opts...) + recordID, err = tbl.AddRecord(ctx, r, opts...) + if err != nil { + return + } + if _, ok := t.reorgPartitions[pid]; ok { + // Double write to the ongoing reorganized partition + pid, err = t.locateReorgPartition(ctx, r) + if err != nil { + return nil, errors.Trace(err) + } + tbl = t.GetPartition(pid) + recordID, err = tbl.AddRecord(ctx, r, opts...) + if err != nil { + return + } + } + return } // partitionTableWithGivenSets is used for this kind of grammar: partition (p0,p1) @@ -1230,14 +1288,29 @@ func (t *partitionTableWithGivenSets) GetAllPartitionIDs() []int64 { // RemoveRecord implements table.Table RemoveRecord interface. func (t *partitionedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { - partitionInfo := t.meta.GetPartitionInfo() - pid, err := t.locatePartition(ctx, partitionInfo, r) + pid, err := t.locatePartition(ctx, r) if err != nil { return errors.Trace(err) } tbl := t.GetPartition(pid) - return tbl.RemoveRecord(ctx, h, r) + err = tbl.RemoveRecord(ctx, h, r) + if err != nil { + return errors.Trace(err) + } + + if _, ok := t.reorgPartitions[pid]; ok { + pid, err = t.locateReorgPartition(ctx, r) + if err != nil { + return errors.Trace(err) + } + tbl = t.GetPartition(pid) + err = tbl.RemoveRecord(ctx, h, r) + if err != nil { + return errors.Trace(err) + } + } + return nil } func (t *partitionedTable) GetAllPartitionIDs() []int64 { @@ -1260,12 +1333,11 @@ func (t *partitionTableWithGivenSets) UpdateRecord(ctx context.Context, sctx ses } func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context, t *partitionedTable, h kv.Handle, currData, newData []types.Datum, touched []bool, partitionSelection map[int64]struct{}) error { - partitionInfo := t.meta.GetPartitionInfo() - from, err := t.locatePartition(ctx, partitionInfo, currData) + from, err := t.locatePartition(ctx, currData) if err != nil { return errors.Trace(err) } - to, err := t.locatePartition(ctx, partitionInfo, newData) + to, err := t.locatePartition(ctx, newData) if err != nil { return errors.Trace(err) } @@ -1292,16 +1364,96 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context, // So this special order is chosen: add record first, errors such as // 'Key Already Exists' will generally happen during step1, errors are // unlikely to happen in step2. + // TODO: check what happens with foreign keys in step 2? err = t.GetPartition(from).RemoveRecord(ctx, h, currData) if err != nil { + // TODO, test this!! 
I assume that we need to clean this up, + // since there are non-atomic changes in the transaction buffer + // which if committed will cause inconsistencies? + // What to do if something during the cleanup fails? Can we block + // the transaction from ever being committed? logutil.BgLogger().Error("update partition record fails", zap.String("message", "new record inserted while old record is not removed"), zap.Error(err)) return errors.Trace(err) } + // TODO: Test if the update is in different partitions before reorg, + // but is now in the same during the reorg? And vice-versa... + // What if the change is in the same reorged partition?!? + newTo, newFrom := int64(-1), int64(-1) + if _, ok := t.reorgPartitions[to]; ok { + newTo, err = t.locateReorgPartition(ctx, newData) + // There might be valid cases when errors should be accepted? + if err != nil { + return errors.Trace(err) + } + } + if _, ok := t.reorgPartitions[from]; ok { + newFrom, err = t.locateReorgPartition(ctx, currData) + // There might be valid cases when errors should be accepted? + if err != nil { + return errors.Trace(err) + } + } + if newTo == newFrom && newTo != -1 { + tbl := t.GetPartition(newTo) + return tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) + } + if newTo != -1 { + tbl := t.GetPartition(newTo) + _, err = tbl.AddRecord(ctx, newData) + if err != nil { + return errors.Trace(err) + } + } + if newFrom != -1 { + tbl := t.GetPartition(newFrom) + err = tbl.RemoveRecord(ctx, h, currData) + // How to handle error, which can happen when the data is not yet backfilled + // TODO: Create a test for this!!! + if err != nil { + return errors.Trace(err) + } + } return nil } - tbl := t.GetPartition(to) - return tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) + err = tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) + if err != nil { + return errors.Trace(err) + } + if _, ok := t.reorgPartitions[to]; ok { + // Even if to == from, in the reorganized partitions they may differ + // like in case of a split + newTo, err := t.locateReorgPartition(ctx, newData) + if err != nil { + return errors.Trace(err) + } + newFrom, err := t.locateReorgPartition(ctx, currData) + if err != nil { + return errors.Trace(err) + } + if newTo == newFrom { + tbl = t.GetPartition(newTo) + err = tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) + if err != nil { + return errors.Trace(err) + } + return nil + } + tbl = t.GetPartition(newTo) + _, err = tbl.AddRecord(ctx, newData) + // TODO: Could there be a case where a duplicate unique key could happen here? + if err != nil { + return errors.Trace(err) + } + tbl = t.GetPartition(newFrom) + err = tbl.RemoveRecord(ctx, h, currData) + // How to handle error, which can happen when the data is not yet backfilled + // TODO: Create a test for this!!! + if err != nil { + return errors.Trace(err) + } + } + return nil } // FindPartitionByName finds partition in table meta by name. 
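Note on the double-write logic in the table/tables/partition.go changes above: during StateWriteReorganization, any write that lands in a partition listed in reorgPartitions (built from pi.DroppingDefinitions) is also mirrored into the matching new partition resolved through reorgPartitionExpr (built from pi.AddingDefinitions), so the backfill cannot miss concurrent DML. Below is a minimal Go sketch of that rule only, not code from this patch; the helpers locate, locateReorg and add are hypothetical stand-ins for locatePartition, locateReorgPartition and GetPartition(pid).AddRecord, and rows are simplified to []string.

    // Illustrative sketch, not part of this patch: the double-write rule used by
    // partitionedTableAddRecord/RemoveRecord above, with simplified helper types.
    func addWithDoubleWrite(
    	reorgPartitions map[int64]struct{}, // partition IDs from pi.DroppingDefinitions
    	locate, locateReorg func(row []string) (int64, error),
    	add func(pid int64, row []string) error,
    	row []string,
    ) error {
    	pid, err := locate(row)
    	if err != nil {
    		return err
    	}
    	// Normal write to the currently visible partition.
    	if err := add(pid, row); err != nil {
    		return err
    	}
    	// If that partition is being reorganized, mirror the write into the new
    	// partition (from pi.AddingDefinitions) so the ongoing backfill sees it.
    	if _, ok := reorgPartitions[pid]; ok {
    		newPid, err := locateReorg(row)
    		if err != nil {
    			return err
    		}
    		return add(newPid, row)
    	}
    	return nil
    }
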
diff --git a/table/tables/partition_test.go b/table/tables/partition_test.go index cc8dd90a44737..0bac493aa7f35 100644 --- a/table/tables/partition_test.go +++ b/table/tables/partition_test.go @@ -273,10 +273,9 @@ func TestGeneratePartitionExpr(t *testing.T) { tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) type partitionExpr interface { - PartitionExpr() (*tables.PartitionExpr, error) + PartitionExpr() *tables.PartitionExpr } - pe, err := tbl.(partitionExpr).PartitionExpr() - require.NoError(t, err) + pe := tbl.(partitionExpr).PartitionExpr() upperBounds := []string{ "lt(t1.id, 4)", From 1919a582d0c66e98a728bad4a8e4ed22746a4b6f Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 17 Oct 2022 16:54:23 +0200 Subject: [PATCH 2/5] formatting --- table/tables/partition.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/table/tables/partition.go b/table/tables/partition.go index 372a39d847449..69f27916f1d79 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -83,7 +83,7 @@ type partitionedTable struct { evalBufferTypes []*types.FieldType evalBufferPool sync.Pool // Only used during Reorganize partition - reorgPartitions map[int64]interface{} + reorgPartitions map[int64]interface{} reorgPartitionExpr *PartitionExpr } @@ -1295,22 +1295,22 @@ func (t *partitionedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r [ tbl := t.GetPartition(pid) err = tbl.RemoveRecord(ctx, h, r) - if err != nil { - return errors.Trace(err) - } - - if _, ok := t.reorgPartitions[pid]; ok { - pid, err = t.locateReorgPartition(ctx, r) - if err != nil { - return errors.Trace(err) - } - tbl = t.GetPartition(pid) - err = tbl.RemoveRecord(ctx, h, r) - if err != nil { - return errors.Trace(err) - } - } - return nil + if err != nil { + return errors.Trace(err) + } + + if _, ok := t.reorgPartitions[pid]; ok { + pid, err = t.locateReorgPartition(ctx, r) + if err != nil { + return errors.Trace(err) + } + tbl = t.GetPartition(pid) + err = tbl.RemoveRecord(ctx, h, r) + if err != nil { + return errors.Trace(err) + } + } + return nil } func (t *partitionedTable) GetAllPartitionIDs() []int64 { From 5632e4a3051eb2d219fede4265d7da3decd6d21d Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Fri, 23 Dec 2022 09:13:27 +0000 Subject: [PATCH 3/5] post merge fix, removed test --- ddl/db_partition_test.go | 77 ---------------------------------------- 1 file changed, 77 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index bb486b0c0e571..c4fff4a275b7a 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -4558,83 +4558,6 @@ func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) { tk.MustContainErrMsg(`alter table t change a c int`, "[planner:1054]Unknown column 'a' in 'expression'") } -func TestAlterModifyColumnOnPartitionedTableFail(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - schemaName := "modColPartFail" - tk.MustExec("create database " + schemaName) - tk.MustExec("use " + schemaName) - tk.MustExec(`create table t (a int unsigned, b varchar(255), key (b)) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20), partition pMax values less than (MAXVALUE))`) - tk.MustExec(`insert into t values (7, "07"), (8, "08"),(23,"23"),(34,"34💥"),(46,"46"),(57,"57")`) - tk.MustGetErrCode(`alter table t modify a varchar(255)`, errno.ErrUnsupportedDDLOperation) - 
tk.MustGetErrCode(`alter table t modify a float`, mysql.ErrFieldTypeNotAllowedAsPartitionField) - tk.MustExec(`drop table t`) - tk.MustExec(`create table t (b int unsigned, a varchar(255), key (b)) partition by range columns (a) (partition p0 values less than (""), partition p1 values less than ("11111"), partition pMax values less than (MAXVALUE))`) - tk.MustExec(`insert into t values (7, "07"), (8, "08"),(23,"23"),(34,"34 💥💥Longer than 11111"),(46,"46"),(57,"57")`) - tk.MustExec(`alter table t modify a varchar(50)`) - tk.MustGetErrCode(`alter table t modify a float`, mysql.ErrFieldTypeNotAllowedAsPartitionField) - tk.MustGetErrCode(`alter table t modify a int`, errno.ErrUnsupportedDDLOperation) - tk.MustContainErrMsg(`alter table t modify a varchar(4)`, "[ddl:8200]New column does not match partition definitions: [ddl:1654]Partition column values of incorrect type") - tk.MustGetErrCode(`alter table t modify a varchar(5)`, errno.WarnDataTruncated) - tk.MustExec(`SET SQL_MODE = ''`) - tk.MustExec(`alter table t modify a varchar(5)`) - // fix https://github.com/pingcap/tidb/issues/38669 and update this - //tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1265 Data truncated for column 'a', value is '34 💥💥Longer than 11111'")) - tk.MustExec(`SET SQL_MODE = DEFAULT`) - tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ - "23 23", - "34 34 💥💥", - "46 46", - "57 57", - "7 07", - "8 08")) - tStr := "" + - "CREATE TABLE `t` (\n" + - " `b` int(10) unsigned DEFAULT NULL,\n" + - " `a` varchar(5) DEFAULT NULL,\n" + - " KEY `b` (`b`)\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + - "PARTITION BY RANGE COLUMNS(`a`)\n" + - "(PARTITION `p0` VALUES LESS THAN (''),\n" + - " PARTITION `p1` VALUES LESS THAN ('11111'),\n" + - " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))" - tk.MustQuery(`show create table t`).Check(testkit.Rows("t " + tStr)) - tk.MustExec(`drop table t`) - tk.MustExec(tStr) - tk.MustExec(`drop table t`) - tk.MustExec("create table t (a int, b varchar(255), key (b)) partition by range (a) (partition `p-300` values less than (-300), partition p0 values less than (0), partition p300 values less than (300))") - tk.MustExec(`insert into t values (-400, "-400"), (-100, "-100"), (0, "0"), (100, "100"), (290, "290")`) - tk.MustContainErrMsg(`alter table t modify a int unsigned`, "[ddl:8200]Unsupported modify column, decreasing length of int may result in truncation and change of partition") - tk.MustContainErrMsg(`alter table t modify a tinyint`, "[ddl:8200]Unsupported modify column, decreasing length of int may result in truncation and change of partition") - tk.MustExec(`set sql_mode = ''`) - tk.MustContainErrMsg(`alter table t modify a tinyint`, "[ddl:8200]Unsupported modify column, decreasing length of int may result in truncation and change of partition") - tk.MustQuery("select * from t partition (`p-300`)").Sort().Check(testkit.Rows("-400 -400")) - tk.MustExec(`set sql_mode = default`) - tk.MustContainErrMsg(`alter table t modify a smallint`, "[ddl:8200]Unsupported modify column, decreasing length of int may result in truncation and change of partition") - tk.MustExec(`alter table t modify a bigint`) - tk.MustExec(`drop table t`) - tk.MustExec("create table t (a int, b varchar(255), key (b)) partition by range columns (a) (partition `p-300` values less than (-300), partition p0 values less than (0), partition p300 values less than (300))") - tk.MustExec(`insert into t values (-400, "-400"), (-100, "-100"), (0, "0"), (100, "100"), (290, "290")`) - 
tk.MustContainErrMsg(`alter table t modify a int unsigned`, "[ddl:8200]Unsupported modify column: can't change the partitioning column, since it would require reorganize all partitions") - tk.MustContainErrMsg(`alter table t modify a tinyint`, "[ddl:8200]New column does not match partition definitions: [ddl:1654]Partition column values of incorrect type") - tk.MustExec(`set sql_mode = ''`) - tk.MustContainErrMsg(`alter table t modify a tinyint`, "[ddl:8200]New column does not match partition definitions: [ddl:1654]Partition column values of incorrect type") - tk.MustQuery("select * from t partition (`p-300`)").Sort().Check(testkit.Rows("-400 -400")) - tk.MustExec(`set sql_mode = default`) - // OK to decrease, since with RANGE COLUMNS, it will check the partition definition values against the new type - tk.MustExec(`alter table t modify a smallint`) - tk.MustExec(`alter table t modify a bigint`) - - tk.MustExec(`drop table t`) - - tk.MustExec(`create table t (a int, b varchar(255), key (b)) partition by list columns (b) (partition p1 values in ("1", "ab", "12345"), partition p2 values in ("2", "abc", "999999"))`) - tk.MustExec(`insert into t values (1, "1"), (2, "2"), (999999, "999999")`) - tk.MustContainErrMsg(`alter table t modify column b varchar(5)`, "[ddl:8200]New column does not match partition definitions: [ddl:1654]Partition column values of incorrect type") - tk.MustExec(`set sql_mode = ''`) - tk.MustContainErrMsg(`alter table t modify column b varchar(5)`, "[ddl:8200]New column does not match partition definitions: [ddl:1654]Partition column values of incorrect type") - tk.MustExec(`set sql_mode = default`) -} - func TestReorgPartitionConcurrent(t *testing.T) { t.Skip("Needs PR 38460 as well") store := testkit.CreateMockStore(t) From 47020331502800d39bd32d0d87f1f51ea8b74acb Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Fri, 23 Dec 2022 16:47:45 +0000 Subject: [PATCH 4/5] Merge fix --- executor/builder.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index d4270397eecd0..f60a7a78f5a52 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3527,10 +3527,14 @@ func getPartitionKeyColOffsets(keyColIDs []int64, pt table.PartitionedTable) []i keyColOffsets[i] = offset } - pe, err := pt.(interface { - PartitionExpr() (*tables.PartitionExpr, error) - }).PartitionExpr() - if err != nil { + t, ok := pt.(interface { + PartitionExpr() *tables.PartitionExpr + }) + if !ok { + return nil + } + pe := t.PartitionExpr() + if pe == nil { return nil } From e0308d7a437e011c622c0ec39d98b3e6d9d3a29f Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Fri, 30 Dec 2022 10:09:52 +0000 Subject: [PATCH 5/5] post merge fixes --- ddl/db_partition_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 13f435e958b94..8b2ea57a4ccdb 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -4579,14 +4579,14 @@ func TestReorgPartitionConcurrent(t *testing.T) { injected := false hook.OnJobRunBeforeExported = func(job *model.Job) { - if /* TODO: uncomment!! job.Type == model.ActionReorganizePartition && */ job.SchemaState == model.StateWriteReorganization && !injected { + if job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateWriteReorganization && !injected { injected = true <-wait <-wait } } alterErr := make(chan error, 1) - go backgroundExec(store /* TODO: uncomment!! 
schemaName, */, "alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))", alterErr) + go backgroundExec(store, schemaName, "alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))", alterErr) wait <- true tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`) wait <- true