From 9d4da4f3fb055ac8a22ceeda2af4406d4be88f3a Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Fri, 10 Dec 2021 10:49:57 +0100 Subject: [PATCH 01/10] *: query failed after add index / timestamp out-of-range (#28424) (#29323) --- planner/core/integration_test.go | 44 +++++++++++++++++++++++++++++--- table/column.go | 14 ++++++++++ types/datum.go | 3 ++- util/ranger/ranger.go | 3 +++ 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index e58579c5bcb58..c3e09b82c2409 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -4604,10 +4604,48 @@ func (s *testIntegrationSuite) TestIssue27242(c *C) { tk.MustExec("use test") tk.MustExec("drop table if exists UK_MU16407") tk.MustExec("CREATE TABLE UK_MU16407 (COL3 timestamp NULL DEFAULT NULL, UNIQUE KEY U3(COL3));") + defer tk.MustExec("DROP TABLE UK_MU16407") tk.MustExec(`insert into UK_MU16407 values("1985-08-31 18:03:27");`) - err := tk.ExecToErr(`SELECT COL3 FROM UK_MU16407 WHERE COL3>_utf8mb4'2039-1-19 3:14:40';`) - c.Assert(err, NotNil) - c.Assert(err.Error(), Matches, ".*Incorrect timestamp value.*") + tk.MustExec(`SELECT COL3 FROM UK_MU16407 WHERE COL3>_utf8mb4'2039-1-19 3:14:40';`) +} + +func verifyTimestampOutOfRange(tk *testkit.TestKit) { + tk.MustQuery(`select * from t28424 where t != "2038-1-19 3:14:08"`).Sort().Check(testkit.Rows("1970-01-01 00:00:01]\n[2038-01-19 03:14:07")) + tk.MustQuery(`select * from t28424 where t < "2038-1-19 3:14:08"`).Sort().Check(testkit.Rows("1970-01-01 00:00:01]\n[2038-01-19 03:14:07")) + tk.MustQuery(`select * from t28424 where t <= "2038-1-19 3:14:08"`).Sort().Check(testkit.Rows("1970-01-01 00:00:01]\n[2038-01-19 03:14:07")) + tk.MustQuery(`select * from t28424 where t >= "2038-1-19 3:14:08"`).Check(testkit.Rows()) + tk.MustQuery(`select * from t28424 where t > "2038-1-19 3:14:08"`).Check(testkit.Rows()) + tk.MustQuery(`select * from t28424 where t != "1970-1-1 0:0:0"`).Sort().Check(testkit.Rows("1970-01-01 00:00:01]\n[2038-01-19 03:14:07")) + tk.MustQuery(`select * from t28424 where t < "1970-1-1 0:0:0"`).Check(testkit.Rows()) + tk.MustQuery(`select * from t28424 where t <= "1970-1-1 0:0:0"`).Check(testkit.Rows()) + tk.MustQuery(`select * from t28424 where t >= "1970-1-1 0:0:0"`).Sort().Check(testkit.Rows("1970-01-01 00:00:01]\n[2038-01-19 03:14:07")) + tk.MustQuery(`select * from t28424 where t > "1970-1-1 0:0:0"`).Sort().Check(testkit.Rows("1970-01-01 00:00:01]\n[2038-01-19 03:14:07")) +} + +func (s *testIntegrationSuite) TestIssue28424(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t28424, dt28242") + + tk.MustExec(`set time_zone='+00:00'`) + tk.MustExec(`drop table if exists t28424,dt28424`) + tk.MustExec(`create table t28424 (t timestamp)`) + defer tk.MustExec("DROP TABLE t28424") + tk.MustExec(`insert into t28424 values ("2038-01-19 03:14:07"), ("1970-01-01 00:00:01")`) + + verifyTimestampOutOfRange(tk) + tk.MustExec(`alter table t28424 add unique index (t)`) + verifyTimestampOutOfRange(tk) + tk.MustExec(`create table dt28424 (dt datetime)`) + defer tk.MustExec("DROP TABLE dt28424") + tk.MustExec(`insert into dt28424 values ("2038-01-19 03:14:07"), ("1970-01-01 00:00:01")`) + tk.MustExec(`insert into dt28424 values ("1969-12-31 23:59:59"), ("1970-01-01 00:00:00"), ("2038-03-19 03:14:08")`) + tk.MustQuery(`select * from t28424 right join dt28424 on t28424.t = dt28424.dt`).Sort().Check(testkit.Rows( + "1970-01-01 00:00:01 1970-01-01 00:00:01]\n" + + "[2038-01-19 03:14:07 2038-01-19 03:14:07]\n" + + "[ 1969-12-31 23:59:59]\n" + + "[ 1970-01-01 00:00:00]\n" + + "[ 2038-03-19 03:14:08")) } func (s *testIntegrationSerialSuite) TestTemporaryTableForCte(c *C) { diff --git a/table/column.go b/table/column.go index 445a169a82b59..0225a88556c0b 100644 --- a/table/column.go +++ b/table/column.go @@ -192,6 +192,12 @@ func handleWrongCharsetValue(ctx sessionctx.Context, col *model.ColumnInfo, str return err } +// handleZeroDatetime handles Timestamp/Datetime/Date zero date and invalid dates. +// Currently only called from CastValue. +// returns: +// value (possibly adjusted) +// boolean; true if break error/warning handling in CastValue and return what was returned from this +// error func handleZeroDatetime(ctx sessionctx.Context, col *model.ColumnInfo, casted types.Datum, str string, tmIsInvalid bool) (types.Datum, bool, error) { sc := ctx.GetSessionVars().StmtCtx tm := casted.GetMysqlTime() @@ -242,6 +248,14 @@ func handleZeroDatetime(ctx sessionctx.Context, col *model.ColumnInfo, casted ty sc.AppendWarning(innerErr) } return types.NewDatum(zeroV), true, nil + } else if tmIsInvalid && col.Tp == mysql.TypeTimestamp { + // Prevent from being stored! Invalid timestamp! + if mode.HasStrictMode() { + return types.NewDatum(zeroV), true, types.ErrWrongValue.GenWithStackByArgs(zeroT, str) + } + // no strict mode, truncate to 0000-00-00 00:00:00 + sc.AppendWarning(types.ErrWrongValue.GenWithStackByArgs(zeroT, str)) + return types.NewDatum(zeroV), true, nil } else if tm.IsZero() || tm.InvalidZero() { if tm.IsZero() { // Don't care NoZeroDate mode if time val is invalid. diff --git a/types/datum.go b/types/datum.go index f81c3c7a94d14..3e9990d1070dc 100644 --- a/types/datum.go +++ b/types/datum.go @@ -1273,7 +1273,8 @@ func (d *Datum) convertToMysqlTimestamp(sc *stmtctx.StatementContext, target *Fi case KindMysqlTime: t, err = d.GetMysqlTime().Convert(sc, target.Tp) if err != nil { - ret.SetMysqlTime(ZeroTimestamp) + // t might be an invalid Timestamp, but should still be comparable, since same representation (KindMysqlTime) + ret.SetMysqlTime(t) return ret, errors.Trace(ErrWrongValue.GenWithStackByArgs(TimestampStr, t.String())) } t, err = t.RoundFrac(sc, fsp) diff --git a/util/ranger/ranger.go b/util/ranger/ranger.go index d95517fdcc50e..f365a5fa0dd5f 100644 --- a/util/ranger/ranger.go +++ b/util/ranger/ranger.go @@ -108,6 +108,9 @@ func convertPoint(sctx sessionctx.Context, point *point, tp *types.FieldType) (* // Ignore the types.ErrOverflow when we convert TypeNewDecimal values. // A trimmed valid boundary point value would be returned then. Accordingly, the `excl` of the point // would be adjusted. Impossible ranges would be skipped by the `validInterval` call later. + } else if point.value.Kind() == types.KindMysqlTime && tp.Tp == mysql.TypeTimestamp && terror.ErrorEqual(err, types.ErrWrongValue) { + // See issue #28424: query failed after add index + // Ignore conversion from Date[Time] to Timestamp since it must be either out of range or impossible date, which will not match a point select } else if tp.Tp == mysql.TypeEnum && terror.ErrorEqual(err, types.ErrTruncated) { // Ignore the types.ErrorTruncated when we convert TypeEnum values. // We should cover Enum upper overflow, and convert to the biggest value. From 512373e6735cc9b274521d2cf98dbaf0653965bb Mon Sep 17 00:00:00 2001 From: Yifan Xu <30385241+xuyifangreeneyes@users.noreply.github.com> Date: Fri, 10 Dec 2021 18:13:57 +0800 Subject: [PATCH 02/10] planner: implement collecting predicate columns from logical plan (#29878) --- expression/util.go | 23 ++ planner/core/collect_column_stats_usage.go | 252 ++++++++++++++++++ .../core/collect_column_stats_usage_test.go | 222 +++++++++++++++ planner/core/logical_plan_builder.go | 2 +- planner/core/logical_plans.go | 3 + 5 files changed, 501 insertions(+), 1 deletion(-) create mode 100644 planner/core/collect_column_stats_usage.go create mode 100644 planner/core/collect_column_stats_usage_test.go diff --git a/expression/util.go b/expression/util.go index 7b34ef442067b..d7b92329d51f6 100644 --- a/expression/util.go +++ b/expression/util.go @@ -166,6 +166,29 @@ func extractColumns(result []*Column, expr Expression, filter func(*Column) bool return result } +// ExtractColumnsAndCorColumns extracts columns and correlated columns from `expr` and append them to `result`. +func ExtractColumnsAndCorColumns(result []*Column, expr Expression) []*Column { + switch v := expr.(type) { + case *Column: + result = append(result, v) + case *CorrelatedColumn: + result = append(result, &v.Column) + case *ScalarFunction: + for _, arg := range v.GetArgs() { + result = ExtractColumnsAndCorColumns(result, arg) + } + } + return result +} + +// ExtractColumnsAndCorColumnsFromExpressions extracts columns and correlated columns from expressions and append them to `result`. +func ExtractColumnsAndCorColumnsFromExpressions(result []*Column, list []Expression) []*Column { + for _, expr := range list { + result = ExtractColumnsAndCorColumns(result, expr) + } + return result +} + // ExtractColumnSet extracts the different values of `UniqueId` for columns in expressions. func ExtractColumnSet(exprs []Expression) *intsets.Sparse { set := &intsets.Sparse{} diff --git a/planner/core/collect_column_stats_usage.go b/planner/core/collect_column_stats_usage.go new file mode 100644 index 0000000000000..6396b1ddad34f --- /dev/null +++ b/planner/core/collect_column_stats_usage.go @@ -0,0 +1,252 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/parser/model" +) + +// predicateColumnCollector collects predicate columns from logical plan. Predicate columns are the columns whose statistics +// are utilized when making query plans, which usually occur in where conditions, join conditions and so on. +type predicateColumnCollector struct { + // colMap maps expression.Column.UniqueID to the table columns whose statistics are utilized to calculate statistics of the column. + colMap map[int64]map[model.TableColumnID]struct{} + // predicateCols records predicate columns. + predicateCols map[model.TableColumnID]struct{} + // cols is used to store columns collected from expressions and saves some allocation. + cols []*expression.Column +} + +func newPredicateColumnCollector() *predicateColumnCollector { + return &predicateColumnCollector{ + colMap: make(map[int64]map[model.TableColumnID]struct{}), + predicateCols: make(map[model.TableColumnID]struct{}), + // Pre-allocate a slice to reduce allocation, 8 doesn't have special meaning. + cols: make([]*expression.Column, 0, 8), + } +} + +func (c *predicateColumnCollector) addPredicateColumn(col *expression.Column) { + tblColIDs, ok := c.colMap[col.UniqueID] + if !ok { + // It may happen if some leaf of logical plan is LogicalMemTable/LogicalShow/LogicalShowDDLJobs. + return + } + for tblColID := range tblColIDs { + c.predicateCols[tblColID] = struct{}{} + } +} + +func (c *predicateColumnCollector) addPredicateColumnsFromExpression(expr expression.Expression) { + cols := expression.ExtractColumnsAndCorColumns(c.cols[:0], expr) + for _, col := range cols { + c.addPredicateColumn(col) + } +} + +func (c *predicateColumnCollector) addPredicateColumnsFromExpressions(list []expression.Expression) { + cols := expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list) + for _, col := range cols { + c.addPredicateColumn(col) + } +} + +func (c *predicateColumnCollector) updateColMap(col *expression.Column, relatedCols []*expression.Column) { + if _, ok := c.colMap[col.UniqueID]; !ok { + c.colMap[col.UniqueID] = map[model.TableColumnID]struct{}{} + } + for _, relatedCol := range relatedCols { + tblColIDs, ok := c.colMap[relatedCol.UniqueID] + if !ok { + // It may happen if some leaf of logical plan is LogicalMemTable/LogicalShow/LogicalShowDDLJobs. + continue + } + for tblColID := range tblColIDs { + c.colMap[col.UniqueID][tblColID] = struct{}{} + } + } +} + +func (c *predicateColumnCollector) updateColMapFromExpression(col *expression.Column, expr expression.Expression) { + c.updateColMap(col, expression.ExtractColumnsAndCorColumns(c.cols[:0], expr)) +} + +func (c *predicateColumnCollector) updateColMapFromExpressions(col *expression.Column, list []expression.Expression) { + c.updateColMap(col, expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list)) +} + +func (ds *DataSource) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) { + tblID := ds.TableInfo().ID + for _, col := range ds.Schema().Columns { + tblColID := model.TableColumnID{TableID: tblID, ColumnID: col.ID} + c.colMap[col.UniqueID] = map[model.TableColumnID]struct{}{tblColID: {}} + } + // We should use `pushedDownConds` here. `allConds` is used for partition pruning, which doesn't need stats. + c.addPredicateColumnsFromExpressions(ds.pushedDownConds) +} + +func (p *LogicalJoin) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) { + // The only schema change is merging two schemas so there is no new column. + // Assume statistics of all the columns in EqualConditions/LeftConditions/RightConditions/OtherConditions are needed. + exprs := make([]expression.Expression, 0, len(p.EqualConditions)+len(p.LeftConditions)+len(p.RightConditions)+len(p.OtherConditions)) + for _, cond := range p.EqualConditions { + exprs = append(exprs, cond) + } + for _, cond := range p.LeftConditions { + exprs = append(exprs, cond) + } + for _, cond := range p.RightConditions { + exprs = append(exprs, cond) + } + for _, cond := range p.OtherConditions { + exprs = append(exprs, cond) + } + c.addPredicateColumnsFromExpressions(exprs) +} + +func (p *LogicalUnionAll) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) { + // statistics of the ith column of UnionAll come from statistics of the ith column of each child. + schemas := make([]*expression.Schema, 0, len(p.Children())) + relatedCols := make([]*expression.Column, 0, len(p.Children())) + for _, child := range p.Children() { + schemas = append(schemas, child.Schema()) + } + for i, col := range p.Schema().Columns { + relatedCols = relatedCols[:0] + for j := range p.Children() { + relatedCols = append(relatedCols, schemas[j].Columns[i]) + } + c.updateColMap(col, relatedCols) + } +} + +func (c *predicateColumnCollector) collectFromPlan(lp LogicalPlan) { + for _, child := range lp.Children() { + c.collectFromPlan(child) + } + switch x := lp.(type) { + case *DataSource: + x.updateColMapAndAddPredicateColumns(c) + case *LogicalIndexScan: + x.Source.updateColMapAndAddPredicateColumns(c) + // TODO: Is it redundant to add predicate columns from LogicalIndexScan.AccessConds? Is LogicalIndexScan.AccessConds a subset of LogicalIndexScan.Source.pushedDownConds. + c.addPredicateColumnsFromExpressions(x.AccessConds) + case *LogicalTableScan: + x.Source.updateColMapAndAddPredicateColumns(c) + // TODO: Is it redundant to add predicate columns from LogicalTableScan.AccessConds? Is LogicalTableScan.AccessConds a subset of LogicalTableScan.Source.pushedDownConds. + c.addPredicateColumnsFromExpressions(x.AccessConds) + case *TiKVSingleGather: + // TODO: Is it redundant? + x.Source.updateColMapAndAddPredicateColumns(c) + case *LogicalProjection: + // Schema change from children to self. + schema := x.Schema() + for i, expr := range x.Exprs { + c.updateColMapFromExpression(schema.Columns[i], expr) + } + case *LogicalSelection: + // Though the conditions in LogicalSelection are complex conditions which cannot be pushed down to DataSource, we still + // regard statistics of the columns in the conditions as needed. + c.addPredicateColumnsFromExpressions(x.Conditions) + case *LogicalAggregation: + // Just assume statistics of all the columns in GroupByItems are needed. + c.addPredicateColumnsFromExpressions(x.GroupByItems) + // Schema change from children to self. + schema := x.Schema() + for i, aggFunc := range x.AggFuncs { + c.updateColMapFromExpressions(schema.Columns[i], aggFunc.Args) + } + case *LogicalWindow: + // Statistics of the columns in LogicalWindow.PartitionBy are used in optimizeByShuffle4Window. + // It seems that we don't use statistics of the columns in LogicalWindow.OrderBy currently? + for _, item := range x.PartitionBy { + c.addPredicateColumn(item.Col) + } + // Schema change from children to self. + windowColumns := x.GetWindowResultColumns() + for i, col := range windowColumns { + c.updateColMapFromExpressions(col, x.WindowFuncDescs[i].Args) + } + case *LogicalJoin: + x.updateColMapAndAddPredicateColumns(c) + case *LogicalApply: + x.updateColMapAndAddPredicateColumns(c) + // Assume statistics of correlated columns are needed. + // Correlated columns can be found in LogicalApply.Children()[0].Schema(). Since we already visit LogicalApply.Children()[0], + // correlated columns must have existed in predicateColumnCollector.colMap. + for _, corCols := range x.CorCols { + c.addPredicateColumn(&corCols.Column) + } + case *LogicalSort: + // Assume statistics of all the columns in ByItems are needed. + for _, item := range x.ByItems { + c.addPredicateColumnsFromExpression(item.Expr) + } + case *LogicalTopN: + // Assume statistics of all the columns in ByItems are needed. + for _, item := range x.ByItems { + c.addPredicateColumnsFromExpression(item.Expr) + } + case *LogicalUnionAll: + x.updateColMapAndAddPredicateColumns(c) + case *LogicalPartitionUnionAll: + x.updateColMapAndAddPredicateColumns(c) + case *LogicalCTE: + // Visit seedPartLogicalPlan and recursivePartLogicalPlan first. + c.collectFromPlan(x.cte.seedPartLogicalPlan) + if x.cte.recursivePartLogicalPlan != nil { + c.collectFromPlan(x.cte.recursivePartLogicalPlan) + } + // Schema change from seedPlan/recursivePlan to self. + columns := x.Schema().Columns + seedColumns := x.cte.seedPartLogicalPlan.Schema().Columns + var recursiveColumns []*expression.Column + if x.cte.recursivePartLogicalPlan != nil { + recursiveColumns = x.cte.recursivePartLogicalPlan.Schema().Columns + } + relatedCols := make([]*expression.Column, 0, 2) + for i, col := range columns { + relatedCols = append(relatedCols[:0], seedColumns[i]) + if recursiveColumns != nil { + relatedCols = append(relatedCols, recursiveColumns[i]) + } + c.updateColMap(col, relatedCols) + } + // If IsDistinct is true, then we use getColsNDV to calculate row count(see (*LogicalCTE).DeriveStat). In this case + // statistics of all the columns are needed. + if x.cte.IsDistinct { + for _, col := range columns { + c.addPredicateColumn(col) + } + } + case *LogicalCTETable: + // Schema change from seedPlan to self. + for i, col := range x.Schema().Columns { + c.updateColMap(col, []*expression.Column{x.seedSchema.Columns[i]}) + } + } +} + +// CollectPredicateColumnsForTest collects predicate columns from logical plan. It is only for test. +func CollectPredicateColumnsForTest(lp LogicalPlan) []model.TableColumnID { + collector := newPredicateColumnCollector() + collector.collectFromPlan(lp) + tblColIDs := make([]model.TableColumnID, 0, len(collector.predicateCols)) + for tblColID := range collector.predicateCols { + tblColIDs = append(tblColIDs, tblColID) + } + return tblColIDs +} diff --git a/planner/core/collect_column_stats_usage_test.go b/planner/core/collect_column_stats_usage_test.go new file mode 100644 index 0000000000000..912765549fa7b --- /dev/null +++ b/planner/core/collect_column_stats_usage_test.go @@ -0,0 +1,222 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core_test + +import ( + "context" + "fmt" + "testing" + + "github.com/pingcap/tidb/parser/model" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/hint" + "github.com/pingcap/tidb/util/logutil" + "github.com/stretchr/testify/require" +) + +func TestCollectPredicateColumns(t *testing.T) { + t.Parallel() + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("set @@session.tidb_partition_prune_mode = 'static'") + tk.MustExec("create table t1(a int, b int, c int)") + tk.MustExec("create table t2(a int, b int, c int)") + tk.MustExec("create table t3(a int, b int, c int) partition by range(a) (partition p0 values less than (10), partition p1 values less than (20), partition p2 values less than maxvalue)") + + tests := []struct { + sql string + res []string + }{ + { + // DataSource + sql: "select * from t1 where a > 2", + res: []string{"t1.a"}, + }, + { + // DataSource + sql: "select * from t1 where b in (2, 5) or c = 5", + res: []string{"t1.b", "t1.c"}, + }, + { + // LogicalProjection + sql: "select * from (select a + b as ab, c from t1) as tmp where ab > 4", + res: []string{"t1.a", "t1.b"}, + }, + { + // LogicalAggregation + sql: "select b, count(*) from t1 group by b", + res: []string{"t1.b"}, + }, + { + // LogicalAggregation + sql: "select b, sum(a) from t1 group by b having sum(a) > 3", + res: []string{"t1.a", "t1.b"}, + }, + { + // LogicalAggregation + sql: "select count(*), sum(a), sum(c) from t1", + res: []string{}, + }, + { + // LogicalAggregation + sql: "(select a, b from t1) union (select a, c from t2)", + res: []string{"t1.a", "t1.b", "t2.a", "t2.c"}, + }, + { + // LogicalWindow + sql: "select avg(b) over(partition by a) from t1", + res: []string{"t1.a"}, + }, + { + // LogicalWindow + sql: "select * from (select avg(b) over(partition by a) as w from t1) as tmp where w > 4", + res: []string{"t1.a", "t1.b"}, + }, + { + // LogicalWindow + sql: "select row_number() over(partition by a order by c) from t1", + res: []string{"t1.a"}, + }, + { + // LogicalJoin + sql: "select * from t1, t2 where t1.a = t2.a", + res: []string{"t1.a", "t2.a"}, + }, + { + // LogicalJoin + sql: "select * from t1 as x join t2 as y on x.b + y.c > 2", + res: []string{"t1.b", "t2.c"}, + }, + { + // LogicalJoin + sql: "select * from t1 as x join t2 as y on x.a = y.a and x.b < 3 and y.c > 2", + res: []string{"t1.a", "t1.b", "t2.a", "t2.c"}, + }, + { + // LogicalJoin + sql: "select x.b, y.c, sum(x.c), sum(y.b) from t1 as x join t2 as y on x.a = y.a group by x.b, y.c order by x.b", + res: []string{"t1.a", "t1.b", "t2.a", "t2.c"}, + }, + { + // LogicalApply + sql: "select * from t1 where t1.b > all(select b from t2 where t2.c > 2)", + res: []string{"t1.b", "t2.b", "t2.c"}, + }, + { + // LogicalApply + sql: "select * from t1 where t1.b > (select count(b) from t2 where t2.c > t1.a)", + res: []string{"t1.a", "t1.b", "t2.b", "t2.c"}, + }, + { + // LogicalApply + sql: "select * from t1 where t1.b > (select count(*) from t2 where t2.c > t1.a)", + res: []string{"t1.a", "t1.b", "t2.c"}, + }, + { + // LogicalSort + sql: "select * from t1 order by c", + res: []string{"t1.c"}, + }, + { + // LogicalTopN + sql: "select * from t1 order by a + b limit 10", + res: []string{"t1.a", "t1.b"}, + }, + { + // LogicalUnionAll + sql: "select * from ((select a, b from t1) union all (select a, c from t2)) as tmp where tmp.b > 2", + res: []string{"t1.b", "t2.c"}, + }, + { + // LogicalPartitionUnionAll + sql: "select * from t3 where a < 15 and b > 1", + res: []string{"t3.a", "t3.b"}, + }, + { + // LogicalCTE + sql: "with cte(x, y) as (select a + 1, b from t1 where b > 1) select * from cte where x > 3", + res: []string{"t1.a", "t1.b"}, + }, + { + // LogicalCTE, LogicalCTETable + sql: "with recursive cte(x, y) as (select c, 1 from t1 union all select x + 1, y from cte where x < 5) select * from cte", + res: []string{"t1.c"}, + }, + { + // LogicalCTE, LogicalCTETable + sql: "with recursive cte(x, y) as (select 1, c from t1 union all select x + 1, y from cte where x < 5) select * from cte where y > 1", + res: []string{"t1.c"}, + }, + { + // LogicalCTE, LogicalCTETable + sql: "with recursive cte(x, y) as (select a, b from t1 union select x + 1, y from cte where x < 5) select * from cte", + res: []string{"t1.a", "t1.b"}, + }, + } + + ctx := context.Background() + sctx := tk.Session() + is := dom.InfoSchema() + getColName := func(tblColID model.TableColumnID) (string, bool) { + tbl, ok := is.TableByID(tblColID.TableID) + if !ok { + return "", false + } + tblInfo := tbl.Meta() + for _, col := range tblInfo.Columns { + if tblColID.ColumnID == col.ID { + return tblInfo.Name.L + "." + col.Name.L, true + } + } + return "", false + } + checkPredicateColumns := func(lp plannercore.LogicalPlan, expected []string, comment string) { + tblColIDs := plannercore.CollectPredicateColumnsForTest(lp) + cols := make([]string, 0, len(tblColIDs)) + for _, tblColID := range tblColIDs { + col, ok := getColName(tblColID) + require.True(t, ok, comment) + cols = append(cols, col) + } + require.ElementsMatch(t, expected, cols, comment) + } + + for _, tt := range tests { + comment := fmt.Sprintf("for %s", tt.sql) + logutil.BgLogger().Info(comment) + stmts, err := tk.Session().Parse(ctx, tt.sql) + require.NoError(t, err, comment) + stmt := stmts[0] + err = plannercore.Preprocess(sctx, stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is})) + require.NoError(t, err, comment) + builder, _ := plannercore.NewPlanBuilder().Init(sctx, is, &hint.BlockHintProcessor{}) + p, err := builder.Build(ctx, stmt) + require.NoError(t, err, comment) + lp, ok := p.(plannercore.LogicalPlan) + require.True(t, ok, comment) + // We check predicate columns twice, before and after logical optimization. Some logical plan patterns may occur before + // logical optimization while others may occur after logical optimization. + // logutil.BgLogger().Info("before logical opt", zap.String("lp", plannercore.ToString(lp))) + checkPredicateColumns(lp, tt.res, comment) + lp, err = plannercore.LogicalOptimize(ctx, builder.GetOptFlag(), lp) + require.NoError(t, err, comment) + // logutil.BgLogger().Info("after logical opt", zap.String("lp", plannercore.ToString(lp))) + checkPredicateColumns(lp, tt.res, comment) + } +} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index b40db2fe4ef5d..c3b1239d4ffac 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -3871,7 +3871,7 @@ func (b *PlanBuilder) tryBuildCTE(ctx context.Context, tn *ast.TableName, asName } cte.recursiveRef = true - p := LogicalCTETable{name: cte.def.Name.String(), idForStorage: cte.storageID, seedStat: cte.seedStat}.Init(b.ctx, b.getSelectOffset()) + p := LogicalCTETable{name: cte.def.Name.String(), idForStorage: cte.storageID, seedStat: cte.seedStat, seedSchema: cte.seedLP.Schema()}.Init(b.ctx, b.getSelectOffset()) p.SetSchema(getResultCTESchema(cte.seedLP.Schema(), b.ctx.GetSessionVars())) p.SetOutputNames(cte.seedLP.OutputNames()) return p, nil diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 3d5bbeeaa9216..212f10d65346a 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -1309,6 +1309,9 @@ type LogicalCTETable struct { seedStat *property.StatsInfo name string idForStorage int + + // seedSchema is only used in predicateColumnCollector to get column mapping + seedSchema *expression.Schema } // ExtractCorrelatedCols implements LogicalPlan interface. From c08f7fc65c8831682c078322a41769ae568e9155 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 10 Dec 2021 21:53:58 +0800 Subject: [PATCH 03/10] *: show PK name when decoding the clustered index row key (#30623) --- ddl/ddl_api.go | 6 ++-- executor/infoschema_reader.go | 2 +- executor/show.go | 2 +- expression/builtin_info.go | 2 +- expression/integration_test.go | 43 ++++++++++++++++++++++++----- parser/model/model.go | 5 ++++ planner/core/expression_rewriter.go | 7 ++++- planner/core/planbuilder.go | 4 +-- 8 files changed, 55 insertions(+), 16 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 965213864f687..632aa4874556a 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -1577,7 +1577,7 @@ func buildTableInfo( tbInfo.CommonHandleVersion = 1 } } - if tbInfo.PKIsHandle || tbInfo.IsCommonHandle { + if tbInfo.HasClusteredIndex() { // Primary key cannot be invisible. if constr.Option != nil && constr.Option.Visibility == ast.IndexVisibilityInvisible { return nil, ErrPKIndexCantBeInvisible @@ -2438,7 +2438,7 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) err case ast.TableOptionCompression: tbInfo.Compression = op.StrValue case ast.TableOptionShardRowID: - if op.UintValue > 0 && (tbInfo.PKIsHandle || tbInfo.IsCommonHandle) { + if op.UintValue > 0 && tbInfo.HasClusteredIndex() { return errUnsupportedShardRowIDBits } tbInfo.ShardRowIDBits = op.UintValue @@ -2946,7 +2946,7 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint // Nothing need to do. return nil } - if uVal > 0 && (t.Meta().PKIsHandle || t.Meta().IsCommonHandle) { + if uVal > 0 && t.Meta().HasClusteredIndex() { return errUnsupportedShardRowIDBits } err = verifyNoOverflowShardBits(d.sessPool, t, uVal) diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index 483e970d479fc..1e4fcae3829ba 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -579,7 +579,7 @@ func (e *memtableRetriever) setDataFromTables(ctx context.Context, sctx sessionc rowCount = 1 } } - if table.PKIsHandle || table.IsCommonHandle { + if table.HasClusteredIndex() { pkType = "CLUSTERED" } shardingInfo := infoschema.GetShardingInfo(schema, table) diff --git a/executor/show.go b/executor/show.go index 935b2c6bae64a..2a4eac148f74e 100644 --- a/executor/show.go +++ b/executor/show.go @@ -975,7 +975,7 @@ func ConstructResultOfShowCreateTable(ctx sessionctx.Context, tableInfo *model.T fmt.Fprintf(buf, ` COMMENT '%s'`, format.OutputFormat(idxInfo.Comment)) } if idxInfo.Primary { - if tableInfo.PKIsHandle || tableInfo.IsCommonHandle { + if tableInfo.HasClusteredIndex() { buf.WriteString(" /*T![clustered_index] CLUSTERED */") } else { buf.WriteString(" /*T![clustered_index] NONCLUSTERED */") diff --git a/expression/builtin_info.go b/expression/builtin_info.go index 013ee74d66bac..22018213ef9e7 100644 --- a/expression/builtin_info.go +++ b/expression/builtin_info.go @@ -746,7 +746,7 @@ func (b *builtinTiDBDecodeKeySig) Clone() builtinFunc { return newSig } -// evalInt evals a builtinTiDBIsDDLOwnerSig. +// evalInt evals a builtinTiDBDecodeKeySig. func (b *builtinTiDBDecodeKeySig) evalString(row chunk.Row) (string, bool, error) { s, isNull, err := b.args[0].EvalString(b.ctx, row) if isNull || err != nil { diff --git a/expression/integration_test.go b/expression/integration_test.go index 616b8d793b7da..677b33d68a500 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2826,7 +2826,7 @@ func TestTiDBDecodePlanFunc(t *testing.T) { tk.MustQuery("select tidb_decode_plan('xxx')").Check(testkit.Rows("xxx")) } -func TestTiDBInternalFunc(t *testing.T) { +func TestTiDBDecodeKeyFunc(t *testing.T) { t.Parallel() store, clean := testkit.CreateMockStore(t) @@ -2876,16 +2876,14 @@ func TestTiDBInternalFunc(t *testing.T) { h, err := kv.NewCommonHandle(k) require.NoError(t, err) k = tablecodec.EncodeRowKeyWithHandle(tableID, h) - hexKey := hex.EncodeToString(codec.EncodeBytes(nil, k)) - return hexKey + return hex.EncodeToString(codec.EncodeBytes(nil, k)) } // split table t by ('bbbb', 10, '2020-01-01'); data := []types.Datum{types.NewStringDatum("bbbb"), types.NewIntDatum(10), types.NewTimeDatum(getTime(2020, 1, 1, mysql.TypeDatetime))} hexKey := buildCommonKeyFromData(tbl.Meta().ID, data) sql := fmt.Sprintf("select tidb_decode_key( '%s' )", hexKey) - result = tk.MustQuery(sql) rs := fmt.Sprintf(`{"handle":{"a":"bbbb","b":"10","c":"2020-01-01 00:00:00"},"table_id":%d}`, tbl.Meta().ID) - result.Check(testkit.Rows(rs)) + tk.MustQuery(sql).Check(testkit.Rows(rs)) // split table t by ('bbbb', 10, null); data = []types.Datum{types.NewStringDatum("bbbb"), types.NewIntDatum(10), types.NewDatum(nil)} @@ -2903,8 +2901,7 @@ func TestTiDBInternalFunc(t *testing.T) { k, err := codec.EncodeKey(tk.Session().GetSessionVars().StmtCtx, nil, data...) require.NoError(t, err) k = tablecodec.EncodeIndexSeekKey(tableID, indexID, k) - hexKey := hex.EncodeToString(codec.EncodeBytes(nil, k)) - return hexKey + return hex.EncodeToString(codec.EncodeBytes(nil, k)) } // split table t index idx by ('aaaaa', 100, '2000-01-01'); data = []types.Datum{types.NewStringDatum("aaaaa"), types.NewIntDatum(100), types.NewTimeDatum(getTime(2000, 1, 1, mysql.TypeDatetime))} @@ -2925,6 +2922,38 @@ func TestTiDBInternalFunc(t *testing.T) { hexKey = "7480000000000000375F69800000000000000103800000000001D4C1023B6458" sql = fmt.Sprintf("select tidb_decode_key( '%s' )", hexKey) tk.MustQuery(sql).Check(testkit.Rows(hexKey)) + + // Test the table with the nonclustered index. + const rowID = 10 + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int primary key nonclustered, b int, key bk (b));") + dom = domain.GetDomain(tk.Session()) + is = dom.InfoSchema() + tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + buildTableRowKey := func(tableID, rowID int64) string { + return hex.EncodeToString( + codec.EncodeBytes( + nil, + tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(rowID)), + )) + } + hexKey = buildTableRowKey(tbl.Meta().ID, rowID) + sql = fmt.Sprintf("select tidb_decode_key( '%s' )", hexKey) + rs = fmt.Sprintf(`{"_tidb_rowid":%d,"table_id":"%d"}`, rowID, tbl.Meta().ID) + tk.MustQuery(sql).Check(testkit.Rows(rs)) + + // Test the table with the clustered index. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int primary key clustered, b int, key bk (b));") + dom = domain.GetDomain(tk.Session()) + is = dom.InfoSchema() + tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + hexKey = buildTableRowKey(tbl.Meta().ID, rowID) + sql = fmt.Sprintf("select tidb_decode_key( '%s' )", hexKey) + rs = fmt.Sprintf(`{"%s":%d,"table_id":"%d"}`, tbl.Meta().GetPkName().String(), rowID, tbl.Meta().ID) + tk.MustQuery(sql).Check(testkit.Rows(rs)) } func TestTwoDecimalTruncate(t *testing.T) { diff --git a/parser/model/model.go b/parser/model/model.go index 20e770c607149..c08ea8b8b309c 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -656,6 +656,11 @@ func (t *TableInfo) ColumnIsInIndex(c *ColumnInfo) bool { return false } +// HasClusteredIndex checks whether the table has a clustered index. +func (t *TableInfo) HasClusteredIndex() bool { + return t.PKIsHandle || t.IsCommonHandle +} + // IsView checks if TableInfo is a view. func (t *TableInfo) IsView() bool { return t.View != nil diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go index a370fdaa36697..8f950b3d3ece5 100644 --- a/planner/core/expression_rewriter.go +++ b/planner/core/expression_rewriter.go @@ -2030,7 +2030,12 @@ func decodeRecordKey(key []byte, tableID int64, tbl table.Table, loc *time.Locat if handle.IsInt() { ret := make(map[string]interface{}) ret["table_id"] = strconv.FormatInt(tableID, 10) - ret["_tidb_rowid"] = handle.IntValue() + // When the clustered index is enabled, we should show the PK name. + if tbl.Meta().HasClusteredIndex() { + ret[tbl.Meta().GetPkName().String()] = handle.IntValue() + } else { + ret["_tidb_rowid"] = handle.IntValue() + } retStr, err := json.Marshal(ret) if err != nil { return "", errors.Trace(err) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 3dd919749c25b..b830d26da025d 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -960,7 +960,7 @@ func getPathByIndexName(paths []*util.AccessPath, idxName model.CIStr, tblInfo * return path } } - if isPrimaryIndex(idxName) && (tblInfo.PKIsHandle || tblInfo.IsCommonHandle) { + if isPrimaryIndex(idxName) && tblInfo.HasClusteredIndex() { return tablePath } return nil @@ -1707,7 +1707,7 @@ func getColsInfo(tn *ast.TableName) (indicesInfo []*model.IndexInfo, colsInfo [] if col.IsGenerated() && !col.GeneratedStored { continue } - if mysql.HasPriKeyFlag(col.Flag) && (tbl.PKIsHandle || tbl.IsCommonHandle) { + if mysql.HasPriKeyFlag(col.Flag) && tbl.HasClusteredIndex() { continue } colsInfo = append(colsInfo, col) From 66163de74176f65107664339b2ae006f0b15aed0 Mon Sep 17 00:00:00 2001 From: Zach <51114270+zach030@users.noreply.github.com> Date: Fri, 10 Dec 2021 23:51:57 +0800 Subject: [PATCH 04/10] ddl/callback_test.go: migrate test-infra to testify (#30317) --- ddl/callback_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ddl/callback_test.go b/ddl/callback_test.go index c048f9c2442ed..c8975da252cad 100644 --- a/ddl/callback_test.go +++ b/ddl/callback_test.go @@ -16,12 +16,13 @@ package ddl import ( "context" + "testing" - . "github.com/pingcap/check" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/logutil" + "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -117,9 +118,9 @@ func (tc *TestDDLCallback) OnWatched(ctx context.Context) { tc.BaseCallback.OnWatched(ctx) } -func (s *testDDLSuite) TestCallback(c *C) { +func TestCallback(t *testing.T) { cb := &BaseCallback{} - c.Assert(cb.OnChanged(nil), IsNil) + require.Nil(t, cb.OnChanged(nil)) cb.OnJobRunBefore(nil) cb.OnJobUpdated(nil) cb.OnWatched(context.TODO()) From 93e2c12346af5335beac34461ea6eb13d3616162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Sat, 11 Dec 2021 00:41:57 +0800 Subject: [PATCH 05/10] *: Rename some names of placement ddl operation (#30622) --- br/pkg/backup/client.go | 2 +- ddl/ddl_api.go | 2 +- ddl/ddl_worker.go | 4 ++-- ddl/table.go | 2 +- executor/executor_test.go | 2 +- parser/model/ddl.go | 4 ++-- parser/model/model_test.go | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index b675b062793c6..12a4344a432fe 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -381,7 +381,7 @@ func skipUnsupportedDDLJob(job *model.Job) bool { case model.ActionCreatePlacementPolicy, model.ActionAlterPlacementPolicy, model.ActionDropPlacementPolicy, - model.ActionAlterTablePartitionPolicy, + model.ActionAlterTablePartitionPlacement, model.ActionModifySchemaDefaultPlacement, model.ActionAlterTablePlacement, model.ActionAlterTableAttributes, diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 632aa4874556a..957ba6e68386d 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -6440,7 +6440,7 @@ func (d *ddl) AlterTablePartitionPlacement(ctx sessionctx.Context, tableIdent as SchemaID: schema.ID, TableID: tblInfo.ID, SchemaName: schema.Name.L, - Type: model.ActionAlterTablePartitionPolicy, + Type: model.ActionAlterTablePartitionPlacement, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{partitionID, policyRefInfo, placementSettings}, } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 617c42c639d6c..24c3769217e5d 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -838,8 +838,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onDropPlacementPolicy(d, t, job) case model.ActionAlterPlacementPolicy: ver, err = onAlterPlacementPolicy(d, t, job) - case model.ActionAlterTablePartitionPolicy: - ver, err = onAlterTablePartitionOptions(d, t, job) + case model.ActionAlterTablePartitionPlacement: + ver, err = onAlterTablePartitionPlacement(t, job) case model.ActionAlterTablePlacement: ver, err = onAlterTablePlacement(d, t, job) case model.ActionAlterCacheTable: diff --git a/ddl/table.go b/ddl/table.go index 906db66f3ac9d..504f85f83faed 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -1247,7 +1247,7 @@ func onAlterTablePartitionAttributes(t *meta.Meta, job *model.Job) (ver int64, e return ver, nil } -func onAlterTablePartitionOptions(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { +func onAlterTablePartitionPlacement(t *meta.Meta, job *model.Job) (ver int64, err error) { var partitionID int64 policyRefInfo := &model.PolicyRefInfo{} placementSettings := &model.PlacementSettings{} diff --git a/executor/executor_test.go b/executor/executor_test.go index f503504d739b0..c93aa09e6ccdf 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -615,7 +615,7 @@ func (s *testSuiteP2) TestAdminShowDDLJobsInfo(c *C) { "PRIMARY_REGION=\"cn-east-1\" " + "REGIONS=\"cn-east-1, cn-east-2\" " + "FOLLOWERS=2 ") - c.Assert(tk.MustQuery("admin show ddl jobs 1").Rows()[0][3], Equals, "alter table partition policy") + c.Assert(tk.MustQuery("admin show ddl jobs 1").Rows()[0][3], Equals, "alter table partition placement") tk.MustExec("alter table tt1 cache") c.Assert(tk.MustQuery("admin show ddl jobs 1").Rows()[0][3], Equals, "alter table cache") diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 7062e68673210..c61372b55e263 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -88,7 +88,7 @@ const ( ActionCreatePlacementPolicy ActionType = 51 ActionAlterPlacementPolicy ActionType = 52 ActionDropPlacementPolicy ActionType = 53 - ActionAlterTablePartitionPolicy ActionType = 54 + ActionAlterTablePartitionPlacement ActionType = 54 ActionModifySchemaDefaultPlacement ActionType = 55 ActionAlterTablePlacement ActionType = 56 ActionAlterCacheTable ActionType = 57 @@ -145,7 +145,7 @@ var actionMap = map[ActionType]string{ ActionAlterCheckConstraint: "alter check constraint", ActionDropIndexes: "drop multi-indexes", ActionAlterTableAttributes: "alter table attributes", - ActionAlterTablePartitionPolicy: "alter table partition policy", + ActionAlterTablePartitionPlacement: "alter table partition placement", ActionAlterTablePartitionAttributes: "alter table partition attributes", ActionCreatePlacementPolicy: "create placement policy", ActionAlterPlacementPolicy: "alter placement policy", diff --git a/parser/model/model_test.go b/parser/model/model_test.go index 8c2365ea47ca4..c4968aa48855c 100644 --- a/parser/model/model_test.go +++ b/parser/model/model_test.go @@ -302,7 +302,7 @@ func TestString(t *testing.T) { {ActionModifySchemaCharsetAndCollate, "modify schema charset and collate"}, {ActionDropIndexes, "drop multi-indexes"}, {ActionAlterTablePlacement, "alter table placement"}, - {ActionAlterTablePartitionPolicy, "alter table partition policy"}, + {ActionAlterTablePartitionPlacement, "alter table partition placement"}, {ActionAlterNoCacheTable, "alter table nocache"}, } From 3dce612f046cae41f1fddaaf4cebe0c03540093e Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Sat, 11 Dec 2021 11:13:57 +0800 Subject: [PATCH 06/10] executor: fix data race in the index_lookup_hash_join (#30619) --- executor/index_lookup_hash_join.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index c404d8cac6f2c..75d84c2162480 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -168,19 +168,17 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { innerCh := make(chan *indexHashJoinTask, concurrency) if e.keepOuterOrder { e.taskCh = make(chan *indexHashJoinTask, concurrency) - } - e.workerWg.Add(1) - ow := e.newOuterWorker(innerCh) - go util.WithRecovery(func() { ow.run(workerCtx) }, e.finishJoinWorkers) - - if !e.keepOuterOrder { - e.resultCh = make(chan *indexHashJoinResult, concurrency) - } else { // When `keepOuterOrder` is true, each task holds their own `resultCh` // individually, thus we do not need a global resultCh. e.resultCh = nil + } else { + e.resultCh = make(chan *indexHashJoinResult, concurrency) } e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency) + e.workerWg.Add(1) + ow := e.newOuterWorker(innerCh) + go util.WithRecovery(func() { ow.run(workerCtx) }, e.finishJoinWorkers) + for i := 0; i < concurrency; i++ { if !e.keepOuterOrder { e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1) From 8e525d8d882241fabb902bdae0d2d3b79b5cf6a8 Mon Sep 17 00:00:00 2001 From: wjHuang Date: Sat, 11 Dec 2021 14:45:57 +0800 Subject: [PATCH 07/10] ddl: remove unnecessary locking when adding an index (#29772) --- ddl/column.go | 2 +- ddl/index.go | 11 ++--------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index 0136ce40e315c..1507e7437982c 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1427,7 +1427,7 @@ func (w *updateColumnWorker) cleanRowMap() { } } -// BackfillDataInTxn will backfill the table record in a transaction, lock corresponding rowKey, if the value of rowKey is changed. +// BackfillDataInTxn will backfill the table record in a transaction. func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) { oprStartTime := time.Now() errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { diff --git a/ddl/index.go b/ddl/index.go index b6a14dfa9eeb0..ffa9d069d4c09 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1293,8 +1293,8 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i return nil } -// BackfillDataInTxn will backfill table index in a transaction, lock corresponding rowKey, if the value of rowKey is changed, -// indicate that index columns values may changed, index is not allowed to be added, so the txn will rollback and retry. +// BackfillDataInTxn will backfill table index in a transaction. If the value of rowKey is changed, there must be some other transactions +// update the row, result in write conflict, so the txn will rollback and retry. // BackfillDataInTxn will add w.batchCnt indices once, default value of w.batchCnt is 128. func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) { failpoint.Inject("errorMockPanic", func(val failpoint.Value) { @@ -1329,13 +1329,6 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC continue } - // Lock the row key to notify us that someone delete or update the row, - // then we should not backfill the index of it, otherwise the adding index is redundant. - err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.key) - if err != nil { - return errors.Trace(err) - } - // Create the index. handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData) if err != nil { From 626dee72508e67c59a9868e065228e4b1dbd9ce7 Mon Sep 17 00:00:00 2001 From: bb7133 Date: Sun, 12 Dec 2021 11:39:57 +0800 Subject: [PATCH 08/10] server: try to make `TidbTestSuite` more stable (#30643) --- server/tidb_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/tidb_test.go b/server/tidb_test.go index 8ea521fda6275..1b8941c34a49f 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -95,15 +95,15 @@ func createTidbTestSuite(t *testing.T) (*tidbTestSuite, func()) { ts.waitUntilServerOnline() cleanup := func() { - if ts.store != nil { - ts.store.Close() - } if ts.domain != nil { ts.domain.Close() } if ts.server != nil { ts.server.Close() } + if ts.store != nil { + require.NoError(t, ts.store.Close()) + } } return ts, cleanup From 755553660e6036ead13d1f0aa271eaf59d939fbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Sun, 12 Dec 2021 20:20:34 +0800 Subject: [PATCH 09/10] *: Add some PD tests for placement and fix some bug found (#30621) --- ddl/db_test.go | 2 +- ddl/partition.go | 18 +-- ddl/placement_policy.go | 2 +- ddl/placement_policy_test.go | 221 +++++++++++++++++++++++++++++++++++ 4 files changed, 233 insertions(+), 10 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index deecc27b8974b..309ca8a03e736 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -79,7 +79,7 @@ var _ = Suite(&testDBSuite2{&testDBSuite{}}) var _ = Suite(&testDBSuite3{&testDBSuite{}}) var _ = Suite(&testDBSuite4{&testDBSuite{}}) var _ = Suite(&testDBSuite5{&testDBSuite{}}) -var _ = Suite(&testDBSuite6{&testDBSuite{}}) +var _ = SerialSuites(&testDBSuite6{&testDBSuite{}}) var _ = Suite(&testDBSuite7{&testDBSuite{}}) var _ = Suite(&testDBSuite8{&testDBSuite{}}) var _ = SerialSuites(&testSerialDBSuite{&testDBSuite{}}) diff --git a/ddl/partition.go b/ddl/partition.go index b9d0d72d91864..6f6d3b8b0ff59 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -88,6 +88,7 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v return ver, nil } + // notice: addingDefinitions is empty when job is in state model.StateNone tblInfo, partInfo, addingDefinitions, err := checkAddPartition(t, job) if err != nil { return ver, err @@ -117,14 +118,21 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v return ver, errors.Trace(err) } + // move the adding definition into tableInfo. + updateAddingPartitionInfo(partInfo, tblInfo) + ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, true) + if err != nil { + return ver, errors.Trace(err) + } + // modify placement settings - for _, def := range addingDefinitions { + for _, def := range tblInfo.Partition.AddingDefinitions { if _, err = checkPlacementPolicyRefValidAndCanNonValidJob(t, job, def.PlacementPolicyRef); err != nil { return ver, errors.Trace(err) } } - bundles, err := alterTablePartitionBundles(t, tblInfo, addingDefinitions) + bundles, err := alterTablePartitionBundles(t, tblInfo, tblInfo.Partition.AddingDefinitions) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -135,12 +143,6 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } - // move the adding definition into tableInfo. - updateAddingPartitionInfo(partInfo, tblInfo) - ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, true) - if err != nil { - return ver, errors.Trace(err) - } // none -> replica only job.SchemaState = model.StateReplicaOnly case model.StateReplicaOnly: diff --git a/ddl/placement_policy.go b/ddl/placement_policy.go index 65a445ca6263e..3ccba4ef346f6 100644 --- a/ddl/placement_policy.go +++ b/ddl/placement_policy.go @@ -331,7 +331,7 @@ func getPlacementPolicyDependedObjectsIDs(t *meta.Meta, policy *model.PolicyInfo } if tblInfo.Partition != nil { for _, part := range tblInfo.Partition.Definitions { - if part.PlacementPolicyRef != nil && part.PlacementPolicyRef.ID == part.ID { + if part.PlacementPolicyRef != nil && part.PlacementPolicyRef.ID == policy.ID { partIDs = append(partIDs, part.ID) } } diff --git a/ddl/placement_policy_test.go b/ddl/placement_policy_test.go index f4c4eb54e69b3..c8626121515d9 100644 --- a/ddl/placement_policy_test.go +++ b/ddl/placement_policy_test.go @@ -16,6 +16,7 @@ package ddl_test import ( "context" + "encoding/json" "fmt" "math" "strconv" @@ -48,7 +49,83 @@ func clearAllBundles(c *C) { c.Assert(err, IsNil) } +func checkExistTableBundlesInPD(c *C, do *domain.Domain, dbName string, tbName string) { + tblInfo, err := do.InfoSchema().TableByName(model.NewCIStr(dbName), model.NewCIStr(tbName)) + c.Assert(err, IsNil) + + c.Assert(kv.RunInNewTxn(context.TODO(), do.Store(), false, func(ctx context.Context, txn kv.Transaction) error { + t := meta.NewMeta(txn) + checkTableBundlesInPD(c, t, tblInfo.Meta()) + return nil + }), IsNil) +} + +func checkAllBundlesNotChange(c *C, bundles []*placement.Bundle) { + currentBundles, err := infosync.GetAllRuleBundles(context.TODO()) + c.Assert(err, IsNil) + + bundlesMap := make(map[string]*placement.Bundle) + for _, bundle := range currentBundles { + bundlesMap[bundle.ID] = bundle + } + c.Assert(len(bundlesMap), Equals, len(currentBundles)) + c.Assert(len(currentBundles), Equals, len(bundles)) + + for _, bundle := range bundles { + got, ok := bundlesMap[bundle.ID] + c.Assert(ok, IsTrue) + + expectedJSON, err := json.Marshal(bundle) + c.Assert(err, IsNil) + + gotJSON, err := json.Marshal(got) + c.Assert(err, IsNil) + c.Assert(string(gotJSON), Equals, string(expectedJSON)) + } +} + +func checkTableBundlesInPD(c *C, t *meta.Meta, tblInfo *model.TableInfo) { + checks := make([]*struct { + ID string + bundle *placement.Bundle + }, 0) + + bundle, err := placement.NewTableBundle(t, tblInfo) + c.Assert(err, IsNil) + checks = append(checks, &struct { + ID string + bundle *placement.Bundle + }{ID: placement.GroupID(tblInfo.ID), bundle: bundle}) + + if tblInfo.Partition != nil { + for _, def := range tblInfo.Partition.Definitions { + bundle, err := placement.NewPartitionBundle(t, def) + c.Assert(err, IsNil) + checks = append(checks, &struct { + ID string + bundle *placement.Bundle + }{ID: placement.GroupID(def.ID), bundle: bundle}) + } + } + + for _, check := range checks { + got, err := infosync.GetRuleBundle(context.TODO(), check.ID) + c.Assert(err, IsNil) + if check.bundle == nil { + c.Assert(got.IsEmpty(), IsTrue) + } else { + expectedJSON, err := json.Marshal(check.bundle) + c.Assert(err, IsNil) + + gotJSON, err := json.Marshal(got) + c.Assert(err, IsNil) + c.Assert(string(gotJSON), Equals, string(expectedJSON)) + } + } +} + func (s *testDBSuite6) TestPlacementPolicy(c *C) { + clearAllBundles(c) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop placement policy if exists x") @@ -114,6 +191,10 @@ func (s *testDBSuite6) TestPlacementPolicy(c *C) { "REGIONS=\"cn-east-1,cn-east-2\" ") tk.MustQuery("show warnings").Check(testkit.Rows("Note 8238 Placement policy 'X' already exists")) + bundles, err := infosync.GetAllRuleBundles(context.TODO()) + c.Assert(err, IsNil) + c.Assert(0, Equals, len(bundles)) + tk.MustExec("drop placement policy x") tk.MustGetErrCode("drop placement policy x", mysql.ErrPlacementPolicyNotExists) tk.MustExec("drop placement policy if exists x") @@ -365,12 +446,21 @@ func (s *testDBSuite6) TestCreateOrReplacePlacementPolicy(c *C) { } func (s *testDBSuite6) TestAlterPlacementPolicy(c *C) { + clearAllBundles(c) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop placement policy if exists x") + tk.MustExec("drop table if exists tp") tk.MustExec("create placement policy x primary_region=\"cn-east-1\" regions=\"cn-east-1,cn-east\"") defer tk.MustExec("drop placement policy if exists x") + // create a table ref to policy x, testing for alter policy will update PD bundles + tk.MustExec(`CREATE TABLE tp (id INT) placement policy x PARTITION BY RANGE (id) ( + PARTITION p0 VALUES LESS THAN (100), + PARTITION p1 VALUES LESS THAN (1000) placement policy x + );`) + defer tk.MustExec("drop table if exists tp") + policy, ok := tk.Se.GetInfoSchema().(infoschema.InfoSchema).PolicyByName(model.NewCIStr("x")) c.Assert(ok, IsTrue) @@ -378,6 +468,7 @@ func (s *testDBSuite6) TestAlterPlacementPolicy(c *C) { tk.MustExec("alter placement policy x PRIMARY_REGION=\"bj\" REGIONS=\"bj,sh\"") tk.MustQuery("show placement where target='POLICY x'").Check(testkit.Rows("POLICY x PRIMARY_REGION=\"bj\" REGIONS=\"bj,sh\" NULL")) tk.MustQuery("select * from information_schema.placement_rules where policy_name = 'x'").Check(testkit.Rows(strconv.FormatInt(policy.ID, 10) + " def x bj bj,sh 0 0")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") tk.MustExec("alter placement policy x " + "PRIMARY_REGION=\"bj\" " + @@ -385,6 +476,7 @@ func (s *testDBSuite6) TestAlterPlacementPolicy(c *C) { "SCHEDULE=\"EVEN\"") tk.MustQuery("show placement where target='POLICY x'").Check(testkit.Rows("POLICY x PRIMARY_REGION=\"bj\" REGIONS=\"bj\" SCHEDULE=\"EVEN\" NULL")) tk.MustQuery("select * from INFORMATION_SCHEMA.PLACEMENT_RULES WHERE POLICY_NAME='x'").Check(testkit.Rows(strconv.FormatInt(policy.ID, 10) + " def x bj bj EVEN 0 0")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") tk.MustExec("alter placement policy x " + "LEADER_CONSTRAINTS=\"[+region=us-east-1]\" " + @@ -396,6 +488,7 @@ func (s *testDBSuite6) TestAlterPlacementPolicy(c *C) { tk.MustQuery("SELECT POLICY_NAME,LEADER_CONSTRAINTS,FOLLOWER_CONSTRAINTS,FOLLOWERS FROM information_schema.PLACEMENT_RULES WHERE POLICY_NAME = 'x'").Check( testkit.Rows("x [+region=us-east-1] [+region=us-east-2] 3"), ) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") tk.MustExec("alter placement policy x " + "VOTER_CONSTRAINTS=\"[+region=bj]\" " + @@ -412,8 +505,10 @@ func (s *testDBSuite6) TestAlterPlacementPolicy(c *C) { "SCHEDULE,FOLLOWERS,LEARNERS FROM INFORMATION_SCHEMA.placement_rules WHERE POLICY_NAME='x'").Check( testkit.Rows("def x [+disk=ssd] [+region=sh] 0 3"), ) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // test alter not exist policies + tk.MustExec("drop table tp") tk.MustExec("drop placement policy x") tk.MustGetErrCode("alter placement policy x REGIONS=\"bj,sh\"", mysql.ErrPlacementPolicyNotExists) tk.MustGetErrCode("alter placement policy x2 REGIONS=\"bj,sh\"", mysql.ErrPlacementPolicyNotExists) @@ -421,6 +516,7 @@ func (s *testDBSuite6) TestAlterPlacementPolicy(c *C) { } func (s *testDBSuite6) TestCreateTableWithPlacementPolicy(c *C) { + clearAllBundles(c) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t,t_range_p,t_hash_p,t_list_p") @@ -439,6 +535,7 @@ func (s *testDBSuite6) TestCreateTableWithPlacementPolicy(c *C) { "FOLLOWERS=2 ") defer tk.MustExec("DROP TABLE IF EXISTS t") tk.MustQuery("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TIDB_PLACEMENT_POLICY_NAME, TIDB_DIRECT_PLACEMENT FROM information_schema.Tables WHERE TABLE_SCHEMA='test' AND TABLE_NAME = 't'").Check(testkit.Rows(`def test t PRIMARY_REGION="cn-east-1" REGIONS="cn-east-1, cn-east-2" FOLLOWERS=2`)) + checkExistTableBundlesInPD(c, s.dom, "test", "t") tbl := testGetTableByName(c, tk.Se, "test", "t") c.Assert(tbl, NotNil) @@ -763,6 +860,7 @@ func (s *testDBSuite6) TestPolicyCacheAndPolicyDependency(c *C) { } func (s *testDBSuite6) TestAlterTablePartitionWithPlacementPolicy(c *C) { + clearAllBundles(c) tk := testkit.NewTestKit(c, s.store) defer func() { tk.MustExec("drop table if exists t1") @@ -782,6 +880,7 @@ func (s *testDBSuite6) TestAlterTablePartitionWithPlacementPolicy(c *C) { "PRIMARY_REGION,REGIONS,CONSTRAINTS,LEADER_CONSTRAINTS,FOLLOWER_CONSTRAINTS,LEARNER_CONSTRAINTS," + "SCHEDULE,FOLLOWERS,LEARNERS FROM INFORMATION_SCHEMA.placement_rules WHERE table_NAME='t1'").Check( testkit.Rows()) + checkExistTableBundlesInPD(c, s.dom, "test", "t1") tk.MustExec("alter table t1 partition p0 " + "PRIMARY_REGION=\"cn-east-1\" " + @@ -795,6 +894,7 @@ func (s *testDBSuite6) TestAlterTablePartitionWithPlacementPolicy(c *C) { ptDef := testGetPartitionDefinitionsByName(c, tk.Se, "test", "t1", "p0") c.Assert(ptDef.PlacementPolicyRef, IsNil) c.Assert(ptDef.DirectPlacementOpts, NotNil) + checkExistTableBundlesInPD(c, s.dom, "test", "t1") checkFunc := func(policySetting *model.PlacementSettings) { c.Assert(policySetting.PrimaryRegion, Equals, "cn-east-1") @@ -832,6 +932,7 @@ func (s *testDBSuite6) TestAlterTablePartitionWithPlacementPolicy(c *C) { tk.MustExec("alter table t1 partition p0 " + "PLACEMENT POLICY=\"x\"") tk.MustQuery("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, PARTITION_NAME, TIDB_PLACEMENT_POLICY_NAME, TIDB_DIRECT_PLACEMENT FROM information_schema.Partitions WHERE TABLE_SCHEMA='test' AND TABLE_NAME = 't1' AND PARTITION_NAME = 'p0'").Check(testkit.Rows(`def test t1 p0 x `)) + checkExistTableBundlesInPD(c, s.dom, "test", "t1") ptDef = testGetPartitionDefinitionsByName(c, tk.Se, "test", "t1", "p0") c.Assert(ptDef, NotNil) @@ -848,6 +949,7 @@ func (s *testDBSuite6) TestAlterTablePartitionWithPlacementPolicy(c *C) { ptDef = testGetPartitionDefinitionsByName(c, tk.Se, "test", "t1", "p0") c.Assert(ptDef, NotNil) c.Assert(ptDef.DirectPlacementOpts, NotNil) + checkExistTableBundlesInPD(c, s.dom, "test", "t1") checkFunc = func(policySetting *model.PlacementSettings) { c.Assert(policySetting.PrimaryRegion, Equals, "cn-east-1") @@ -883,6 +985,7 @@ func testGetPartitionDefinitionsByName(c *C, ctx sessionctx.Context, db string, } func (s *testDBSuite6) TestPolicyInheritance(c *C) { + clearAllBundles(c) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t, t0") @@ -897,12 +1000,14 @@ func (s *testDBSuite6) TestPolicyInheritance(c *C) { tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n" + " `a` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] CONSTRAINTS=\"[+zone=hangzhou]\" */")) + checkExistTableBundlesInPD(c, s.dom, "mydb", "t") tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int) constraints=\"[+zone=suzhou]\"") tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n" + " `a` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] CONSTRAINTS=\"[+zone=suzhou]\" */")) + checkExistTableBundlesInPD(c, s.dom, "mydb", "t") tk.MustExec("drop table if exists t") // test create table like should not inherit database's placement rules. @@ -910,10 +1015,12 @@ func (s *testDBSuite6) TestPolicyInheritance(c *C) { tk.MustQuery("show create table t0").Check(testkit.Rows("t0 CREATE TABLE `t0` (\n" + " `a` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + checkExistTableBundlesInPD(c, s.dom, "mydb", "t0") tk.MustExec("create table t1 like t0") tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" + " `a` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + checkExistTableBundlesInPD(c, s.dom, "mydb", "t1") tk.MustExec("drop table if exists t0, t") // table will inherit db's placement rules, which is shared by all partition as default one. @@ -924,6 +1031,7 @@ func (s *testDBSuite6) TestPolicyInheritance(c *C) { "PARTITION BY RANGE (`a`)\n" + "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (200))")) + checkExistTableBundlesInPD(c, s.dom, "mydb", "t") tk.MustExec("drop table if exists t") // partition's specified placement rules will override the default one. @@ -934,6 +1042,7 @@ func (s *testDBSuite6) TestPolicyInheritance(c *C) { "PARTITION BY RANGE (`a`)\n" + "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] CONSTRAINTS=\"[+zone=suzhou]\" */,\n" + " PARTITION `p1` VALUES LESS THAN (200))")) + checkExistTableBundlesInPD(c, s.dom, "mydb", "t") tk.MustExec("drop table if exists t") // test partition override table's placement rules. @@ -945,6 +1054,7 @@ func (s *testDBSuite6) TestPolicyInheritance(c *C) { "PARTITION BY RANGE (`a`)\n" + "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] CONSTRAINTS=\"[+zone=changzhou]\" */,\n" + " PARTITION `p1` VALUES LESS THAN (200))")) + checkExistTableBundlesInPD(c, s.dom, "mydb", "t") } func (s *testDBSuite6) TestDatabasePlacement(c *C) { @@ -1103,6 +1213,7 @@ func (s *testDBSuite6) TestDropTableGCPlacement(c *C) { } func (s *testDBSuite6) TestAlterTablePlacement(c *C) { + clearAllBundles(c) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists tp") @@ -1126,6 +1237,7 @@ func (s *testDBSuite6) TestAlterTablePlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (1000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // alter with policy tk.MustExec("alter table tp placement policy p1") @@ -1141,6 +1253,7 @@ func (s *testDBSuite6) TestAlterTablePlacement(c *C) { c.Assert(err, IsNil) c.Assert(tb.Meta().PlacementPolicyRef.ID, Equals, policy.ID) c.Assert(tb.Meta().DirectPlacementOpts, IsNil) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // alter with direct placement tk.MustExec("alter table tp primary_region='r2' regions='r1,r2'") @@ -1151,6 +1264,7 @@ func (s *testDBSuite6) TestAlterTablePlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (1000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // reset with placement policy 'default' tk.MustExec("alter table tp placement policy default") @@ -1161,6 +1275,7 @@ func (s *testDBSuite6) TestAlterTablePlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (1000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // error invalid policy err = tk.ExecToErr("alter table tp placement policy px") @@ -1182,6 +1297,7 @@ func (s *testDBSuite6) TestAlterTablePlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (1000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") } func (s *testDBSuite6) TestDropTablePartitionGCPlacement(c *C) { @@ -1246,6 +1362,7 @@ func (s *testDBSuite6) TestDropTablePartitionGCPlacement(c *C) { } func (s *testDBSuite6) TestAlterTablePartitionPlacement(c *C) { + clearAllBundles(c) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists tp") @@ -1273,6 +1390,7 @@ func (s *testDBSuite6) TestAlterTablePartitionPlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (1000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // alter with policy tk.MustExec("alter table tp partition p0 placement policy p1") @@ -1288,6 +1406,7 @@ func (s *testDBSuite6) TestAlterTablePartitionPlacement(c *C) { c.Assert(err, IsNil) c.Assert(tb.Meta().Partition.Definitions[0].PlacementPolicyRef.ID, Equals, policy.ID) c.Assert(tb.Meta().Partition.Definitions[0].DirectPlacementOpts, IsNil) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // alter with direct placement tk.MustExec("alter table tp partition p1 primary_region='r2' regions='r1,r2'") @@ -1298,6 +1417,7 @@ func (s *testDBSuite6) TestAlterTablePartitionPlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r1,r2\" */)")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") tk.MustExec("alter table tp partition p1 primary_region='r3' regions='r3,r4'") tk.MustQuery("show create table tp").Check(testkit.Rows("" + @@ -1307,6 +1427,7 @@ func (s *testDBSuite6) TestAlterTablePartitionPlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // reset with placement policy 'default' tk.MustExec("alter table tp partition p1 placement policy default") @@ -1317,6 +1438,7 @@ func (s *testDBSuite6) TestAlterTablePartitionPlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + " PARTITION `p1` VALUES LESS THAN (1000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") tk.MustExec("alter table tp partition p0 placement policy default") tk.MustQuery("show create table tp").Check(testkit.Rows("" + @@ -1326,6 +1448,7 @@ func (s *testDBSuite6) TestAlterTablePartitionPlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (1000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // error invalid policy err = tk.ExecToErr("alter table tp partition p1 placement policy px") @@ -1351,9 +1474,11 @@ func (s *testDBSuite6) TestAlterTablePartitionPlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (1000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") } func (s *testDBSuite6) TestAddPartitionWithPlacement(c *C) { + clearAllBundles(c) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists tp") @@ -1377,6 +1502,7 @@ func (s *testDBSuite6) TestAddPartitionWithPlacement(c *C) { "PARTITION BY RANGE (`id`)\n" + "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (1000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // Add partitions tk.MustExec(`alter table tp add partition ( @@ -1394,6 +1520,7 @@ func (s *testDBSuite6) TestAddPartitionWithPlacement(c *C) { " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + " PARTITION `p3` VALUES LESS THAN (100000) /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */,\n" + " PARTITION `p4` VALUES LESS THAN (1000000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") tb, err := tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) c.Assert(err, IsNil) @@ -1422,6 +1549,7 @@ func (s *testDBSuite6) TestAddPartitionWithPlacement(c *C) { " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + " PARTITION `p3` VALUES LESS THAN (100000) /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */,\n" + " PARTITION `p4` VALUES LESS THAN (1000000))")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") } func (s *testDBSuite6) TestTruncateTableWithPlacement(c *C) { @@ -1682,6 +1810,7 @@ func (s *testDBSuite6) TestTruncatePartitionGCWithPlacement(c *C) { } func (s *testDBSuite6) TestExchangePartitionWithPlacement(c *C) { + clearAllBundles(c) tk := testkit.NewTestKit(c, s.store) tk.MustExec("set @@tidb_enable_exchange_partition=1") tk.MustExec("use test") @@ -1754,6 +1883,7 @@ func (s *testDBSuite6) TestExchangePartitionWithPlacement(c *C) { c.Assert(t1.Meta().ID, Equals, par0ID) c.Assert(t1.Meta().DirectPlacementOpts, IsNil) c.Assert(t1.Meta().PlacementPolicyRef.ID, Equals, policy1.ID) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // exchange par0, t2 tk.MustExec("alter table tp exchange partition p0 with table t2") @@ -1780,6 +1910,7 @@ func (s *testDBSuite6) TestExchangePartitionWithPlacement(c *C) { c.Assert(t2.Meta().ID, Equals, t1ID) c.Assert(t2.Meta().DirectPlacementOpts, IsNil) c.Assert(t2.Meta().PlacementPolicyRef, IsNil) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // exchange par1, t1 tk.MustExec("alter table tp exchange partition p1 with table t1") @@ -1806,6 +1937,7 @@ func (s *testDBSuite6) TestExchangePartitionWithPlacement(c *C) { c.Assert(t1.Meta().ID, Equals, par1ID) c.Assert(t1.Meta().DirectPlacementOpts, IsNil) c.Assert(t1.Meta().PlacementPolicyRef.ID, Equals, policy1.ID) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") // exchange par2, t2 tk.MustExec("alter table tp exchange partition p2 with table t2") @@ -1833,4 +1965,93 @@ func (s *testDBSuite6) TestExchangePartitionWithPlacement(c *C) { c.Assert(t2.Meta().ID, Equals, par2ID) c.Assert(t2.Meta().DirectPlacementOpts, IsNil) c.Assert(t2.Meta().PlacementPolicyRef, IsNil) + checkExistTableBundlesInPD(c, s.dom, "test", "tp") +} + +func (s *testDBSuite6) TestPDFail(c *C) { + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"), IsNil) + }() + + clearAllBundles(c) + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop placement policy if exists p1") + tk.MustExec("drop table if exists t1, t2, tp") + + tk.MustExec("create placement policy p1 primary_region=\"cn-east-1\" regions=\"cn-east-1,cn-east\"") + defer tk.MustExec("drop placement policy if exists p1") + + tk.MustExec("create table t1(id int)") + defer tk.MustExec("drop table if exists t1") + + tk.MustExec(`CREATE TABLE tp (id INT) placement policy p1 PARTITION BY RANGE (id) ( + PARTITION p0 VALUES LESS THAN (100), + PARTITION p1 VALUES LESS THAN (1000) placement policy p1 + );`) + defer tk.MustExec("drop table if exists tp") + existBundles, err := infosync.GetAllRuleBundles(context.TODO()) + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError", "return(true)"), IsNil) + + // alter policy + err = tk.ExecToErr("alter placement policy p1 primary_region='rx' regions='rx'") + c.Assert(infosync.ErrHTTPServiceError.Equal(err), IsTrue) + tk.MustQuery("show create placement policy p1").Check(testkit.Rows("p1 CREATE PLACEMENT POLICY `p1` PRIMARY_REGION=\"cn-east-1\" REGIONS=\"cn-east-1,cn-east\"")) + checkAllBundlesNotChange(c, existBundles) + + // create table + err = tk.ExecToErr("create table t2 (id int) placement policy p1") + c.Assert(infosync.ErrHTTPServiceError.Equal(err), IsTrue) + err = tk.ExecToErr("show create table t2") + c.Assert(infoschema.ErrTableNotExists.Equal(err), IsTrue) + checkAllBundlesNotChange(c, existBundles) + + // alter table + err = tk.ExecToErr("alter table t1 placement policy p1") + c.Assert(infosync.ErrHTTPServiceError.Equal(err), IsTrue) + tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + checkAllBundlesNotChange(c, existBundles) + + // add partition + err = tk.ExecToErr("alter table tp add partition (" + + "partition p2 values less than (10000) placement policy p1," + + "partition p3 values less than (100000) primary_region=\"r1\" regions=\"r1,r2\"" + + ")") + c.Assert(infosync.ErrHTTPServiceError.Equal(err), IsTrue) + tk.MustQuery("show create table tp").Check(testkit.Rows("tp CREATE TABLE `tp` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p0` VALUES LESS THAN (100),\n" + + " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p1` */)")) + checkAllBundlesNotChange(c, existBundles) + + // alter partition + err = tk.ExecToErr(`alter table tp PARTITION p1 primary_region="r2" regions="r2,r3"`) + c.Assert(infosync.ErrHTTPServiceError.Equal(err), IsTrue) + tk.MustQuery("show create table tp").Check(testkit.Rows("tp CREATE TABLE `tp` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p0` VALUES LESS THAN (100),\n" + + " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p1` */)")) + checkAllBundlesNotChange(c, existBundles) + + // exchange partition + tk.MustExec("alter table tp exchange partition p1 with table t1") + c.Assert(infosync.ErrHTTPServiceError.Equal(err), IsTrue) + tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustQuery("show create table tp").Check(testkit.Rows("tp CREATE TABLE `tp` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p0` VALUES LESS THAN (100),\n" + + " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p1` */)")) + checkAllBundlesNotChange(c, existBundles) } From 1f26870ccbc9562f4a3c280d3aa1ac7f297b6dcc Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 13 Dec 2021 12:04:34 +0800 Subject: [PATCH 10/10] *: migrate sync.WaitGroup to util.WaitGroupWrapper (#30644) --- session/isolation_test.go | 17 ++++++----------- session/session_test.go | 22 ++++++++-------------- statistics/handle/dump_serial_test.go | 9 ++++----- 3 files changed, 18 insertions(+), 30 deletions(-) diff --git a/session/isolation_test.go b/session/isolation_test.go index fcac60418aa4c..ec8aa6a7fa9ae 100644 --- a/session/isolation_test.go +++ b/session/isolation_test.go @@ -15,9 +15,8 @@ package session_test import ( - "sync" - . "github.com/pingcap/check" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/testkit" ) @@ -55,12 +54,10 @@ func (s *testIsolationSuite) TestP0DirtyWrite(c *C) { session1.MustExec("begin;") session1.MustExec("update x set c = c+1 where id = 1;") session2.MustExec("begin;") - var wg sync.WaitGroup - wg.Add(1) - go func() { + var wg util.WaitGroupWrapper + wg.Run(func() { session2.MustExec("update x set c = c+1 where id = 1;") - wg.Done() - }() + }) session1.MustExec("commit;") wg.Wait() session2.MustExec("commit;") @@ -75,11 +72,9 @@ func (s *testIsolationSuite) TestP0DirtyWrite(c *C) { session1.MustExec("begin;") session1.MustExec("update x set c = c+1 where id = 1;") session2.MustExec("begin;") - wg.Add(1) - go func() { + wg.Run(func() { session2.MustExec("update x set c = c+1 where id = 1;") - wg.Done() - }() + }) session1.MustExec("commit;") wg.Wait() session2.MustExec("commit;") diff --git a/session/session_test.go b/session/session_test.go index 130aa895c01f2..acbc889102fe1 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -62,6 +62,7 @@ import ( "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/testkit" @@ -1762,31 +1763,24 @@ func (s *testSessionSuite2) TestRetry(c *C) { tk2.MustExec("set @@tidb_disable_txn_auto_retry = 0") tk3.MustExec("set @@tidb_disable_txn_auto_retry = 0") - var wg sync.WaitGroup - wg.Add(3) - f1 := func() { - defer wg.Done() + var wg util.WaitGroupWrapper + wg.Run(func() { for i := 0; i < 30; i++ { tk1.MustExec("update t set c = 1;") } - } - f2 := func() { - defer wg.Done() + }) + wg.Run(func() { for i := 0; i < 30; i++ { tk2.MustExec("update t set c = ?;", 1) } - } - f3 := func() { - defer wg.Done() + }) + wg.Run(func() { for i := 0; i < 30; i++ { tk3.MustExec("begin") tk3.MustExec("update t set c = 1;") tk3.MustExec("commit") } - } - go f1() - go f2() - go f3() + }) wg.Wait() } diff --git a/statistics/handle/dump_serial_test.go b/statistics/handle/dump_serial_test.go index a70bc43fb750f..01474870b2e4a 100644 --- a/statistics/handle/dump_serial_test.go +++ b/statistics/handle/dump_serial_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" ) @@ -322,12 +323,10 @@ func TestDumpExtendedStats(t *testing.T) { requireTableEqual(t, loadTbl, tbl) cleanStats(tk, dom) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { + wg := util.WaitGroupWrapper{} + wg.Run(func() { require.Nil(t, h.Update(is)) - wg.Done() - }() + }) err = h.LoadStatsFromJSON(is, jsonTbl) wg.Wait() require.NoError(t, err)