From 52e89cb8bfba9acf5e75397b32d898726d7dcab7 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 2 Jun 2021 14:15:38 +0800 Subject: [PATCH 01/14] planner/core: support union all for mpp. (#24287) --- executor/mpp_gather.go | 24 ++-- executor/tiflash_test.go | 42 ++++++ expression/column.go | 3 +- planner/core/exhaust_physical_plans.go | 34 ++++- planner/core/fragment.go | 139 ++++++++++++++++--- planner/core/initialize.go | 2 +- planner/core/logical_plan_builder.go | 4 + planner/core/physical_plans.go | 45 +++++- planner/core/plan.go | 3 + planner/core/rule_inject_extra_projection.go | 33 +++++ planner/core/task.go | 24 ++++ store/copr/mpp.go | 1 + 12 files changed, 314 insertions(+), 40 deletions(-) diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 7cfeb613c40f6..e517a0130ec4f 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -50,7 +50,7 @@ type MPPGather struct { respIter distsql.SelectResult } -func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.MPPTask, isRoot bool) error { +func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { dagReq, _, err := constructDAGReq(e.ctx, []plannercore.PhysicalPlan{pf.ExchangeSender}, kv.TiFlash) if err != nil { return errors.Trace(err) @@ -58,12 +58,12 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M for i := range pf.ExchangeSender.Schema().Columns { dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i)) } - if !isRoot { + if !pf.IsRoot { dagReq.EncodeType = tipb.EncodeType_TypeCHBlock } else { dagReq.EncodeType = tipb.EncodeType_TypeChunk } - for _, mppTask := range tasks { + for _, mppTask := range pf.ExchangeSender.Tasks { err := updateExecutorTableID(context.Background(), dagReq.RootExecutor, mppTask.TableID, true) if err != nil { return errors.Trace(err) @@ -77,7 +77,7 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M Data: pbData, Meta: mppTask.Meta, ID: mppTask.ID, - IsRoot: isRoot, + IsRoot: pf.IsRoot, Timeout: 10, SchemaVar: e.is.SchemaMetaVersion(), StartTs: e.startTS, @@ -85,12 +85,6 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M } e.mppReqs = append(e.mppReqs, req) } - for _, r := range pf.ExchangeReceivers { - err = e.appendMPPDispatchReq(r.GetExchangeSender().Fragment, r.Tasks, false) - if err != nil { - return errors.Trace(err) - } - } return nil } @@ -108,13 +102,15 @@ func (e *MPPGather) Open(ctx context.Context) (err error) { // TODO: Move the construct tasks logic to planner, so we can see the explain results. sender := e.originalPlan.(*plannercore.PhysicalExchangeSender) planIDs := collectPlanIDS(e.originalPlan, nil) - rootTasks, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is) + frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is) if err != nil { return errors.Trace(err) } - err = e.appendMPPDispatchReq(sender.Fragment, rootTasks, true) - if err != nil { - return errors.Trace(err) + for _, frag := range frags { + err = e.appendMPPDispatchReq(frag) + if err != nil { + return errors.Trace(err) + } } failpoint.Inject("checkTotalMPPTasks", func(val failpoint.Value) { if val.(int) != len(e.mppReqs) { diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 690106a9a38e5..e442a219ac1d2 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -425,6 +425,48 @@ func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) { c.Assert(failpoint.Disable(hang), IsNil) } +func (s *tiflashTestSuite) TestMppUnionAll(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists x1") + tk.MustExec("create table x1(a int , b int);") + tk.MustExec("alter table x1 set tiflash replica 1") + tk.MustExec("drop table if exists x2") + tk.MustExec("create table x2(a int , b int);") + tk.MustExec("alter table x2 set tiflash replica 1") + tb := testGetTableByName(c, tk.Se, "test", "x1") + err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + tb = testGetTableByName(c, tk.Se, "test", "x2") + err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + + tk.MustExec("insert into x1 values (1, 1), (2, 2), (3, 3), (4, 4)") + tk.MustExec("insert into x2 values (5, 1), (2, 2), (3, 3), (4, 4)") + + // test join + union (join + select) + tk.MustQuery("select x1.a, x.a from x1 left join (select x2.b a, x1.b from x1 join x2 on x1.a = x2.b union all select * from x1 ) x on x1.a = x.a order by x1.a").Check(testkit.Rows("1 1", "1 1", "2 2", "2 2", "3 3", "3 3", "4 4", "4 4")) + tk.MustQuery("select x1.a, x.a from x1 left join (select count(*) a, sum(b) b from x1 group by a union all select * from x2 ) x on x1.a = x.a order by x1.a").Check(testkit.Rows("1 1", "1 1", "1 1", "1 1", "2 2", "3 3", "4 4")) + + tk.MustExec("drop table if exists x3") + tk.MustExec("create table x3(a int , b int);") + tk.MustExec("alter table x3 set tiflash replica 1") + tb = testGetTableByName(c, tk.Se, "test", "x3") + err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + + tk.MustExec("insert into x3 values (2, 2), (2, 3), (2, 4)") + // test nested union all + tk.MustQuery("select count(*) from (select a, b from x1 union all select a, b from x3 union all (select x1.a, x3.b from (select * from x3 union all select * from x2) x3 left join x1 on x3.a = x1.b))").Check(testkit.Rows("14")) + // test union all join union all + tk.MustQuery("select count(*) from (select * from x1 union all select * from x2 union all select * from x3) x join (select * from x1 union all select * from x2 union all select * from x3) y on x.a = y.b").Check(testkit.Rows("29")) + tk.MustExec("set @@session.tidb_broadcast_join_threshold_count=100000") + failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(6)`) + tk.MustQuery("select count(*) from (select * from x1 union all select * from x2 union all select * from x3) x join (select * from x1 union all select * from x2 union all select * from x3) y on x.a = y.b").Check(testkit.Rows("29")) + failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks") + +} + func (s *tiflashTestSuite) TestMppApply(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/expression/column.go b/expression/column.go index 006b9a3867cda..ebc0feaf93a06 100644 --- a/expression/column.go +++ b/expression/column.go @@ -38,10 +38,9 @@ type CorrelatedColumn struct { // Clone implements Expression interface. func (col *CorrelatedColumn) Clone() Expression { - var d types.Datum return &CorrelatedColumn{ Column: col.Column, - Data: &d, + Data: col.Data, } } diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index dcb4e991ebe33..b2ba91ff99dcd 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2126,7 +2126,7 @@ func (p *baseLogicalPlan) canPushToCop(storeTp kv.StoreType) bool { } } ret = ret && validDs - case *LogicalAggregation, *LogicalProjection, *LogicalSelection, *LogicalJoin: + case *LogicalAggregation, *LogicalProjection, *LogicalSelection, *LogicalJoin, *LogicalUnionAll: if storeTp == kv.TiFlash { ret = ret && c.canPushToCop(storeTp) } else { @@ -2494,15 +2494,41 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) { // TODO: UnionAll can not pass any order, but we can change it to sort merge to keep order. - if !prop.IsEmpty() || prop.IsFlashProp() { + if !prop.IsEmpty() || (prop.IsFlashProp() && prop.TaskTp != property.MppTaskType) { + return nil, true, nil + } + // TODO: UnionAll can pass partition info, but for briefness, we prevent it from pushing down. + if prop.TaskTp == property.MppTaskType && prop.PartitionTp != property.AnyType { return nil, true, nil } + canUseMpp := p.ctx.GetSessionVars().IsMPPAllowed() && p.canPushToCop(kv.TiFlash) chReqProps := make([]*property.PhysicalProperty, 0, len(p.children)) for range p.children { - chReqProps = append(chReqProps, &property.PhysicalProperty{ExpectedCnt: prop.ExpectedCnt}) + if canUseMpp && prop.TaskTp == property.MppTaskType { + chReqProps = append(chReqProps, &property.PhysicalProperty{ + ExpectedCnt: prop.ExpectedCnt, + TaskTp: property.MppTaskType, + }) + } else { + chReqProps = append(chReqProps, &property.PhysicalProperty{ExpectedCnt: prop.ExpectedCnt}) + } } - ua := PhysicalUnionAll{}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...) + ua := PhysicalUnionAll{ + mpp: canUseMpp && prop.TaskTp == property.MppTaskType, + }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...) ua.SetSchema(p.Schema()) + if canUseMpp && prop.TaskTp == property.RootTaskType { + chReqProps = make([]*property.PhysicalProperty, 0, len(p.children)) + for range p.children { + chReqProps = append(chReqProps, &property.PhysicalProperty{ + ExpectedCnt: prop.ExpectedCnt, + TaskTp: property.MppTaskType, + }) + } + mppUA := PhysicalUnionAll{mpp: true}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...) + mppUA.SetSchema(p.Schema()) + return []PhysicalPlan{ua, mppUA}, true, nil + } return []PhysicalPlan{ua}, true, nil } diff --git a/planner/core/fragment.go b/planner/core/fragment.go index d0ab808c742e7..7315da176e6b8 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -38,32 +38,49 @@ type Fragment struct { // following fields are filled after scheduling. ExchangeSender *PhysicalExchangeSender // data exporter + + IsRoot bool +} + +type tasksAndFrags struct { + tasks []*kv.MPPTask + frags []*Fragment } type mppTaskGenerator struct { ctx sessionctx.Context startTS uint64 is infoschema.InfoSchema + frags []*Fragment + cache map[int]tasksAndFrags } // GenerateRootMPPTasks generate all mpp tasks and return root ones. -func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, sender *PhysicalExchangeSender, is infoschema.InfoSchema) ([]*kv.MPPTask, error) { - g := &mppTaskGenerator{ctx: ctx, startTS: startTs, is: is} +func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, sender *PhysicalExchangeSender, is infoschema.InfoSchema) ([]*Fragment, error) { + g := &mppTaskGenerator{ + ctx: ctx, + startTS: startTs, + is: is, + cache: make(map[int]tasksAndFrags), + } return g.generateMPPTasks(sender) } -func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*kv.MPPTask, error) { +func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*Fragment, error) { logutil.BgLogger().Info("Mpp will generate tasks", zap.String("plan", ToString(s))) tidbTask := &kv.MPPTask{ StartTs: e.startTS, ID: -1, } - rootTasks, err := e.generateMPPTasksForFragment(s) + _, frags, err := e.generateMPPTasksForExchangeSender(s) if err != nil { return nil, errors.Trace(err) } - s.TargetTasks = []*kv.MPPTask{tidbTask} - return rootTasks, nil + for _, frag := range frags { + frag.ExchangeSender.TargetTasks = []*kv.MPPTask{tidbTask} + frag.IsRoot = true + } + return e.frags, nil } type mppAddr struct { @@ -105,6 +122,8 @@ func (f *Fragment) init(p PhysicalPlan) error { f.TableScan = x case *PhysicalExchangeReceiver: f.ExchangeReceivers = append(f.ExchangeReceivers, x) + case *PhysicalUnionAll: + return errors.New("unexpected union all detected") default: for _, ch := range p.Children() { if err := f.init(ch); err != nil { @@ -115,20 +134,107 @@ func (f *Fragment) init(p PhysicalPlan) error { return nil } -func newFragment(s *PhysicalExchangeSender) (*Fragment, error) { - f := &Fragment{ExchangeSender: s} - s.Fragment = f - err := f.init(s) - return f, errors.Trace(err) +// We would remove all the union-all operators by 'untwist'ing and copying the plans above union-all. +// This will make every route from root (ExchangeSender) to leaf nodes (ExchangeReceiver and TableScan) +// a new ioslated tree (and also a fragment) without union all. These trees (fragments then tasks) will +// finally be gathered to TiDB or be exchanged to upper tasks again. +// For instance, given a plan "select c1 from t union all select c1 from s" +// after untwist, there will be two plans in `forest` slice: +// - ExchangeSender -> Projection (c1) -> TableScan(t) +// - ExchangeSender -> Projection (c2) -> TableScan(s) +func untwistPlanAndRemoveUnionAll(stack []PhysicalPlan, forest *[]*PhysicalExchangeSender) error { + cur := stack[len(stack)-1] + switch x := cur.(type) { + case *PhysicalTableScan, *PhysicalExchangeReceiver: // This should be the leave node. + p, err := stack[0].Clone() + if err != nil { + return errors.Trace(err) + } + *forest = append(*forest, p.(*PhysicalExchangeSender)) + for i := 1; i < len(stack); i++ { + if _, ok := stack[i].(*PhysicalUnionAll); ok { + continue + } + ch, err := stack[i].Clone() + if err != nil { + return errors.Trace(err) + } + if join, ok := p.(*PhysicalHashJoin); ok { + join.SetChild(1-join.InnerChildIdx, ch) + } else { + p.SetChildren(ch) + } + p = ch + } + case *PhysicalHashJoin: + stack = append(stack, x.children[1-x.InnerChildIdx]) + err := untwistPlanAndRemoveUnionAll(stack, forest) + stack = stack[:len(stack)-1] + return errors.Trace(err) + case *PhysicalUnionAll: + for _, ch := range x.children { + stack = append(stack, ch) + err := untwistPlanAndRemoveUnionAll(stack, forest) + stack = stack[:len(stack)-1] + if err != nil { + return errors.Trace(err) + } + } + default: + if len(cur.Children()) != 1 { + return errors.Trace(errors.New("unexpected plan " + cur.ExplainID().String())) + } + ch := cur.Children()[0] + stack = append(stack, ch) + err := untwistPlanAndRemoveUnionAll(stack, forest) + stack = stack[:len(stack)-1] + return errors.Trace(err) + } + return nil } -func (e *mppTaskGenerator) generateMPPTasksForFragment(s *PhysicalExchangeSender) (tasks []*kv.MPPTask, err error) { - f, err := newFragment(s) +func buildFragments(s *PhysicalExchangeSender) ([]*Fragment, error) { + forest := make([]*PhysicalExchangeSender, 0, 1) + err := untwistPlanAndRemoveUnionAll([]PhysicalPlan{s}, &forest) if err != nil { return nil, errors.Trace(err) } + fragments := make([]*Fragment, 0, len(forest)) + for _, s := range forest { + f := &Fragment{ExchangeSender: s} + err = f.init(s) + if err != nil { + return nil, errors.Trace(err) + } + fragments = append(fragments, f) + } + return fragments, nil +} + +func (e *mppTaskGenerator) generateMPPTasksForExchangeSender(s *PhysicalExchangeSender) ([]*kv.MPPTask, []*Fragment, error) { + if cached, ok := e.cache[s.ID()]; ok { + return cached.tasks, cached.frags, nil + } + frags, err := buildFragments(s) + if err != nil { + return nil, nil, errors.Trace(err) + } + results := make([]*kv.MPPTask, 0, len(frags)) + for _, f := range frags { + tasks, err := e.generateMPPTasksForFragment(f) + if err != nil { + return nil, nil, errors.Trace(err) + } + results = append(results, tasks...) + } + e.frags = append(e.frags, frags...) + e.cache[s.ID()] = tasksAndFrags{results, frags} + return results, frags, nil +} + +func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv.MPPTask, err error) { for _, r := range f.ExchangeReceivers { - r.Tasks, err = e.generateMPPTasksForFragment(r.GetExchangeSender()) + r.Tasks, r.frags, err = e.generateMPPTasksForExchangeSender(r.GetExchangeSender()) if err != nil { return nil, errors.Trace(err) } @@ -149,8 +255,9 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(s *PhysicalExchangeSender return nil, errors.New("cannot find mpp task") } for _, r := range f.ExchangeReceivers { - s := r.GetExchangeSender() - s.TargetTasks = tasks + for _, frag := range r.frags { + frag.ExchangeSender.TargetTasks = append(frag.ExchangeSender.TargetTasks, tasks...) + } } f.ExchangeSender.Tasks = tasks return tasks, nil diff --git a/planner/core/initialize.go b/planner/core/initialize.go index f41340147abfe..cbf099baf4113 100644 --- a/planner/core/initialize.go +++ b/planner/core/initialize.go @@ -419,7 +419,7 @@ func (p PhysicalTableReader) Init(ctx sessionctx.Context, offset int) *PhysicalT if p.tablePlan != nil { p.TablePlans = flattenPushDownPlan(p.tablePlan) p.schema = p.tablePlan.Schema() - if p.StoreType == kv.TiFlash && !p.GetTableScan().KeepOrder { + if p.StoreType == kv.TiFlash && p.GetTableScan() != nil && !p.GetTableScan().KeepOrder { // When allow batch cop is 1, only agg / topN uses batch cop. // When allow batch cop is 2, every query uses batch cop. switch ctx.GetSessionVars().AllowBatchCop { diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index e6b2c6b6ef20b..ccb93029edd25 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -1378,6 +1378,10 @@ func (b *PlanBuilder) buildProjection4Union(ctx context.Context, u *LogicalUnion b.optFlag |= flagEliminateProjection proj := LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(b.ctx, b.getSelectOffset()) proj.SetSchema(u.schema.Clone()) + // reset the schema type to make the "not null" flag right. + for i, expr := range exprs { + proj.schema.Columns[i].RetType = expr.GetType() + } proj.SetChildren(child) u.children[childID] = proj } diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 5602399a63b74..74c70e1fce3c9 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -111,7 +111,10 @@ func (p *PhysicalTableReader) GetTableScan() *PhysicalTableScan { } else if chCnt == 1 { curPlan = curPlan.Children()[0] } else { - join := curPlan.(*PhysicalHashJoin) + join, ok := curPlan.(*PhysicalHashJoin) + if !ok { + return nil + } curPlan = join.children[1-join.globalChildIndex] } } @@ -886,6 +889,18 @@ type PhysicalExchangeReceiver struct { basePhysicalPlan Tasks []*kv.MPPTask + frags []*Fragment +} + +// Clone implment PhysicalPlan interface. +func (p *PhysicalExchangeReceiver) Clone() (PhysicalPlan, error) { + np := new(PhysicalExchangeReceiver) + base, err := p.basePhysicalPlan.cloneWithSelf(np) + if err != nil { + return nil, errors.Trace(err) + } + np.basePhysicalPlan = *base + return np, nil } // GetExchangeSender return the connected sender of this receiver. We assume that its child must be a receiver. @@ -900,10 +915,21 @@ type PhysicalExchangeSender struct { TargetTasks []*kv.MPPTask ExchangeType tipb.ExchangeType HashCols []*expression.Column - // Tasks is the mpp task for current PhysicalExchangeSender + // Tasks is the mpp task for current PhysicalExchangeSender. Tasks []*kv.MPPTask +} - Fragment *Fragment +// Clone implment PhysicalPlan interface. +func (p *PhysicalExchangeSender) Clone() (PhysicalPlan, error) { + np := new(PhysicalExchangeSender) + base, err := p.basePhysicalPlan.cloneWithSelf(np) + if err != nil { + return nil, errors.Trace(err) + } + np.basePhysicalPlan = *base + np.ExchangeType = p.ExchangeType + np.HashCols = p.HashCols + return np, nil } // Clone implements PhysicalPlan interface. @@ -952,6 +978,19 @@ func (p *PhysicalLimit) Clone() (PhysicalPlan, error) { // PhysicalUnionAll is the physical operator of UnionAll. type PhysicalUnionAll struct { physicalSchemaProducer + + mpp bool +} + +// Clone implements PhysicalPlan interface. +func (p *PhysicalUnionAll) Clone() (PhysicalPlan, error) { + cloned := new(PhysicalUnionAll) + base, err := p.physicalSchemaProducer.cloneWithSelf(cloned) + if err != nil { + return nil, err + } + cloned.physicalSchemaProducer = *base + return cloned, nil } // AggMppRunMode defines the running mode of aggregation in MPP diff --git a/planner/core/plan.go b/planner/core/plan.go index dd7e41b77f7fa..2a62afa960b63 100644 --- a/planner/core/plan.go +++ b/planner/core/plan.go @@ -408,6 +408,9 @@ func (p *basePhysicalPlan) cloneWithSelf(newSelf PhysicalPlan) (*basePhysicalPla base.children = append(base.children, cloned) } for _, prop := range p.childrenReqProps { + if prop == nil { + continue + } base.childrenReqProps = append(base.childrenReqProps, prop.CloneEssentialFields()) } return base, nil diff --git a/planner/core/rule_inject_extra_projection.go b/planner/core/rule_inject_extra_projection.go index 2896a1dade0ff..968917a4fef2e 100644 --- a/planner/core/rule_inject_extra_projection.go +++ b/planner/core/rule_inject_extra_projection.go @@ -14,6 +14,7 @@ package core import ( + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/kv" @@ -62,10 +63,42 @@ func (pe *projInjector) inject(plan PhysicalPlan) PhysicalPlan { plan = InjectProjBelowSort(p, p.ByItems) case *NominalSort: plan = TurnNominalSortIntoProj(p, p.OnlyColumn, p.ByItems) + case *PhysicalUnionAll: + plan = injectProjBelowUnion(p) } return plan } +func injectProjBelowUnion(un *PhysicalUnionAll) *PhysicalUnionAll { + if !un.mpp { + return un + } + for i, ch := range un.children { + exprs := make([]expression.Expression, len(ch.Schema().Columns)) + needChange := false + for i, dstCol := range un.schema.Columns { + dstType := dstCol.RetType + srcCol := ch.Schema().Columns[i] + srcType := srcCol.RetType + if !srcType.Equal(dstType) || !(mysql.HasNotNullFlag(dstType.Flag) == mysql.HasNotNullFlag(srcType.Flag)) { + exprs[i] = expression.BuildCastFunction4Union(un.ctx, srcCol, dstType) + needChange = true + } else { + exprs[i] = srcCol + } + } + if needChange { + proj := PhysicalProjection{ + Exprs: exprs, + }.Init(un.ctx, ch.statsInfo(), 0) + proj.SetSchema(un.schema.Clone()) + proj.SetChildren(ch) + un.children[i] = proj + } + } + return un +} + // wrapCastForAggFunc wraps the args of an aggregate function with a cast function. // If the mode is FinalMode or Partial2Mode, we do not need to wrap cast upon the args, // since the types of the args are already the expected. diff --git a/planner/core/task.go b/planner/core/task.go index fa6855503dd0e..f4c1d1ca72c65 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1300,7 +1300,31 @@ func (p *PhysicalProjection) attach2Task(tasks ...task) task { return t } +func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task { + t := &mppTask{p: p} + childPlans := make([]PhysicalPlan, 0, len(tasks)) + var childMaxCost float64 + for _, tk := range tasks { + if mpp, ok := tk.(*mppTask); ok && !tk.invalid() { + childCost := mpp.cost() + if childCost > childMaxCost { + childMaxCost = childCost + } + childPlans = append(childPlans, mpp.plan()) + } else { + return invalidTask + } + } + p.SetChildren(childPlans...) + t.cst = childMaxCost + p.cost = t.cost() + return t +} + func (p *PhysicalUnionAll) attach2Task(tasks ...task) task { + if _, ok := tasks[0].(*mppTask); ok { + return p.attach2MppTasks(tasks...) + } t := &rootTask{p: p} childPlans := make([]PhysicalPlan, 0, len(tasks)) var childMaxCost float64 diff --git a/store/copr/mpp.go b/store/copr/mpp.go index db0aff7e22696..4be45c3288e23 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -239,6 +239,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req realResp := rpcResp.Resp.(*mpp.DispatchTaskResponse) if realResp.Error != nil { + logutil.BgLogger().Error("mpp dispatch response meet error", zap.String("error", realResp.Error.Msg)) m.sendError(errors.New(realResp.Error.Msg)) return } From fb81a7dfd81a2b2841edbc4ddbc56b47c45f330f Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Wed, 2 Jun 2021 14:51:38 +0800 Subject: [PATCH 02/14] planner: warn for incremental analyze in version 3 stats (#24689) --- planner/core/integration_test.go | 25 +++++++++++++++++++++++++ planner/core/planbuilder.go | 5 ++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 1e13d28b96e1c..c691e5341b0a5 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3637,6 +3637,31 @@ func (s *testIntegrationSuite) TestIssue24281(c *C) { "UNION select 1 as v1, 2 as v2") } +func (s *testIntegrationSuite) TestIncrementalAnalyzeStatsVer3(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int primary key, b int, index idx_b(b))") + tk.MustExec("insert into t values(1,1),(2,2),(3,3)") + tk.MustExec("set @@session.tidb_analyze_version = 3") + tk.MustExec("analyze table t") + is := tk.Se.GetInfoSchema().(infoschema.InfoSchema) + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tblID := tbl.Meta().ID + rows := tk.MustQuery(fmt.Sprintf("select distinct_count from mysql.stats_histograms where table_id = %d and is_index = 1", tblID)).Rows() + c.Assert(len(rows), Equals, 1) + c.Assert(rows[0][0], Equals, "3") + tk.MustExec("insert into t values(4,4),(5,5),(6,6)") + tk.MustExec("analyze incremental table t index idx_b") + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 2) + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings()[0].Err.Error(), Equals, "The version 3 would collect all statistics not only the selected indexes") + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings()[1].Err.Error(), Equals, "The version 3 stats would ignore the INCREMENTAL keyword and do full sampling") + rows = tk.MustQuery(fmt.Sprintf("select distinct_count from mysql.stats_histograms where table_id = %d and is_index = 1", tblID)).Rows() + c.Assert(len(rows), Equals, 1) + c.Assert(rows[0][0], Equals, "6") +} + func (s *testIntegrationSuite) TestConflictReadFromStorage(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index f2de374321f25..3990cade06511 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1683,6 +1683,9 @@ func (b *PlanBuilder) buildAnalyzeFullSamplingTask( } idxInfos = append(idxInfos, idx) } + if as.Incremental { + b.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("The version 3 stats would ignore the INCREMENTAL keyword and do full sampling")) + } for i, id := range physicalIDs { if id == tbl.TableInfo.ID { id = -1 @@ -1692,7 +1695,7 @@ func (b *PlanBuilder) buildAnalyzeFullSamplingTask( TableName: tbl.Name.O, PartitionName: names[i], TableID: AnalyzeTableID{TableID: tbl.TableInfo.ID, PartitionID: id}, - Incremental: as.Incremental, + Incremental: false, StatsVersion: version, } newTask := AnalyzeColumnsTask{ From b4f3ea6f6b9d9f87a7df5e38876d78c81e792a1b Mon Sep 17 00:00:00 2001 From: wjHuang Date: Wed, 2 Jun 2021 15:07:37 +0800 Subject: [PATCH 03/14] cmd: add tests for CTE (#24907) --- cmd/explaintest/r/cte.result | 149 ++++++++++++++++++++++++++++++ cmd/explaintest/t/cte.test | 170 +++++++++++++++++++++++++++++++++++ 2 files changed, 319 insertions(+) create mode 100644 cmd/explaintest/r/cte.result create mode 100644 cmd/explaintest/t/cte.test diff --git a/cmd/explaintest/r/cte.result b/cmd/explaintest/r/cte.result new file mode 100644 index 0000000000000..af8209d6c9e48 --- /dev/null +++ b/cmd/explaintest/r/cte.result @@ -0,0 +1,149 @@ +use test; +drop table if exists tbl_0; +create table tbl_0(a int); +with recursive cte_0 (col_10,col_11,col_12) AS ( select 1, 2,3 from tbl_0 UNION select col_10 + 1,col_11 + 1,col_12 + 1 from cte_0 where col_10 < 10 ) select * from cte_0; +drop table if exists tbl_1; +CREATE TABLE `tbl_1` ( +`col_5` decimal(47,21) NOT NULL DEFAULT '5308.880000000000000000000', +`col_6` enum('Alice','Bob','Charlie','David') DEFAULT NULL, +`col_7` float NOT NULL, +`col_8` bigint NOT NULL DEFAULT '-688199144806783096', +`col_9` varchar(248) NOT NULL, +PRIMARY KEY (`col_5`,`col_7`,`col_9`,`col_8`), +UNIQUE KEY `idx_4` (`col_8`), +UNIQUE KEY `idx_7` (`col_5`,`col_7`,`col_8`), +UNIQUE KEY `idx_9` (`col_9`,`col_8`), +UNIQUE KEY `idx_3` (`col_9`(3),`col_8`), +UNIQUE KEY `idx_8` (`col_7`,`col_6`,`col_8`,`col_5`), +KEY `idx_5` (`col_7`), +KEY `idx_6` (`col_7`), +KEY `idx_10` (`col_9`,`col_5`), +KEY `idx_11` (`col_5`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci +/*!50100 PARTITION BY HASH (`col_8`) +PARTITIONS 4 */; +with recursive cte_1 (col_13,col_14,col_15,col_16,col_17) AS ( with recursive cte_2 (col_18,col_19,col_20,col_21,col_22,col_23,col_24) AS ( select 1, 2,col_8,4,5,6,7 from tbl_1 ) select col_19,col_18,col_22,col_23,col_21 from cte_2 UNION ALL select col_13 + 1,col_14 + 1,col_15 + 1,col_16 + 1,col_17 + 1 from cte_1 where col_13 < 10 ) select * from cte_1; +with recursive cte_256 (col_969,col_970,col_971) AS ( with recursive cte_257 (col_972,col_973,col_974,col_975) AS ( select 1, 2,col_8,4 from tbl_1 UNION select col_972 + 1,col_973 + 1,col_974 + 1,col_975 + 1 from cte_257 where col_972 < 10 ) select col_975,col_974,col_973 from cte_257 UNION DISTINCT select col_969 + 1,col_970 + 1,col_971 + 1 from cte_256 where col_969 < 10 ) select * from cte_256; +drop table if exists tbl_2, tbl_3; +create table tbl_2 ( col_4 char(246) collate utf8_unicode_ci not null , col_5 char(253) collate utf8mb4_unicode_ci ) ; +create table tbl_3 ( col_6 char(207) collate utf8mb4_unicode_ci , col_7 int unsigned not null ) ; +insert into tbl_2 values ( "0",null ) ; +insert into tbl_2 values ( "1","0" ) ; +insert into tbl_2 values ( "1","1" ) ; +insert into tbl_2 values ( "0","0" ) ; +insert into tbl_2 values ( "0","1" ) ; +insert into tbl_3 values ( "1",0 ) ; +insert into tbl_3 values ( "1",1 ) ; +insert into tbl_3 values ( "0",0 ) ; +insert into tbl_3 values ( "0",1 ) ; +with recursive tbl_2 (col_64,col_65,col_66,col_67) AS ( select 1, col_6,col_6,4 from tbl_3 UNION DISTINCT select col_64 + 1,concat(col_65, 1),col_66 + 1,concat(col_67, 1) from tbl_2 where col_64 < 5 ) select * from tbl_2 order by col_64; +drop table if exists tbl_3, tbl_4; +create table tbl_3 ( col_6 int not null , col_7 char(95) collate utf8_general_ci ) ; +create table tbl_4 ( col_8 char collate utf8_unicode_ci , col_9 char collate utf8mb4_bin ) ; +insert into tbl_3 values ( 0,"1" ) ; +insert into tbl_4 values ( "1","0" ) ; +with recursive cte_2245 (col_8692,col_8693) AS ( select 1, col_7 from tbl_3 UNION select col_8692 + 1,concat(col_8693, 1) from cte_2245 where col_8692 < 5 ) , cte_2246 (col_8694,col_8695,col_8696,col_8697) AS ( with recursive cte_2247 (col_8698,col_8699,col_8700,col_8701) AS ( select 1, cast("2" as char(20)),3,col_8 from tbl_4 ) select col_8698,col_8699,col_8700,col_8701 from cte_2247 UNION select col_8694 + 1,col_8695 + 1,col_8696 + 1,col_8697 + 1 from cte_2246 where col_8694 < 5 ) select * from cte_2245,cte_2246 order by col_8692,col_8693,col_8696,col_8695,col_8697,col_8694; +with recursive cte2 as (select 1 as col_1, 2 as col_2) select c1.col_1, c2.col_2 from cte2 as c1, cte2 as c2 where c2.col_2 = 1; +with recursive cte (c1) as (select 1), cte1 (c2) as (select 1 union select c1 + 1 from cte, cte1) select * from cte, cte1; +with recursive tbl_0 (col_943,col_944,col_945,col_946,col_947) AS ( with recursive tbl_0 (col_948,col_949,col_950,col_951,col_952) AS ( select 1, 2,3,4,5 UNION ALL select col_948 + 1,col_949 + 1,col_950 + 1,col_951 + 1,col_952 + 1 from tbl_0 where col_948 < 5 ) select col_948,col_949,col_951,col_950,col_952 from tbl_0 UNION ALL select col_943 + 1,col_944 + 1,col_945 + 1,col_946 + 1,col_947 + 1 from tbl_0 where col_943 < 5 ) select * from tbl_0; +Error 1054: Unknown column 'col_943' in 'where clause' +with recursive cte1 (c1, c2) as (select 1, '1' union select concat(c1, 1), c2 + 1 from cte1 where c1 < 100) select * from cte1; +with recursive cte_8 (col_51,col_52,col_53,col_54) AS ( with recursive cte_9 (col_55,col_56,col_57,col_58) AS ( select 1, 2,3,4 UNION ALL select col_55 + 1,col_56 + 1,col_57 + 1,col_58 + 1 from cte_9 where col_55 < 5 ) select col_55,col_57,col_56,col_58 from cte_9 UNION DISTINCT select col_51 + 1,col_52 + 1,col_53 + 1,col_54 + 1 from cte_8 where col_51 < 5 ) select * from cte_8; +with recursive qn as (select 1 from dual union all select 1 from dual) select * from qn; +with recursive qn as (select 1 as a from dual group by a union all select a+1 from qn where a<3) select * from qn; +with recursive qn as ((select 1 as a from dual order by a) union all select a+1 from qn where a<3) select * from qn; +drop table if exists employees; +CREATE TABLE employees ( +ID INT PRIMARY KEY, +NAME VARCHAR(100), +MANAGER_ID INT, +INDEX (MANAGER_ID), +FOREIGN KEY (MANAGER_ID) REFERENCES employees(ID) +); +INSERT INTO employees VALUES +(333, "Yasmina", NULL), +(198, "John", 333), +(692, "Tarek", 333), +(29, "Pedro", 198), +(4610, "Sarah", 29), +(72, "Pierre", 29), +(123, "Adil", 692); +WITH RECURSIVE employees_extended AS (SELECT ID, NAME, MANAGER_ID, CAST(ID AS CHAR(200)) AS PATH FROM employees WHERE NAME='Pierre' UNION ALL SELECT S.ID, S.NAME, S.MANAGER_ID, CONCAT(M.PATH, ",", S.ID) FROM employees_extended M JOIN employees S ON M.MANAGER_ID=S.ID) SELECT * FROM employees_extended; +with recursive cte (c1) as (select 1), cte1 (c2) as (select 1 union select c1 + 1 from cte where c1 < 10) select * from cte where c1 < 5; +with recursive cte_581 (col_2343,col_2344,col_2345) AS ( select 1, '2',cast('3' as char(20))) , cte_582 (col_2346,col_2347,col_2348) AS ( select 1, 2, 3) select * from cte_581 as cte_as_583,cte_582 as cte_as_584,cte_582 as cte_as_585 order by cte_as_583.col_2343,cte_as_585.col_2348,cte_as_584.col_2346,cte_as_584.col_2348,cte_as_583.col_2344,cte_as_584.col_2347,cte_as_585.col_2346,cte_as_585.col_2347,cte_as_583.col_2345; +with recursive tbl_3 (col_19,col_20,col_21,col_22) AS ( select 1, 2,3,4 UNION select col_19 + 1,col_20 + 1,col_21 + 1,concat(col_22, 1) from tbl_3 where col_19 < 5 ) , cte_4 (col_23,col_24,col_25,col_26) AS ( select 1, 2,cast("3" as char(20)),4 UNION DISTINCT select col_23 + 1,col_24 + 1,concat(col_25, 1),col_26 + 1 from cte_4 where col_23 < 5 ) select * from tbl_3 as cte_as_3,cte_4 as cte_as_4,tbl_3 as cte_as_5 order by cte_as_3.col_19,cte_as_4.col_23,cte_as_4.col_25,cte_as_4.col_24,cte_as_4.col_26,cte_as_3.col_20,cte_as_5.col_22,cte_as_3.col_21,cte_as_5.col_20,cte_as_3.col_22,cte_as_5.col_19,cte_as_5.col_21; +with cte1 (c1) as (select 1) select * from cte1 as b, cte1 as a; +WITH RECURSIVE qn AS +( +select 1 +union all +select 3, 0 from qn +) +select * from qn; +Error 1222: The used SELECT statements have a different number of columns +with recursive cte1 as (select 1 union all (select 1 from cte1 limit 10)) select * from cte1; +Error 1235: This version of TiDB doesn't yet support 'ORDER BY / LIMIT / SELECT DISTINCT in recursive query block of Common Table Expression' +with recursive qn as (select 123 as a union all select null from qn where a is not null) select * from qn; +with recursive q (b) as (select 1, 1 union all select 1, 1 from q) select b from q; +Error 1353: In definition of view, derived table or common table expression, SELECT list and column names list have different column counts +drop table if exists t1; +create table t1(a int); +insert into t1 values(1); +insert into t1 values(2); +SELECT * +FROM +t1 dt +WHERE +EXISTS( +WITH RECURSIVE qn AS (SELECT a*0 AS b UNION ALL SELECT b+1 FROM qn WHERE b=0) +SELECT * FROM qn WHERE b=a +); +a +1 +drop table if exists t1; +create table t1 (a int); +insert into t1 values (1); +SELECT (WITH qn AS (SELECT 10*a as a FROM t1), +qn2 AS (SELECT 3*a AS b FROM qn) SELECT * from qn2 LIMIT 1) +FROM t1; +(WITH qn AS (SELECT 10*a as a FROM t1), +qn2 AS (SELECT 3*a AS b FROM qn) +30 +select (with qn as (select "with") select * from qn) as scal_subq +from dual; +scal_subq +with +drop table if exists t1; +create table t1 (a int); + insert into t1 values(1), (2), (3); +with q as (select * from t1) +select /*+ merge(q) no_merge(q1) */ * from q, q q1 where q.a=1 and q1.a=2; +drop table if exists t1; + create table t1 (a int, b int); +with qn as (select a, b from t1) select b from qn group by a; +drop table if exists t1; +create table t1(a int); +insert into t1 values(1); +insert into t1 values(2); +SELECT * +FROM +t1 dt +WHERE +EXISTS( +WITH RECURSIVE qn AS (SELECT a*0+1 AS b UNION ALL SELECT b+1 FROM qn WHERE b=0) +SELECT * FROM qn WHERE b=1 +); +a +1 +2 +drop table if exists tbl_1; +CREATE TABLE `tbl_1` ( +`col_2` char(65) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL, +`col_3` int(11) NOT NULL +); +with recursive cte_8932 (col_34891,col_34892) AS ( with recursive cte_8932 (col_34893,col_34894,col_34895) AS ( with tbl_1 (col_34896,col_34897,col_34898,col_34899) AS ( select 1, "2",3,col_3 from tbl_1 ) select cte_as_8958.col_34896,cte_as_8958.col_34898,cte_as_8958.col_34899 from tbl_1 as cte_as_8958 UNION DISTINCT select col_34893 + 1,concat(col_34894, 1),col_34895 + 1 from cte_8932 where col_34893 < 5 ) select cte_as_8959.col_34893,cte_as_8959.col_34895 from cte_8932 as cte_as_8959 ) select * from cte_8932 as cte_as_8960 order by cte_as_8960.col_34891,cte_as_8960.col_34892; +drop table if exists t1; +create table t1(c1 bigint unsigned); +insert into t1 values(0); +with recursive cte1 as (select c1 - 1 c1 from t1 union all select c1 - 1 c1 from cte1 where c1 != 0) select * from cte1 dt1, cte1 dt2; +Error 1690: BIGINT UNSIGNED value is out of range in '(test.t1.c1 - 1)' diff --git a/cmd/explaintest/t/cte.test b/cmd/explaintest/t/cte.test new file mode 100644 index 0000000000000..b45695e641135 --- /dev/null +++ b/cmd/explaintest/t/cte.test @@ -0,0 +1,170 @@ +use test; +# case 1 +drop table if exists tbl_0; +create table tbl_0(a int); +with recursive cte_0 (col_10,col_11,col_12) AS ( select 1, 2,3 from tbl_0 UNION select col_10 + 1,col_11 + 1,col_12 + 1 from cte_0 where col_10 < 10 ) select * from cte_0; +# case 2 +drop table if exists tbl_1; +CREATE TABLE `tbl_1` ( + `col_5` decimal(47,21) NOT NULL DEFAULT '5308.880000000000000000000', + `col_6` enum('Alice','Bob','Charlie','David') DEFAULT NULL, + `col_7` float NOT NULL, + `col_8` bigint NOT NULL DEFAULT '-688199144806783096', + `col_9` varchar(248) NOT NULL, + PRIMARY KEY (`col_5`,`col_7`,`col_9`,`col_8`), + UNIQUE KEY `idx_4` (`col_8`), + UNIQUE KEY `idx_7` (`col_5`,`col_7`,`col_8`), + UNIQUE KEY `idx_9` (`col_9`,`col_8`), + UNIQUE KEY `idx_3` (`col_9`(3),`col_8`), + UNIQUE KEY `idx_8` (`col_7`,`col_6`,`col_8`,`col_5`), + KEY `idx_5` (`col_7`), + KEY `idx_6` (`col_7`), + KEY `idx_10` (`col_9`,`col_5`), + KEY `idx_11` (`col_5`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci +/*!50100 PARTITION BY HASH (`col_8`) +PARTITIONS 4 */; +with recursive cte_1 (col_13,col_14,col_15,col_16,col_17) AS ( with recursive cte_2 (col_18,col_19,col_20,col_21,col_22,col_23,col_24) AS ( select 1, 2,col_8,4,5,6,7 from tbl_1 ) select col_19,col_18,col_22,col_23,col_21 from cte_2 UNION ALL select col_13 + 1,col_14 + 1,col_15 + 1,col_16 + 1,col_17 + 1 from cte_1 where col_13 < 10 ) select * from cte_1; +# case 3 +with recursive cte_256 (col_969,col_970,col_971) AS ( with recursive cte_257 (col_972,col_973,col_974,col_975) AS ( select 1, 2,col_8,4 from tbl_1 UNION select col_972 + 1,col_973 + 1,col_974 + 1,col_975 + 1 from cte_257 where col_972 < 10 ) select col_975,col_974,col_973 from cte_257 UNION DISTINCT select col_969 + 1,col_970 + 1,col_971 + 1 from cte_256 where col_969 < 10 ) select * from cte_256; +# case 4 +drop table if exists tbl_2, tbl_3; +create table tbl_2 ( col_4 char(246) collate utf8_unicode_ci not null , col_5 char(253) collate utf8mb4_unicode_ci ) ; +create table tbl_3 ( col_6 char(207) collate utf8mb4_unicode_ci , col_7 int unsigned not null ) ; +insert into tbl_2 values ( "0",null ) ; +insert into tbl_2 values ( "1","0" ) ; +insert into tbl_2 values ( "1","1" ) ; +insert into tbl_2 values ( "0","0" ) ; +insert into tbl_2 values ( "0","1" ) ; +insert into tbl_3 values ( "1",0 ) ; +insert into tbl_3 values ( "1",1 ) ; +insert into tbl_3 values ( "0",0 ) ; +insert into tbl_3 values ( "0",1 ) ; +with recursive tbl_2 (col_64,col_65,col_66,col_67) AS ( select 1, col_6,col_6,4 from tbl_3 UNION DISTINCT select col_64 + 1,concat(col_65, 1),col_66 + 1,concat(col_67, 1) from tbl_2 where col_64 < 5 ) select * from tbl_2 order by col_64; +# case 5 +drop table if exists tbl_3, tbl_4; +create table tbl_3 ( col_6 int not null , col_7 char(95) collate utf8_general_ci ) ; +create table tbl_4 ( col_8 char collate utf8_unicode_ci , col_9 char collate utf8mb4_bin ) ; +insert into tbl_3 values ( 0,"1" ) ; +insert into tbl_4 values ( "1","0" ) ; +with recursive cte_2245 (col_8692,col_8693) AS ( select 1, col_7 from tbl_3 UNION select col_8692 + 1,concat(col_8693, 1) from cte_2245 where col_8692 < 5 ) , cte_2246 (col_8694,col_8695,col_8696,col_8697) AS ( with recursive cte_2247 (col_8698,col_8699,col_8700,col_8701) AS ( select 1, cast("2" as char(20)),3,col_8 from tbl_4 ) select col_8698,col_8699,col_8700,col_8701 from cte_2247 UNION select col_8694 + 1,col_8695 + 1,col_8696 + 1,col_8697 + 1 from cte_2246 where col_8694 < 5 ) select * from cte_2245,cte_2246 order by col_8692,col_8693,col_8696,col_8695,col_8697,col_8694; +# case 6 +with recursive cte2 as (select 1 as col_1, 2 as col_2) select c1.col_1, c2.col_2 from cte2 as c1, cte2 as c2 where c2.col_2 = 1; +# case 7 +with recursive cte (c1) as (select 1), cte1 (c2) as (select 1 union select c1 + 1 from cte, cte1) select * from cte, cte1; +# case 8 +--error 1054 +with recursive tbl_0 (col_943,col_944,col_945,col_946,col_947) AS ( with recursive tbl_0 (col_948,col_949,col_950,col_951,col_952) AS ( select 1, 2,3,4,5 UNION ALL select col_948 + 1,col_949 + 1,col_950 + 1,col_951 + 1,col_952 + 1 from tbl_0 where col_948 < 5 ) select col_948,col_949,col_951,col_950,col_952 from tbl_0 UNION ALL select col_943 + 1,col_944 + 1,col_945 + 1,col_946 + 1,col_947 + 1 from tbl_0 where col_943 < 5 ) select * from tbl_0; +# case 9 +with recursive cte1 (c1, c2) as (select 1, '1' union select concat(c1, 1), c2 + 1 from cte1 where c1 < 100) select * from cte1; +# case 10 +with recursive cte_8 (col_51,col_52,col_53,col_54) AS ( with recursive cte_9 (col_55,col_56,col_57,col_58) AS ( select 1, 2,3,4 UNION ALL select col_55 + 1,col_56 + 1,col_57 + 1,col_58 + 1 from cte_9 where col_55 < 5 ) select col_55,col_57,col_56,col_58 from cte_9 UNION DISTINCT select col_51 + 1,col_52 + 1,col_53 + 1,col_54 + 1 from cte_8 where col_51 < 5 ) select * from cte_8; +# case 11 +with recursive qn as (select 1 from dual union all select 1 from dual) select * from qn; +# case 12 +with recursive qn as (select 1 as a from dual group by a union all select a+1 from qn where a<3) select * from qn; +# case 13 +with recursive qn as ((select 1 as a from dual order by a) union all select a+1 from qn where a<3) select * from qn; +# case 14 +drop table if exists employees; +CREATE TABLE employees ( +ID INT PRIMARY KEY, +NAME VARCHAR(100), +MANAGER_ID INT, +INDEX (MANAGER_ID), +FOREIGN KEY (MANAGER_ID) REFERENCES employees(ID) +); +INSERT INTO employees VALUES +(333, "Yasmina", NULL), +(198, "John", 333), +(692, "Tarek", 333), +(29, "Pedro", 198), +(4610, "Sarah", 29), +(72, "Pierre", 29), +(123, "Adil", 692); +WITH RECURSIVE employees_extended AS (SELECT ID, NAME, MANAGER_ID, CAST(ID AS CHAR(200)) AS PATH FROM employees WHERE NAME='Pierre' UNION ALL SELECT S.ID, S.NAME, S.MANAGER_ID, CONCAT(M.PATH, ",", S.ID) FROM employees_extended M JOIN employees S ON M.MANAGER_ID=S.ID) SELECT * FROM employees_extended; +# case 15 +with recursive cte (c1) as (select 1), cte1 (c2) as (select 1 union select c1 + 1 from cte where c1 < 10) select * from cte where c1 < 5; +# case 16 +with recursive cte_581 (col_2343,col_2344,col_2345) AS ( select 1, '2',cast('3' as char(20))) , cte_582 (col_2346,col_2347,col_2348) AS ( select 1, 2, 3) select * from cte_581 as cte_as_583,cte_582 as cte_as_584,cte_582 as cte_as_585 order by cte_as_583.col_2343,cte_as_585.col_2348,cte_as_584.col_2346,cte_as_584.col_2348,cte_as_583.col_2344,cte_as_584.col_2347,cte_as_585.col_2346,cte_as_585.col_2347,cte_as_583.col_2345; +# case 17 +with recursive tbl_3 (col_19,col_20,col_21,col_22) AS ( select 1, 2,3,4 UNION select col_19 + 1,col_20 + 1,col_21 + 1,concat(col_22, 1) from tbl_3 where col_19 < 5 ) , cte_4 (col_23,col_24,col_25,col_26) AS ( select 1, 2,cast("3" as char(20)),4 UNION DISTINCT select col_23 + 1,col_24 + 1,concat(col_25, 1),col_26 + 1 from cte_4 where col_23 < 5 ) select * from tbl_3 as cte_as_3,cte_4 as cte_as_4,tbl_3 as cte_as_5 order by cte_as_3.col_19,cte_as_4.col_23,cte_as_4.col_25,cte_as_4.col_24,cte_as_4.col_26,cte_as_3.col_20,cte_as_5.col_22,cte_as_3.col_21,cte_as_5.col_20,cte_as_3.col_22,cte_as_5.col_19,cte_as_5.col_21; +# case 18 +with cte1 (c1) as (select 1) select * from cte1 as b, cte1 as a; +# case 19 +--error 1222 +WITH RECURSIVE qn AS +( +select 1 +union all +select 3, 0 from qn +) +select * from qn; +# case 20 +--error 1235 +with recursive cte1 as (select 1 union all (select 1 from cte1 limit 10)) select * from cte1; +# case 21 +# TODO: uncomment this case after we support limit +# with recursive cte1 as (select 1 union all select 1 from cte1 limit 10) select * from cte1; +# case 22 +with recursive qn as (select 123 as a union all select null from qn where a is not null) select * from qn; +# case 23 +--error 1353 +with recursive q (b) as (select 1, 1 union all select 1, 1 from q) select b from q; +# case 24 +drop table if exists t1; +create table t1(a int); +insert into t1 values(1); +insert into t1 values(2); +SELECT * +FROM +t1 dt +WHERE +EXISTS( + WITH RECURSIVE qn AS (SELECT a*0 AS b UNION ALL SELECT b+1 FROM qn WHERE b=0) + SELECT * FROM qn WHERE b=a + ); +# case 25 +drop table if exists t1; +create table t1 (a int); +insert into t1 values (1); +SELECT (WITH qn AS (SELECT 10*a as a FROM t1), + qn2 AS (SELECT 3*a AS b FROM qn) SELECT * from qn2 LIMIT 1) +FROM t1; +# case 26 +select (with qn as (select "with") select * from qn) as scal_subq +from dual; +# case 27 +drop table if exists t1; +create table t1 (a int); insert into t1 values(1), (2), (3); +with q as (select * from t1) +select /*+ merge(q) no_merge(q1) */ * from q, q q1 where q.a=1 and q1.a=2; +# case 28 +drop table if exists t1; create table t1 (a int, b int); +with qn as (select a, b from t1) select b from qn group by a; +# case 29 +drop table if exists t1; +create table t1(a int); +insert into t1 values(1); +insert into t1 values(2); +SELECT * +FROM +t1 dt +WHERE +EXISTS( + WITH RECURSIVE qn AS (SELECT a*0+1 AS b UNION ALL SELECT b+1 FROM qn WHERE b=0) + SELECT * FROM qn WHERE b=1 + ); +# case 30 +drop table if exists tbl_1; +CREATE TABLE `tbl_1` ( + `col_2` char(65) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL, + `col_3` int(11) NOT NULL +); +with recursive cte_8932 (col_34891,col_34892) AS ( with recursive cte_8932 (col_34893,col_34894,col_34895) AS ( with tbl_1 (col_34896,col_34897,col_34898,col_34899) AS ( select 1, "2",3,col_3 from tbl_1 ) select cte_as_8958.col_34896,cte_as_8958.col_34898,cte_as_8958.col_34899 from tbl_1 as cte_as_8958 UNION DISTINCT select col_34893 + 1,concat(col_34894, 1),col_34895 + 1 from cte_8932 where col_34893 < 5 ) select cte_as_8959.col_34893,cte_as_8959.col_34895 from cte_8932 as cte_as_8959 ) select * from cte_8932 as cte_as_8960 order by cte_as_8960.col_34891,cte_as_8960.col_34892; +# case 31 +drop table if exists t1; +create table t1(c1 bigint unsigned); +insert into t1 values(0); +--error 1690 +with recursive cte1 as (select c1 - 1 c1 from t1 union all select c1 - 1 c1 from cte1 where c1 != 0) select * from cte1 dt1, cte1 dt2; From e5b0389d8754fd453f64863cf00551b5a4faf3dd Mon Sep 17 00:00:00 2001 From: Arenatlx <314806019@qq.com> Date: Wed, 2 Jun 2021 15:39:38 +0800 Subject: [PATCH 04/14] test: fix a name typo error in test file (#25054) --- ddl/column_type_change_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index 7febe2209f62c..38e2cc83ebc5a 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -1884,7 +1884,7 @@ func (s *testColumnTypeChangeSuite) TestChangeIntToBitWillPanicInBackfillIndexes tk.MustQuery("select * from t").Check(testkit.Rows("\x13 1 1.00", "\x11 2 2.00")) } -// Close issue #24971, #24973, #24971 +// Close issue #24971, #24973, #24974 func (s *testColumnTypeChangeSuite) TestCTCShouldCastTheDefaultValue(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") From b7e448aa109c4ebf819ad0ca9fd6d3864d401ad5 Mon Sep 17 00:00:00 2001 From: Howie Date: Wed, 2 Jun 2021 15:56:26 +0800 Subject: [PATCH 05/14] ddl: add auto random && shard_row_id_bits compatibility for temporary table (#24940) --- ddl/db_test.go | 13 +++++++++++++ ddl/ddl_api.go | 3 +++ ddl/serial_test.go | 11 +++++++++++ planner/core/planbuilder.go | 12 ++++++++---- planner/core/preprocess.go | 16 ++++++++++++++-- 5 files changed, 49 insertions(+), 6 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index 00cf3fc42b6fc..7ef7d23a82196 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -5360,6 +5361,18 @@ func (s *testSerialDBSuite) TestAlterShardRowIDBits(c *C) { c.Assert(err.Error(), Equals, "[autoid:1467]Failed to read auto-increment value from storage engine") } +func (s *testSerialDBSuite) TestShardRowIDBitsOnTemporaryTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists shard_row_id_temporary") + _, err := tk.Exec("create global temporary table shard_row_id_temporary (a int) shard_row_id_bits = 5 on commit delete rows;") + c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits").Error()) + tk.MustExec("create global temporary table shard_row_id_temporary (a int) on commit delete rows;") + defer tk.MustExec("drop table if exists shard_row_id_temporary") + _, err = tk.Exec("alter table shard_row_id_temporary shard_row_id_bits = 4;") + c.Assert(err.Error(), Equals, ddl.ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits").Error()) +} + // port from mysql // https://github.com/mysql/mysql-server/blob/124c7ab1d6f914637521fd4463a993aa73403513/mysql-test/t/lock.test func (s *testDBSuite2) TestLock(c *C) { diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index e5871311d78a9..4684d5d4cc7ef 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2647,6 +2647,9 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint if err != nil { return errors.Trace(err) } + if t.Meta().TempTableType != model.TempTableNone { + return ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits") + } if uVal == t.Meta().ShardRowIDBits { // Nothing need to do. return nil diff --git a/ddl/serial_test.go b/ddl/serial_test.go index 8d9217e1df8e3..62610e3279cf6 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -942,6 +942,17 @@ func (s *testSerialSuite) TestTableLocksEnable(c *C) { checkTableLock(c, tk.Se, "test", "t1", model.TableLockNone) } +func (s *testSerialDBSuite) TestAutoRandomOnTemporaryTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists auto_random_temporary") + _, err := tk.Exec("create global temporary table auto_random_temporary (a bigint primary key auto_random(3), b varchar(255)) on commit delete rows;") + c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("auto_random").Error()) + tk.MustExec("set @@tidb_enable_noop_functions = 1") + _, err = tk.Exec("create temporary table t(a bigint key auto_random);") + c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("auto_random").Error()) +} + func (s *testSerialDBSuite) TestAutoRandom(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("create database if not exists auto_random_db") diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 3990cade06511..4df8572b19447 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3399,7 +3399,11 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err authErr = ErrTableaccessDenied.GenWithStackByArgs("ALTER", b.ctx.GetSessionVars().User.AuthUsername, b.ctx.GetSessionVars().User.AuthHostname, v.Table.Name.L) } - b.visitInfo = appendVisitInfo(b.visitInfo, mysql.AlterPriv, v.Table.Schema.L, + dbName := v.Table.Schema.L + if dbName == "" { + dbName = b.ctx.GetSessionVars().CurrentDB + } + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.AlterPriv, dbName, v.Table.Name.L, "", authErr) for _, spec := range v.Specs { if spec.Tp == ast.AlterTableRenameTable || spec.Tp == ast.AlterTableExchangePartition { @@ -3407,21 +3411,21 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err authErr = ErrTableaccessDenied.GenWithStackByArgs("DROP", b.ctx.GetSessionVars().User.AuthUsername, b.ctx.GetSessionVars().User.AuthHostname, v.Table.Name.L) } - b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, v.Table.Schema.L, + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, dbName, v.Table.Name.L, "", authErr) if b.ctx.GetSessionVars().User != nil { authErr = ErrTableaccessDenied.GenWithStackByArgs("CREATE", b.ctx.GetSessionVars().User.AuthUsername, b.ctx.GetSessionVars().User.AuthHostname, spec.NewTable.Name.L) } - b.visitInfo = appendVisitInfo(b.visitInfo, mysql.CreatePriv, spec.NewTable.Schema.L, + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.CreatePriv, dbName, spec.NewTable.Name.L, "", authErr) if b.ctx.GetSessionVars().User != nil { authErr = ErrTableaccessDenied.GenWithStackByArgs("INSERT", b.ctx.GetSessionVars().User.AuthUsername, b.ctx.GetSessionVars().User.AuthHostname, spec.NewTable.Name.L) } - b.visitInfo = appendVisitInfo(b.visitInfo, mysql.InsertPriv, spec.NewTable.Schema.L, + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.InsertPriv, dbName, spec.NewTable.Name.L, "", authErr) } else if spec.Tp == ast.AlterTableDropPartition { if b.ctx.GetSessionVars().User != nil { diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index 947b1fdcef1ec..07d7a485ab04e 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -640,6 +640,14 @@ func (p *preprocessor) checkCreateTableGrammar(stmt *ast.CreateTableStmt) { return } } + if stmt.TemporaryKeyword != ast.TemporaryNone { + for _, opt := range stmt.Options { + if opt.Tp == ast.TableOptionShardRowID { + p.err = ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits") + return + } + } + } tName := stmt.Table.Name.String() if isIncorrectName(tName) { p.err = ddl.ErrWrongTableName.GenWithStackByArgs(tName) @@ -656,7 +664,7 @@ func (p *preprocessor) checkCreateTableGrammar(stmt *ast.CreateTableStmt) { p.err = err return } - isPrimary, err := checkColumnOptions(colDef.Options) + isPrimary, err := checkColumnOptions(stmt.TemporaryKeyword != ast.TemporaryNone, colDef.Options) if err != nil { p.err = err return @@ -813,7 +821,7 @@ func isTableAliasDuplicate(node ast.ResultSetNode, tableAliases map[string]inter return nil } -func checkColumnOptions(ops []*ast.ColumnOption) (int, error) { +func checkColumnOptions(isTempTable bool, ops []*ast.ColumnOption) (int, error) { isPrimary, isGenerated, isStored := 0, 0, false for _, op := range ops { @@ -823,6 +831,10 @@ func checkColumnOptions(ops []*ast.ColumnOption) (int, error) { case ast.ColumnOptionGenerated: isGenerated = 1 isStored = op.Stored + case ast.ColumnOptionAutoRandom: + if isTempTable { + return isPrimary, ErrOptOnTemporaryTable.GenWithStackByArgs("auto_random") + } } } From ca3d88eba5c67c17ed0f56faa10dcef8af6236d3 Mon Sep 17 00:00:00 2001 From: Shirly Date: Wed, 2 Jun 2021 16:08:26 +0800 Subject: [PATCH 06/14] store/tikv: make pub vars related to region to private and add functions to set & get (#25044) --- store/copr/batch_request_sender.go | 3 +-- store/tikv/region_cache.go | 31 ++++++++++++++++++++++-------- store/tikv/region_request.go | 13 ++++++++++++- tidb-server/main.go | 6 +++--- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go index dea0c98148aa0..b12ced3f65c12 100644 --- a/store/copr/batch_request_sender.go +++ b/store/copr/batch_request_sender.go @@ -15,7 +15,6 @@ package copr import ( "context" - "sync/atomic" "time" "github.com/pingcap/errors" @@ -80,7 +79,7 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled { return errors.Trace(err) - } else if atomic.LoadUint32(&tikv.ShuttingDown) > 0 { + } else if tikv.LoadShuttingDown() > 0 { return tikverr.ErrTiDBShuttingDown } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 2fb006138dff5..50eb437545024 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -55,8 +55,13 @@ const ( defaultRegionsPerBatch = 128 ) -// RegionCacheTTLSec is the max idle time for regions in the region cache. -var RegionCacheTTLSec int64 = 600 +// regionCacheTTLSec is the max idle time for regions in the region cache. +var regionCacheTTLSec int64 = 600 + +// SetRegionCacheTTLSec sets regionCacheTTLSec to t. +func SetRegionCacheTTLSec(t int64) { + regionCacheTTLSec = t +} const ( updated int32 = iota // region is updated and no need to reload. @@ -254,7 +259,7 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool { }) for { lastAccess := atomic.LoadInt64(&r.lastAccess) - if ts-lastAccess > RegionCacheTTLSec { + if ts-lastAccess > regionCacheTTLSec { return false } if atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) { @@ -1180,7 +1185,7 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { return nil } lastAccess := atomic.LoadInt64(&latestRegion.lastAccess) - if ts-lastAccess > RegionCacheTTLSec { + if ts-lastAccess > regionCacheTTLSec { return nil } if latestRegion != nil { @@ -2070,10 +2075,20 @@ type livenessState uint32 var ( livenessSf singleflight.Group - // StoreLivenessTimeout is the max duration of resolving liveness of a TiKV instance. - StoreLivenessTimeout time.Duration + // storeLivenessTimeout is the max duration of resolving liveness of a TiKV instance. + storeLivenessTimeout time.Duration ) +// SetStoreLivenessTimeout sets storeLivenessTimeout to t. +func SetStoreLivenessTimeout(t time.Duration) { + storeLivenessTimeout = t +} + +// GetStoreLivenessTimeout returns storeLivenessTimeout. +func GetStoreLivenessTimeout() time.Duration { + return storeLivenessTimeout +} + const ( unknown livenessState = iota reachable @@ -2136,7 +2151,7 @@ func (s *Store) requestLiveness(bo *Backoffer, c *RegionCache) (l livenessState) return c.testingKnobs.mockRequestLiveness(s, bo) } - if StoreLivenessTimeout == 0 { + if storeLivenessTimeout == 0 { return unreachable } @@ -2146,7 +2161,7 @@ func (s *Store) requestLiveness(bo *Backoffer, c *RegionCache) (l livenessState) } addr := s.addr rsCh := livenessSf.DoChan(addr, func() (interface{}, error) { - return invokeKVStatusAPI(addr, StoreLivenessTimeout), nil + return invokeKVStatusAPI(addr, storeLivenessTimeout), nil }) var ctx context.Context if bo != nil { diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index b8fcc837c1bc6..a90d638233578 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -44,8 +44,19 @@ import ( // ShuttingDown is a flag to indicate tidb-server is exiting (Ctrl+C signal // receved for example). If this flag is set, tikv client should not retry on // network error because tidb-server expect tikv client to exit as soon as possible. +// TODO: make it private when br is ready. var ShuttingDown uint32 +// StoreShuttingDown atomically stores ShuttingDown into v. +func StoreShuttingDown(v uint32) { + atomic.StoreUint32(&ShuttingDown, v) +} + +// LoadShuttingDown atomically loads ShuttingDown. +func LoadShuttingDown() uint32 { + return atomic.LoadUint32(&ShuttingDown) +} + // RegionRequestSender sends KV/Cop requests to tikv server. It handles network // errors and some region errors internally. // @@ -555,7 +566,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled { return errors.Trace(err) - } else if atomic.LoadUint32(&ShuttingDown) > 0 { + } else if LoadShuttingDown() > 0 { return tikverr.ErrTiDBShuttingDown } if status.Code(errors.Cause(err)) == codes.Canceled { diff --git a/tidb-server/main.go b/tidb-server/main.go index f1b16397046f9..f99722e8bfd13 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -562,7 +562,7 @@ func setGlobalVars() { } atomic.StoreUint64(&tikv.CommitMaxBackoff, uint64(parseDuration(cfg.TiKVClient.CommitTimeout).Seconds()*1000)) - tikv.RegionCacheTTLSec = int64(cfg.TiKVClient.RegionCacheTTL) + tikv.SetRegionCacheTTLSec(int64(cfg.TiKVClient.RegionCacheTTL)) domainutil.RepairInfo.SetRepairMode(cfg.RepairMode) domainutil.RepairInfo.SetRepairTableList(cfg.RepairTableList) executor.GlobalDiskUsageTracker.SetBytesLimit(cfg.TempStorageQuota) @@ -579,7 +579,7 @@ func setGlobalVars() { logutil.BgLogger().Fatal("invalid duration value for store-liveness-timeout", zap.String("currentValue", cfg.TiKVClient.StoreLivenessTimeout)) } - tikv.StoreLivenessTimeout = t + tikv.SetStoreLivenessTimeout(t) parsertypes.TiDBStrictIntegerDisplayWidth = cfg.DeprecateIntegerDisplayWidth } @@ -649,7 +649,7 @@ func setupTracing() { } func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) { - atomic.StoreUint32(&tikv.ShuttingDown, 1) + tikv.StoreShuttingDown(1) dom.Close() err := storage.Close() terror.Log(errors.Trace(err)) From f966a3d9fb41a893674b4ca59121b35d54300b35 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Wed, 2 Jun 2021 17:30:43 +0800 Subject: [PATCH 07/14] executor: fix the wrong KVRange for partition tables in TableReader (#25047) --- executor/builder.go | 6 +- executor/partition_table_test.go | 149 +++++++++++++++++++++++++++++++ executor/table_reader.go | 4 +- 3 files changed, 153 insertions(+), 6 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 3ef6d96358e0b..66f20c000a92b 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2762,7 +2762,6 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E ret.kvRangeBuilder = kvRangeBuilderFromRangeAndPartition{ sctx: b.ctx, partitions: partitions, - ranges: ts.Ranges, } return ret @@ -3447,15 +3446,14 @@ func dedupHandles(lookUpContents []*indexJoinLookUpContent) ([]kv.Handle, []*ind type kvRangeBuilderFromRangeAndPartition struct { sctx sessionctx.Context partitions []table.PhysicalTable - ranges []*ranger.Range } -func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(int64) ([]kv.KeyRange, error) { +func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(_ int64, ranges []*ranger.Range) ([]kv.KeyRange, error) { var ret []kv.KeyRange for _, p := range h.partitions { pid := p.GetPhysicalID() meta := p.Meta() - kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, h.ranges, nil) + kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, ranges, nil) if err != nil { return nil, err } diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index d8ae8abcd9476..34066a301bff4 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -2110,6 +2110,155 @@ func (s *partitionTableSuite) TestDirectReadingWithUnionScan(c *C) { tk.MustExec(`rollback`) } +func (s *partitionTableSuite) TestIssue25030(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create database test_issue_25030") + tk.MustExec("use test_issue_25030") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + + tk.MustExec(`CREATE TABLE tbl_936 ( + col_5410 smallint NOT NULL, + col_5411 double, + col_5412 boolean NOT NULL DEFAULT 1, + col_5413 set('Alice', 'Bob', 'Charlie', 'David') NOT NULL DEFAULT 'Charlie', + col_5414 varbinary(147) COLLATE 'binary' DEFAULT 'bvpKgYWLfyuTiOYSkj', + col_5415 timestamp NOT NULL DEFAULT '2021-07-06', + col_5416 decimal(6, 6) DEFAULT 0.49, + col_5417 text COLLATE utf8_bin, + col_5418 float DEFAULT 2048.0762299371554, + col_5419 int UNSIGNED NOT NULL DEFAULT 3152326370, + PRIMARY KEY (col_5419) ) + PARTITION BY HASH (col_5419) PARTITIONS 3`) + tk.MustQuery(`SELECT last_value(col_5414) OVER w FROM tbl_936 + WINDOW w AS (ORDER BY col_5410, col_5411, col_5412, col_5413, col_5414, col_5415, col_5416, col_5417, col_5418, col_5419) + ORDER BY col_5410, col_5411, col_5412, col_5413, col_5414, col_5415, col_5416, col_5417, col_5418, col_5419, nth_value(col_5412, 5) OVER w`). + Check(testkit.Rows()) // can work properly without any error or panic +} + +func (s *partitionTableSuite) TestUnsignedPartitionColumn(c *C) { + if israce.RaceEnabled { + c.Skip("exhaustive types test, skip race test") + } + + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create database test_unsigned_partition") + tk.MustExec("use test_unsigned_partition") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + + tk.MustExec(`create table thash_pk (a int unsigned, b int, primary key(a)) partition by hash (a) partitions 3`) + tk.MustExec(`create table trange_pk (a int unsigned, b int, primary key(a)) partition by range (a) ( + partition p1 values less than (100000), + partition p2 values less than (200000), + partition p3 values less than (300000), + partition p4 values less than (400000))`) + tk.MustExec(`create table tnormal_pk (a int unsigned, b int, primary key(a))`) + tk.MustExec(`create table thash_uniq (a int unsigned, b int, unique key(a)) partition by hash (a) partitions 3`) + tk.MustExec(`create table trange_uniq (a int unsigned, b int, unique key(a)) partition by range (a) ( + partition p1 values less than (100000), + partition p2 values less than (200000), + partition p3 values less than (300000), + partition p4 values less than (400000))`) + tk.MustExec(`create table tnormal_uniq (a int unsigned, b int, unique key(a))`) + + valColA := make(map[int]struct{}, 1000) + vals := make([]string, 0, 1000) + for len(vals) < 1000 { + a := rand.Intn(400000) + if _, ok := valColA[a]; ok { + continue + } + valColA[a] = struct{}{} + vals = append(vals, fmt.Sprintf("(%v, %v)", a, rand.Intn(400000))) + } + valStr := strings.Join(vals, ", ") + for _, tbl := range []string{"thash_pk", "trange_pk", "tnormal_pk", "thash_uniq", "trange_uniq", "tnormal_uniq"} { + tk.MustExec(fmt.Sprintf("insert into %v values %v", tbl, valStr)) + } + + for i := 0; i < 100; i++ { + scanCond := fmt.Sprintf("a %v %v", []string{">", "<"}[rand.Intn(2)], rand.Intn(400000)) + pointCond := fmt.Sprintf("a = %v", rand.Intn(400000)) + batchCond := fmt.Sprintf("a in (%v, %v, %v)", rand.Intn(400000), rand.Intn(400000), rand.Intn(400000)) + + var rScan, rPoint, rBatch [][]interface{} + for tid, tbl := range []string{"tnormal_pk", "trange_pk", "thash_pk"} { + // unsigned + TableReader + scanSQL := fmt.Sprintf("select * from %v use index(primary) where %v", tbl, scanCond) + c.Assert(tk.HasPlan(scanSQL, "TableReader"), IsTrue) + r := tk.MustQuery(scanSQL).Sort() + if tid == 0 { + rScan = r.Rows() + } else { + r.Check(rScan) + } + + // unsigned + PointGet on PK + pointSQL := fmt.Sprintf("select * from %v use index(primary) where %v", tbl, pointCond) + tk.MustPointGet(pointSQL) + r = tk.MustQuery(pointSQL).Sort() + if tid == 0 { + rPoint = r.Rows() + } else { + r.Check(rPoint) + } + + // unsigned + BatchGet on PK + batchSQL := fmt.Sprintf("select * from %v where %v", tbl, batchCond) + c.Assert(tk.HasPlan(batchSQL, "Batch_Point_Get"), IsTrue) + r = tk.MustQuery(batchSQL).Sort() + if tid == 0 { + rBatch = r.Rows() + } else { + r.Check(rBatch) + } + } + + lookupCond := fmt.Sprintf("a %v %v", []string{">", "<"}[rand.Intn(2)], rand.Intn(400000)) + var rLookup [][]interface{} + for tid, tbl := range []string{"tnormal_uniq", "trange_uniq", "thash_uniq"} { + // unsigned + IndexReader + scanSQL := fmt.Sprintf("select a from %v use index(a) where %v", tbl, scanCond) + c.Assert(tk.HasPlan(scanSQL, "IndexReader"), IsTrue) + r := tk.MustQuery(scanSQL).Sort() + if tid == 0 { + rScan = r.Rows() + } else { + r.Check(rScan) + } + + // unsigned + IndexLookUp + lookupSQL := fmt.Sprintf("select * from %v use index(a) where %v", tbl, lookupCond) + tk.MustIndexLookup(lookupSQL) + r = tk.MustQuery(lookupSQL).Sort() + if tid == 0 { + rLookup = r.Rows() + } else { + r.Check(rLookup) + } + + // unsigned + PointGet on UniqueIndex + pointSQL := fmt.Sprintf("select * from %v use index(a) where %v", tbl, pointCond) + tk.MustPointGet(pointSQL) + r = tk.MustQuery(pointSQL).Sort() + if tid == 0 { + rPoint = r.Rows() + } else { + r.Check(rPoint) + } + + // unsigned + BatchGet on UniqueIndex + batchSQL := fmt.Sprintf("select * from %v where %v", tbl, batchCond) + c.Assert(tk.HasPlan(batchSQL, "Batch_Point_Get"), IsTrue) + r = tk.MustQuery(batchSQL).Sort() + if tid == 0 { + rBatch = r.Rows() + } else { + r.Check(rBatch) + } + } + } +} + func (s *partitionTableSuite) TestDirectReadingWithAgg(c *C) { if israce.RaceEnabled { c.Skip("exhaustive types test, skip race test") diff --git a/executor/table_reader.go b/executor/table_reader.go index 29d9e4e6908c5..a4459ee920291 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -53,7 +53,7 @@ func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Con } type kvRangeBuilder interface { - buildKeyRange(pid int64) ([]kv.KeyRange, error) + buildKeyRange(pid int64, ranges []*ranger.Range) ([]kv.KeyRange, error) } // TableReaderExecutor sends DAG request and reads table data from kv layer. @@ -212,7 +212,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra var builder distsql.RequestBuilder var reqBuilder *distsql.RequestBuilder if e.kvRangeBuilder != nil { - kvRange, err := e.kvRangeBuilder.buildKeyRange(getPhysicalTableID(e.table)) + kvRange, err := e.kvRangeBuilder.buildKeyRange(getPhysicalTableID(e.table), ranges) if err != nil { return nil, err } From 1f79bfe63778d557d25a1f7258f6d5ff7fda58d6 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 2 Jun 2021 17:58:54 +0800 Subject: [PATCH 08/14] *: fix ci lint (#25057) --- executor/executor.go | 2 +- session/session.go | 2 +- util/execdetails/execdetails.go | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index c444c0920069e..bb5ba3b89fcac 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1668,7 +1668,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { pprof.SetGoroutineLabels(goCtx) } if variable.TopSQLEnabled() && prepareStmt.SQLDigest != nil { - goCtx = topsql.AttachSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, "", nil) + topsql.AttachSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, "", nil) } } // execute missed stmtID uses empty sql diff --git a/session/session.go b/session/session.go index c4b9c77587790..0c10d2edbcc2d 100644 --- a/session/session.go +++ b/session/session.go @@ -1391,7 +1391,7 @@ func (s *session) ParseWithParams(ctx context.Context, sql string, args ...inter normalized, digest := parser.NormalizeDigest(sql) if digest != nil { // Fixme: reset/clean the label when internal sql execute finish. - ctx = topsql.AttachSQLInfo(ctx, normalized, digest, "", nil) + topsql.AttachSQLInfo(ctx, normalized, digest, "", nil) } } return stmts[0], nil diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 97f9e9611513d..5f19b30158284 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -735,7 +735,6 @@ func (e *RuntimeStatsWithConcurrencyInfo) String() string { // Merge implements the RuntimeStats interface. func (e *RuntimeStatsWithConcurrencyInfo) Merge(_ RuntimeStats) { - return } // RuntimeStatsWithCommit is the RuntimeStats with commit detail. From ad7102cdeedfcf5f014003f2c1579fc13d82e467 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Wed, 2 Jun 2021 18:20:26 +0800 Subject: [PATCH 09/14] planner: generate correct number of rows when all agg funcs are pruned (#24937) --- cmd/explaintest/r/explain_easy.result | 21 +++--- planner/core/integration_test.go | 23 +++++++ planner/core/rule_column_pruning.go | 21 ++++-- .../core/testdata/integration_suite_in.json | 15 +++++ .../core/testdata/integration_suite_out.json | 65 +++++++++++++++++++ 5 files changed, 131 insertions(+), 14 deletions(-) diff --git a/cmd/explaintest/r/explain_easy.result b/cmd/explaintest/r/explain_easy.result index 927e25c8024f2..214e51b366de1 100644 --- a/cmd/explaintest/r/explain_easy.result +++ b/cmd/explaintest/r/explain_easy.result @@ -194,31 +194,32 @@ test t4 1 expr_idx 1 NULL NULL (`a` + `b` + 1) 2 YES NO explain format = 'brief' select count(1) from (select count(1) from (select * from t1 where c3 = 100) k) k2; id estRows task access object operator info StreamAgg 1.00 root funcs:count(1)->Column#5 -└─StreamAgg 1.00 root funcs:firstrow(Column#9)->Column#7 +└─StreamAgg 1.00 root funcs:count(Column#9)->Column#7 └─TableReader 1.00 root data:StreamAgg - └─StreamAgg 1.00 cop[tikv] funcs:firstrow(1)->Column#9 + └─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#9 └─Selection 10.00 cop[tikv] eq(test.t1.c3, 100) └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo explain format = 'brief' select 1 from (select count(c2), count(c3) from t1) k; id estRows task access object operator info Projection 1.00 root 1->Column#6 -└─StreamAgg 1.00 root funcs:firstrow(Column#14)->Column#9 +└─StreamAgg 1.00 root funcs:count(Column#14)->Column#9 └─TableReader 1.00 root data:StreamAgg - └─StreamAgg 1.00 cop[tikv] funcs:firstrow(1)->Column#14 + └─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#14 └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo explain format = 'brief' select count(1) from (select max(c2), count(c3) as m from t1) k; id estRows task access object operator info StreamAgg 1.00 root funcs:count(1)->Column#6 -└─StreamAgg 1.00 root funcs:firstrow(Column#13)->Column#8 +└─StreamAgg 1.00 root funcs:count(Column#13)->Column#8 └─TableReader 1.00 root data:StreamAgg - └─StreamAgg 1.00 cop[tikv] funcs:firstrow(1)->Column#13 + └─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#13 └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo explain format = 'brief' select count(1) from (select count(c2) from t1 group by c3) k; id estRows task access object operator info StreamAgg 1.00 root funcs:count(1)->Column#5 -└─HashAgg 8000.00 root group by:test.t1.c3, funcs:firstrow(1)->Column#7 - └─TableReader 10000.00 root data:TableFullScan - └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo +└─HashAgg 8000.00 root group by:test.t1.c3, funcs:count(Column#9)->Column#7 + └─TableReader 8000.00 root data:HashAgg + └─HashAgg 8000.00 cop[tikv] group by:test.t1.c3, funcs:count(1)->Column#9 + └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo set @@session.tidb_opt_insubq_to_join_and_agg=0; explain format = 'brief' select sum(t1.c1 in (select c1 from t2)) from t1; id estRows task access object operator info @@ -498,7 +499,7 @@ PRIMARY KEY (`id`) explain format = 'brief' SELECT COUNT(1) FROM (SELECT COALESCE(b.region_name, '不详') region_name, SUM(a.registration_num) registration_num FROM (SELECT stat_date, show_date, region_id, 0 registration_num FROM test01 WHERE period = 1 AND stat_date >= 20191202 AND stat_date <= 20191202 UNION ALL SELECT stat_date, show_date, region_id, registration_num registration_num FROM test01 WHERE period = 1 AND stat_date >= 20191202 AND stat_date <= 20191202) a LEFT JOIN test02 b ON a.region_id = b.id WHERE registration_num > 0 AND a.stat_date >= '20191202' AND a.stat_date <= '20191202' GROUP BY a.stat_date , a.show_date , COALESCE(b.region_name, '不详') ) JLS; id estRows task access object operator info StreamAgg 1.00 root funcs:count(1)->Column#22 -└─HashAgg 8000.00 root group by:Column#32, Column#33, Column#34, funcs:firstrow(1)->Column#31 +└─HashAgg 8000.00 root group by:Column#32, Column#33, Column#34, funcs:count(1)->Column#31 └─Projection 10000.01 root Column#14, Column#15, coalesce(test.test02.region_name, 不详)->Column#34 └─HashJoin 10000.01 root left outer join, equal:[eq(Column#16, test.test02.id)] ├─TableReader(Build) 10000.00 root data:TableFullScan diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index c691e5341b0a5..bbfe52a0ee710 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -180,6 +180,29 @@ func (s *testIntegrationSuite) TestPushLimitDownIndexLookUpReader(c *C) { } } +func (s *testIntegrationSuite) TestAggColumnPrune(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values(1),(2)") + + var input []string + var output []struct { + SQL string + Res []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + tk.MustQuery(tt).Check(testkit.Rows(output[i].Res...)) + } +} + func (s *testIntegrationSuite) TestIsFromUnixtimeNullRejective(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index 4b31853c138d0..8a627792ecc7f 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -88,7 +88,11 @@ func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column) child := la.children[0] used := expression.GetUsedList(parentUsedCols, la.Schema()) + allFirstRow := true for i := len(used) - 1; i >= 0; i-- { + if la.AggFuncs[i].Name != ast.AggFuncFirstRow { + allFirstRow = false + } if !used[i] { la.schema.Columns = append(la.schema.Columns[:i], la.schema.Columns[i+1:]...) la.AggFuncs = append(la.AggFuncs[:i], la.AggFuncs[i+1:]...) @@ -103,15 +107,24 @@ func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column) selfUsedCols = append(selfUsedCols, cols...) } if len(la.AggFuncs) == 0 { - // If all the aggregate functions are pruned, we should add an aggregate function to keep the correctness. - one, err := aggregation.NewAggFuncDesc(la.ctx, ast.AggFuncFirstRow, []expression.Expression{expression.NewOne()}, false) + // If all the aggregate functions are pruned, we should add an aggregate function to maintain the info of row numbers. + // For all the aggregate functions except `first_row`, if we have an empty table defined as t(a,b), + // `select agg(a) from t` would always return one row, while `select agg(a) from t group by b` would return empty. + // For `first_row` which is only used internally by tidb, `first_row(a)` would always return empty for empty input now. + var err error + var newAgg *aggregation.AggFuncDesc + if allFirstRow { + newAgg, err = aggregation.NewAggFuncDesc(la.ctx, ast.AggFuncFirstRow, []expression.Expression{expression.NewOne()}, false) + } else { + newAgg, err = aggregation.NewAggFuncDesc(la.ctx, ast.AggFuncCount, []expression.Expression{expression.NewOne()}, false) + } if err != nil { return err } - la.AggFuncs = []*aggregation.AggFuncDesc{one} + la.AggFuncs = []*aggregation.AggFuncDesc{newAgg} col := &expression.Column{ UniqueID: la.ctx.GetSessionVars().AllocPlanColumnID(), - RetType: one.RetTp, + RetType: newAgg.RetTp, } la.schema.Columns = []*expression.Column{col} } diff --git a/planner/core/testdata/integration_suite_in.json b/planner/core/testdata/integration_suite_in.json index 087b32110e18f..63b866ad3badd 100644 --- a/planner/core/testdata/integration_suite_in.json +++ b/planner/core/testdata/integration_suite_in.json @@ -19,6 +19,21 @@ "explain format = 'brief' select * from t t1 left join t t2 on t1.a=t2.a where from_unixtime(t2.b);" ] }, + { + "name": "TestAggColumnPrune", + "cases": [ + "select count(1) from t join (select count(1) from t where false) as tmp", + "select count(1) from t join (select max(a) from t where false) as tmp", + "select count(1) from t join (select min(a) from t where false) as tmp", + "select count(1) from t join (select sum(a) from t where false) as tmp", + "select count(1) from t join (select avg(a) from t where false) as tmp", + "select count(1) from t join (select count(1) from t where false group by a) as tmp", + "select count(1) from t join (select max(a) from t where false group by a) as tmp", + "select count(1) from t join (select min(a) from t where false group by a) as tmp", + "select count(1) from t join (select sum(a) from t where false group by a) as tmp", + "select count(1) from t join (select avg(a) from t where false group by a) as tmp" + ] + }, { "name": "TestIndexJoinInnerIndexNDV", "cases": [ diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index 7c735fcb5657c..77aa5b1494da7 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -63,6 +63,71 @@ } ] }, + { + "Name": "TestAggColumnPrune", + "Cases": [ + { + "SQL": "select count(1) from t join (select count(1) from t where false) as tmp", + "Res": [ + "2" + ] + }, + { + "SQL": "select count(1) from t join (select max(a) from t where false) as tmp", + "Res": [ + "2" + ] + }, + { + "SQL": "select count(1) from t join (select min(a) from t where false) as tmp", + "Res": [ + "2" + ] + }, + { + "SQL": "select count(1) from t join (select sum(a) from t where false) as tmp", + "Res": [ + "2" + ] + }, + { + "SQL": "select count(1) from t join (select avg(a) from t where false) as tmp", + "Res": [ + "2" + ] + }, + { + "SQL": "select count(1) from t join (select count(1) from t where false group by a) as tmp", + "Res": [ + "0" + ] + }, + { + "SQL": "select count(1) from t join (select max(a) from t where false group by a) as tmp", + "Res": [ + "0" + ] + }, + { + "SQL": "select count(1) from t join (select min(a) from t where false group by a) as tmp", + "Res": [ + "0" + ] + }, + { + "SQL": "select count(1) from t join (select sum(a) from t where false group by a) as tmp", + "Res": [ + "0" + ] + }, + { + "SQL": "select count(1) from t join (select avg(a) from t where false group by a) as tmp", + "Res": [ + "0" + ] + } + ] + }, { "Name": "TestIndexJoinInnerIndexNDV", "Cases": [ From 8656b5d39668fb1511bc2cdaecf7f6ccb757c9bd Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 2 Jun 2021 18:32:26 +0800 Subject: [PATCH 10/14] executor: avoid distsql request for TableReader/IndexReader/IndexLookup on temporary table (#24769) --- executor/distsql.go | 29 +++++++++++++++++++++ executor/executor_test.go | 51 ++++++++++++++++++++++++++++++++++++ executor/table_reader.go | 55 +++++++++++++++++++++++++++++++-------- 3 files changed, 124 insertions(+), 11 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index 23fa8bd4ce58e..a4d71f45a6a9e 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -196,6 +196,10 @@ type IndexReaderExecutor struct { // Close clears all resources hold by current object. func (e *IndexReaderExecutor) Close() error { + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { + return nil + } + err := e.result.Close() e.result = nil e.ctx.StoreQueryFeedback(e.feedback) @@ -204,6 +208,11 @@ func (e *IndexReaderExecutor) Close() error { // Next implements the Executor Next interface. func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { + req.Reset() + return nil + } + err := e.result.Next(ctx, req) if err != nil { e.feedback.Invalidate() @@ -266,6 +275,11 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.dagPB.CollectExecutionSummaries = &collExec } e.kvRanges = kvRanges + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + // In a test case IndexReaderExecutor is mocked and e.table is nil. + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { + return nil + } e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) @@ -381,6 +395,12 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { e.feedback.Invalidate() return err } + + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + if e.table.Meta().TempTableType == model.TempTableGlobal { + return nil + } + err = e.open(ctx) if err != nil { e.feedback.Invalidate() @@ -639,6 +659,10 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup // Close implements Exec Close interface. func (e *IndexLookUpExecutor) Close() error { + if e.table.Meta().TempTableType != model.TempTableNone { + return nil + } + if !e.workerStarted || e.finished == nil { return nil } @@ -659,6 +683,11 @@ func (e *IndexLookUpExecutor) Close() error { // Next implements Exec Next interface. func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + if e.table.Meta().TempTableType == model.TempTableGlobal { + req.Reset() + return nil + } + if !e.workerStarted { if err := e.startWorkers(ctx, req.RequiredRows()); err != nil { return err diff --git a/executor/executor_test.go b/executor/executor_test.go index c41b6f2a7773e..8d70d7fa16468 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8356,6 +8356,57 @@ func (s testSerialSuite) TestExprBlackListForEnum(c *C) { c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) } +func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) { + // Test that table reader/index reader/index lookup on the temporary table do not need to visit TiKV. + tk := testkit.NewTestKit(c, s.store) + tk1 := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk1.MustExec("use test") + tk.MustExec("create table normal (id int, a int, index(a))") + tk.MustExec("create global temporary table tmp_t (id int, a int, index(a)) on commit delete rows") + + tk.MustExec("begin") + tk.MustExec("insert into tmp_t values (1, 1)") + tk.MustExec("insert into tmp_t values (2, 2)") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", "return(true)"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy"), IsNil) + }() + + // Make sure the fail point works. + // With that failpoint, all requests to the TiKV is discard. + rs, err := tk1.Exec("select * from normal") + c.Assert(err, IsNil) + blocked := make(chan struct{}) + ctx, cancelFunc := context.WithCancel(context.Background()) + go func() { + _, err := session.ResultSetToStringSlice(ctx, tk1.Se, rs) + blocked <- struct{}{} + c.Assert(err, NotNil) + }() + select { + case <-blocked: + c.Error("The query should block when the failpoint is enabled") + case <-time.After(200 * time.Millisecond): + } + cancelFunc() + + // Check the temporary table do not send request to TiKV. + // Table reader + tk.HasPlan("select * from tmp_t", "TableReader") + tk.MustQuery("select * from tmp_t").Check(testkit.Rows("1 1", "2 2")) + // Index reader + tk.HasPlan("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t", "IndexReader") + tk.MustQuery("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t").Check(testkit.Rows("1", "2")) + // Index lookup + tk.HasPlan("select id from tmp_t where a = 1", "IndexLookUp") + tk.MustQuery("select id from tmp_t where a = 1").Check(testkit.Rows("1")) + + tk.MustExec("rollback") +} + func (s *testResourceTagSuite) TestResourceGroupTag(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/table_reader.go b/executor/table_reader.go index a4459ee920291..fc5e5a2c11096 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -153,6 +153,25 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { } } firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges, e.keepOrder, e.desc, e.table.Meta() != nil && e.table.Meta().IsCommonHandle) + + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + // Calculate the kv ranges here, UnionScan rely on this kv ranges. + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { + kvReq, err := e.buildKVReq(ctx, firstPartRanges) + if err != nil { + return err + } + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + if len(secondPartRanges) != 0 { + kvReq, err = e.buildKVReq(ctx, secondPartRanges) + if err != nil { + return err + } + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + } + return nil + } + firstResult, err := e.buildResp(ctx, firstPartRanges) if err != nil { e.feedback.Invalidate() @@ -175,6 +194,12 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { // Next fills data into the chunk passed by its caller. // The task was actually done by tableReaderHandler. func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + req.Reset() + return nil + } + logutil.Eventf(ctx, "table scan table: %s, range: %v", stringutil.MemoizeStr(func() string { var tableName string if meta := e.table.Meta(); meta != nil { @@ -197,6 +222,10 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error // Close implements the Executor Close interface. func (e *TableReaderExecutor) Close() error { + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { + return nil + } + var err error if e.resultHandler != nil { err = e.resultHandler.Close() @@ -209,6 +238,20 @@ func (e *TableReaderExecutor) Close() error { // buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResult returned by the callee // to fetch all results. func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) { + kvReq, err := e.buildKVReq(ctx, ranges) + if err != nil { + return nil, err + } + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + + result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) + if err != nil { + return nil, err + } + return result, nil +} + +func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.Range) (*kv.Request, error) { var builder distsql.RequestBuilder var reqBuilder *distsql.RequestBuilder if e.kvRangeBuilder != nil { @@ -231,17 +274,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra SetMemTracker(e.memTracker). SetStoreType(e.storeType). SetAllowBatchCop(e.batchCop) - kvReq, err := reqBuilder.Build() - if err != nil { - return nil, err - } - e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) - - result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) - if err != nil { - return nil, err - } - return result, nil + return reqBuilder.Build() } func buildVirtualColumnIndex(schema *expression.Schema, columns []*model.ColumnInfo) []int { From debf8c76a37ab659594f81337b72d4f177c69814 Mon Sep 17 00:00:00 2001 From: Raywill Date: Wed, 2 Jun 2021 19:26:25 +0800 Subject: [PATCH 11/14] docs: fix typo (#25048) --- docs/design/2020-06-24-placement-rules-in-sql.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/design/2020-06-24-placement-rules-in-sql.md b/docs/design/2020-06-24-placement-rules-in-sql.md index a07c7d5b5e171..d5eb56f690e1e 100644 --- a/docs/design/2020-06-24-placement-rules-in-sql.md +++ b/docs/design/2020-06-24-placement-rules-in-sql.md @@ -14,7 +14,7 @@ The scenarios of defining placement rules in SQL include: - Place data across regions to improve access locality - Add a TiFlash replica for a table -- Limit data within its national border to gaurantee data sovereignty +- Limit data within its national border to guarantee data sovereignty - Place latest data to SSD and history data to HDD - Place the leader of hot data to a high-performance TiKV instance - Increase the replica count of more important data @@ -233,7 +233,7 @@ For example, `CONSTRAINTS="{+zone=sh:1,-zone=bj:2}"` indicates to place 1 replic In the list format, `count` is not specified. The number of replicas for each constraint is not limited, but the total number of replicas should still conform to the `REPLICAS` option. -For example, `CONSTRAINTS="[+zone=sh,+zone=bj]" REPLICAS=3` indicates to place 3 repicas on either `sh` or `bj`. There may be 2 replicas on `sh` and 1 in `bj`, or 2 in `bj` and 1 in `sh`. It's up to PD. +For example, `CONSTRAINTS="[+zone=sh,+zone=bj]" REPLICAS=3` indicates to place 3 replicas on either `sh` or `bj`. There may be 2 replicas on `sh` and 1 in `bj`, or 2 in `bj` and 1 in `sh`. It's up to PD. Label constraints can be implemented by defining `label_constraints` field in PD placement rule configuration. `+` and `-` correspond to property `op`. Specifically, `+` is equivalent to `in` and `-` is equivalent to `notIn`. @@ -553,7 +553,7 @@ However, TiDB also uses placement rules in some cases, as discussed in section " Before choosing the solution, transactional requirements need to be noticed: -- Defining placement rules may fail, and users will probably retry it. As retrying `ADD PLACEMENT POLICY` will add more replicas than expected, the atomacity of the opertion needs to be gauranteed. +- Defining placement rules may fail, and users will probably retry it. As retrying `ADD PLACEMENT POLICY` will add more replicas than expected, the atomicity of the opertion needs to be guaranteed. - `ADD PLACEMENT POLICY` needs to read the original placement rules, combine the 2 rules and then store them to PD, so linearizability should be gauranteed. If the placement rules are stored on both TiKV and PD, the approaches to keep atomicity are as follows: @@ -583,7 +583,7 @@ The comparison shows that both solutions are possible, but storing placement rul The scenarios where TiDB queries placement rules are as follows: 1. The optimizer uses placement rules to decide to route cop request to TiKV or TiFlash. It's already implemented and the TiFlash information is written into table information, which is stored on TiKV. -2. It will be probably used in locality-aware features in the furture, such as follower-read. Follower-read is always used when TiDB wants to read the nearest replica to reduce multi-region latency. In some distributed databases, it’s implemented by labelling data nodes and selecting the nearest replica according to the labels. +2. It will be probably used in locality-aware features in the future, such as follower-read. Follower-read is always used when TiDB wants to read the nearest replica to reduce multi-region latency. In some distributed databases, it’s implemented by labelling data nodes and selecting the nearest replica according to the labels. 3. Local transactions need to know the binding relationship between Raft leader and region, which is also defined by placement rules. 4. Once a rule is defined on a table, all the subsequent partitions added to the table should also inherit the rule. So the `ADD PARTITION` operation should query the rules on the table. The same is true for creating tables and indices. 5. `SHOW PLACEMENT POLICY` statement should output the placement rules correctly. @@ -615,7 +615,7 @@ The fact that the DDL procedure in TiDB is mature helps to achieve some features - Placement rules are defined in serial as there's only one DDL owner at the same time - DDL is capable of disaster recovery as the middle states are persistent in TiKV - DDL is rollbackable as the middle states can transform from one to another -- Updating schema version guarantees all active transactions are based on the same version of placement ruels +- Updating schema version guarantees all active transactions are based on the same version of placement rules ### Rule priorities @@ -712,7 +712,7 @@ ALTER TABLE t ALTER PLACEMENT POLICY CONSTRAINTS="{+zone=bj:2,+zone=sh:1}" ROLE=voter; ``` -It needs 2 placement rules for `voter` in the PD placment rule configuration, because each rule can only specify one `count`. To make `id` unique, a unique identifier must be appended to `id`. DDL job ID plus an index in the job is a good choice. +It needs 2 placement rules for `voter` in the PD placement rule configuration, because each rule can only specify one `count`. To make `id` unique, a unique identifier must be appended to `id`. DDL job ID plus an index in the job is a good choice. Take the case above for example, assuming the table ID of `t` is 100, the ID of the DDL job executing this statement is 200, then `id` of the placement rules are `100-200-1` and `100-200-2`. From da8dbe6085b4e60ddafe4836aa1f325b5cc44f13 Mon Sep 17 00:00:00 2001 From: Howie Date: Wed, 2 Jun 2021 19:34:25 +0800 Subject: [PATCH 12/14] ddl: add foreign key compatibility for temporary table (#24961) --- ddl/db_test.go | 20 +++++++++++++++++++- ddl/ddl_api.go | 3 +++ ddl/serial_test.go | 7 ------- planner/core/planbuilder.go | 8 ++++++-- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index 7ef7d23a82196..0bfc654941727 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -3012,7 +3012,7 @@ func (s *testDBSuite2) TestTableForeignKey(c *C) { tk.MustExec("create table t3 (a int, b int);") failSQL = "alter table t1 add foreign key (c) REFERENCES t3(a);" tk.MustGetErrCode(failSQL, errno.ErrKeyColumnDoesNotExits) - // test oreign key not match error + // test origin key not match error failSQL = "alter table t1 add foreign key (a) REFERENCES t3(a, b);" tk.MustGetErrCode(failSQL, errno.ErrWrongFkDef) // Test drop column with foreign key. @@ -3031,6 +3031,24 @@ func (s *testDBSuite2) TestTableForeignKey(c *C) { tk.MustExec("drop table if exists t1,t2,t3,t4;") } +func (s *testDBSuite2) TestTemporaryTableForeignKey(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1 (a int, b int);") + tk.MustExec("drop table if exists t1_tmp;") + tk.MustExec("create global temporary table t1_tmp (a int, b int) on commit delete rows;") + // test add foreign key. + tk.MustExec("drop table if exists t2;") + tk.MustExec("create table t2 (a int, b int);") + failSQL := "alter table t1_tmp add foreign key (c) REFERENCES t2(a);" + tk.MustGetErrCode(failSQL, mysql.ErrCannotAddForeign) + // Test drop column with foreign key. + failSQL = "create global temporary table t3 (c int,d int,foreign key (d) references t1 (b)) on commit delete rows;" + tk.MustGetErrCode(failSQL, mysql.ErrCannotAddForeign) + tk.MustExec("drop table if exists t1,t2,t3,t1_tmp;") +} + func (s *testDBSuite8) TestFKOnGeneratedColumns(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 4684d5d4cc7ef..640823b23305a 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -5267,6 +5267,9 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode if err != nil { return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)) } + if t.Meta().TempTableType != model.TempTableNone { + return infoschema.ErrCannotAddForeign + } fkInfo, err := buildFKInfo(fkName, keys, refer, t.Cols(), t.Meta()) if err != nil { diff --git a/ddl/serial_test.go b/ddl/serial_test.go index 62610e3279cf6..30c81bf2f2e7b 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -534,13 +534,6 @@ func (s *testSerialSuite) TestCreateTableWithLike(c *C) { _, err = tk.Exec("create table temporary_table_t1 like temporary_table") c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("create table like").Error()) tk.MustExec("drop table if exists temporary_table;") - - tk.MustExec("drop table if exists temporary_table_like;") - tk.MustExec("create table temporary_table (a int, b int,index(a))") - tk.MustExec("drop table if exists temporary_table_like_t1;") - _, err = tk.Exec("create global temporary table temporary_table_like_t1 like temporary_table on commit delete rows;") - c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("create table like").Error()) - tk.MustExec("drop table if exists temporary_table_like;") } // TestCancelAddIndex1 tests canceling ddl job when the add index worker is not started. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 4df8572b19447..b743204a1db91 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3481,8 +3481,12 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err b.visitInfo = appendVisitInfo(b.visitInfo, mysql.IndexPriv, v.Table.Schema.L, v.Table.Name.L, "", authErr) case *ast.CreateTableStmt: - if v.TemporaryKeyword != ast.TemporaryNone && v.ReferTable != nil { - return nil, ErrOptOnTemporaryTable.GenWithStackByArgs("create table like") + if v.TemporaryKeyword != ast.TemporaryNone { + for _, cons := range v.Constraints { + if cons.Tp == ast.ConstraintForeignKey { + return nil, infoschema.ErrCannotAddForeign + } + } } if b.ctx.GetSessionVars().User != nil { authErr = ErrTableaccessDenied.GenWithStackByArgs("CREATE", b.ctx.GetSessionVars().User.AuthUsername, From 32cf14bd7dafdc23409fd0ee289c3742063f6478 Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Wed, 2 Jun 2021 22:16:25 +0800 Subject: [PATCH 13/14] statistics: relax the check of the OutOfRange (#24958) --- statistics/cmsketch.go | 8 +++ statistics/histogram.go | 92 ++++++++++++------------ statistics/selectivity_test.go | 8 +-- statistics/statistics_test.go | 2 +- statistics/table.go | 21 ++++-- statistics/testdata/stats_suite_in.json | 6 +- statistics/testdata/stats_suite_out.json | 46 ++++++++---- 7 files changed, 113 insertions(+), 70 deletions(-) diff --git a/statistics/cmsketch.go b/statistics/cmsketch.go index c510186b16c40..07d90434a6cc7 100644 --- a/statistics/cmsketch.go +++ b/statistics/cmsketch.go @@ -530,6 +530,14 @@ func (c *TopN) Num() int { return len(c.TopN) } +// outOfRange checks whether the the given value falls back in [TopN.LowestOne, TopN.HighestOne]. +func (c *TopN) outOfRange(val []byte) bool { + if c == nil || len(c.TopN) == 0 { + return true + } + return bytes.Compare(c.TopN[0].Encoded, val) > 0 || bytes.Compare(val, c.TopN[c.Num()-1].Encoded) > 0 +} + // DecodedString returns the value with decoded result. func (c *TopN) DecodedString(ctx sessionctx.Context, colTypes []byte) (string, error) { builder := &strings.Builder{} diff --git a/statistics/histogram.go b/statistics/histogram.go index 027950b8326c7..f377b14c0c067 100644 --- a/statistics/histogram.go +++ b/statistics/histogram.go @@ -506,20 +506,12 @@ func (hg *Histogram) BetweenRowCount(a, b types.Datum) float64 { } // BetweenRowCount estimates the row count for interval [l, r). -func (c *Column) BetweenRowCount(sc *stmtctx.StatementContext, l, r types.Datum) (float64, error) { +func (c *Column) BetweenRowCount(sc *stmtctx.StatementContext, l, r types.Datum, lowEncoded, highEncoded []byte) float64 { histBetweenCnt := c.Histogram.BetweenRowCount(l, r) if c.StatsVer <= Version1 { - return histBetweenCnt, nil - } - lBytes, err := codec.EncodeKey(sc, nil, l) - if err != nil { - return 0, errors.Trace(err) - } - rBytes, err := codec.EncodeKey(sc, nil, r) - if err != nil { - return 0, errors.Trace(err) + return histBetweenCnt } - return float64(c.TopN.BetweenCount(lBytes, rBytes)) + histBetweenCnt, nil + return float64(c.TopN.BetweenCount(lowEncoded, highEncoded)) + histBetweenCnt } // TotalRowCount returns the total count of this histogram. @@ -978,7 +970,7 @@ func (c *Column) IsInvalid(sc *stmtctx.StatementContext, collPseudo bool) bool { return c.TotalRowCount() == 0 || (c.Histogram.NDV > 0 && c.notNullCount() == 0) } -func (c *Column) equalRowCount(sc *stmtctx.StatementContext, val types.Datum, modifyCount int64) (float64, error) { +func (c *Column) equalRowCount(sc *stmtctx.StatementContext, val types.Datum, encodedVal []byte, modifyCount int64) (float64, error) { if val.IsNull() { return float64(c.NullCount), nil } @@ -987,7 +979,7 @@ func (c *Column) equalRowCount(sc *stmtctx.StatementContext, val types.Datum, mo if c.Histogram.Bounds.NumRows() == 0 { return 0.0, nil } - if c.Histogram.NDV > 0 && c.outOfRange(val) { + if c.Histogram.NDV > 0 && c.outOfRange(val, encodedVal) { return outOfRangeEQSelectivity(c.Histogram.NDV, modifyCount, int64(c.TotalRowCount())) * c.TotalRowCount(), nil } if c.CMSketch != nil { @@ -996,14 +988,17 @@ func (c *Column) equalRowCount(sc *stmtctx.StatementContext, val types.Datum, mo } return c.Histogram.equalRowCount(val, false), nil } + // All the values are null. + if c.Histogram.Bounds.NumRows() == 0 && c.TopN.Num() == 0 { + return 0, nil + } + if c.Histogram.NDV+int64(c.TopN.Num()) > 0 && c.outOfRange(val, encodedVal) { + return outOfRangeEQSelectivity(c.Histogram.NDV, modifyCount, int64(c.TotalRowCount())) * c.TotalRowCount(), nil + } // Stats version == 2 // 1. try to find this value in TopN if c.TopN != nil { - valBytes, err := codec.EncodeKey(sc, nil, val) - if err != nil { - return 0, errors.Trace(err) - } - rowcount, ok := c.QueryTopN(valBytes) + rowcount, ok := c.QueryTopN(encodedVal) if ok { return float64(rowcount), nil } @@ -1054,6 +1049,14 @@ func (c *Column) GetColumnRowCount(sc *stmtctx.StatementContext, ranges []*range if err != nil { return 0, errors.Trace(err) } + lowEncoded, err := codec.EncodeKey(sc, nil, lowVal) + if err != nil { + return 0, err + } + highEncoded, err := codec.EncodeKey(sc, nil, highVal) + if err != nil { + return 0, err + } if cmp == 0 { // the point case. if !rg.LowExclude && !rg.HighExclude { @@ -1063,7 +1066,7 @@ func (c *Column) GetColumnRowCount(sc *stmtctx.StatementContext, ranges []*range continue } var cnt float64 - cnt, err = c.equalRowCount(sc, lowVal, modifyCount) + cnt, err = c.equalRowCount(sc, lowVal, lowEncoded, modifyCount) if err != nil { return 0, errors.Trace(err) } @@ -1075,7 +1078,7 @@ func (c *Column) GetColumnRowCount(sc *stmtctx.StatementContext, ranges []*range // The small range case. if rangeVals != nil { for _, val := range rangeVals { - cnt, err := c.equalRowCount(sc, val, modifyCount) + cnt, err := c.equalRowCount(sc, val, lowEncoded, modifyCount) if err != nil { return 0, err } @@ -1084,18 +1087,15 @@ func (c *Column) GetColumnRowCount(sc *stmtctx.StatementContext, ranges []*range continue } // The interval case. - cnt, err := c.BetweenRowCount(sc, lowVal, highVal) - if err != nil { - return 0, err - } - if (c.outOfRange(lowVal) && !lowVal.IsNull()) || c.outOfRange(highVal) { + cnt := c.BetweenRowCount(sc, lowVal, highVal, lowEncoded, highEncoded) + if (c.outOfRange(lowVal, lowEncoded) && !lowVal.IsNull()) || c.outOfRange(highVal, highEncoded) { cnt += outOfRangeEQSelectivity(outOfRangeBetweenRate, modifyCount, int64(c.TotalRowCount())) * c.TotalRowCount() } // `betweenRowCount` returns count for [l, h) range, we adjust cnt for boudaries here. // Note that, `cnt` does not include null values, we need specially handle cases // where null is the lower bound. if rg.LowExclude && !lowVal.IsNull() { - lowCnt, err := c.equalRowCount(sc, lowVal, modifyCount) + lowCnt, err := c.equalRowCount(sc, lowVal, lowEncoded, modifyCount) if err != nil { return 0, errors.Trace(err) } @@ -1105,7 +1105,7 @@ func (c *Column) GetColumnRowCount(sc *stmtctx.StatementContext, ranges []*range cnt += float64(c.NullCount) } if !rg.HighExclude { - highCnt, err := c.equalRowCount(sc, highVal, modifyCount) + highCnt, err := c.equalRowCount(sc, highVal, highEncoded, modifyCount) if err != nil { return 0, errors.Trace(err) } @@ -1121,6 +1121,15 @@ func (c *Column) GetColumnRowCount(sc *stmtctx.StatementContext, ranges []*range return rowCount, nil } +func (c *Column) outOfRange(val types.Datum, encodedVal []byte) bool { + outOfHist := c.Histogram.outOfRange(val) + if !outOfHist { + return false + } + // Already out of hist. + return c.TopN.outOfRange(encodedVal) +} + // Index represents an index histogram. type Index struct { Histogram @@ -1504,26 +1513,21 @@ func (coll *HistColl) NewHistCollBySelectivity(sc *stmtctx.StatementContext, sta } func (idx *Index) outOfRange(val types.Datum) bool { - histEmpty, topNEmpty := idx.Histogram.Len() == 0, idx.TopN.Num() == 0 - // All empty. - if histEmpty && topNEmpty { - return true - } - // TopN is not empty. Record found. - if !topNEmpty && idx.TopN.findTopN(val.GetBytes()) >= 0 { + outOfTopN := idx.TopN.outOfRange(val.GetBytes()) + // The val is in TopN, return false. + if !outOfTopN { return false } - if !histEmpty { - withInLowBoundOrPrefixMatch := chunk.Compare(idx.Bounds.GetRow(0), 0, &val) <= 0 || - matchPrefix(idx.Bounds.GetRow(0), 0, &val) - withInHighBound := chunk.Compare(idx.Bounds.GetRow(idx.Bounds.NumRows()-1), 0, &val) >= 0 - // Hist is not empty. Record found. - if withInLowBoundOrPrefixMatch && withInHighBound { - return false - } + + histEmpty := idx.Histogram.Len() == 0 + // HistEmpty->Hist out of range. + if histEmpty { + return true } - // No record found. Is out of range. - return true + withInLowBoundOrPrefixMatch := chunk.Compare(idx.Bounds.GetRow(0), 0, &val) <= 0 || + matchPrefix(idx.Bounds.GetRow(0), 0, &val) + withInHighBound := chunk.Compare(idx.Bounds.GetRow(idx.Bounds.NumRows()-1), 0, &val) >= 0 + return !withInLowBoundOrPrefixMatch || !withInHighBound } // matchPrefix checks whether ad is the prefix of value diff --git a/statistics/selectivity_test.go b/statistics/selectivity_test.go index 359e1d2db9585..ac12be22442b9 100644 --- a/statistics/selectivity_test.go +++ b/statistics/selectivity_test.go @@ -649,19 +649,19 @@ func (s *testStatsSuite) TestTopNOutOfHist(c *C) { testKit.MustExec("drop table if exists topn_before_hist") testKit.MustExec("create table topn_before_hist(a int, index idx(a))") - testKit.MustExec("insert into topn_before_hist values(1), (1), (1), (1), (2), (2), (3), (4), (5)") + testKit.MustExec("insert into topn_before_hist values(1), (1), (1), (1), (3), (3), (4), (5), (6)") testKit.MustExec("analyze table topn_before_hist with 2 topn, 3 buckets") testKit.MustExec("create table topn_after_hist(a int, index idx(a))") - testKit.MustExec("insert into topn_after_hist values(2), (2), (3), (4), (5), (6), (6), (6), (6)") + testKit.MustExec("insert into topn_after_hist values(2), (2), (3), (4), (5), (7), (7), (7), (7)") testKit.MustExec("analyze table topn_after_hist with 2 topn, 3 buckets") testKit.MustExec("create table topn_before_hist_no_index(a int)") - testKit.MustExec("insert into topn_before_hist_no_index values(1), (1), (1), (1), (2), (2), (3), (4), (5)") + testKit.MustExec("insert into topn_before_hist_no_index values(1), (1), (1), (1), (3), (3), (4), (5), (6)") testKit.MustExec("analyze table topn_before_hist_no_index with 2 topn, 3 buckets") testKit.MustExec("create table topn_after_hist_no_index(a int)") - testKit.MustExec("insert into topn_after_hist_no_index values(2), (2), (3), (4), (5), (6), (6), (6), (6)") + testKit.MustExec("insert into topn_after_hist_no_index values(2), (2), (3), (4), (5), (7), (7), (7), (7)") testKit.MustExec("analyze table topn_after_hist_no_index with 2 topn, 3 buckets") var ( diff --git a/statistics/statistics_test.go b/statistics/statistics_test.go index 7fd0bf64b0bf5..c352c3576c89b 100644 --- a/statistics/statistics_test.go +++ b/statistics/statistics_test.go @@ -458,7 +458,7 @@ func (s *testStatisticsSuite) TestPseudoTable(c *C) { count, err := tbl.ColumnEqualRowCount(sc, types.NewIntDatum(1000), colInfo.ID) c.Assert(err, IsNil) c.Assert(int(count), Equals, 10) - count = tbl.ColumnBetweenRowCount(sc, types.NewIntDatum(1000), types.NewIntDatum(5000), colInfo.ID) + count, _ = tbl.ColumnBetweenRowCount(sc, types.NewIntDatum(1000), types.NewIntDatum(5000), colInfo.ID) c.Assert(int(count), Equals, 250) } diff --git a/statistics/table.go b/statistics/table.go index 7628e018e25a5..85807fbefc67f 100644 --- a/statistics/table.go +++ b/statistics/table.go @@ -277,19 +277,24 @@ func (t *Table) ColumnLessRowCount(sc *stmtctx.StatementContext, value types.Dat } // ColumnBetweenRowCount estimates the row count where column greater or equal to a and less than b. -func (t *Table) ColumnBetweenRowCount(sc *stmtctx.StatementContext, a, b types.Datum, colID int64) float64 { +func (t *Table) ColumnBetweenRowCount(sc *stmtctx.StatementContext, a, b types.Datum, colID int64) (float64, error) { c, ok := t.Columns[colID] if !ok || c.IsInvalid(sc, t.Pseudo) { - return float64(t.Count) / pseudoBetweenRate + return float64(t.Count) / pseudoBetweenRate, nil } - count, err := c.BetweenRowCount(sc, a, b) + aEncoded, err := codec.EncodeKey(sc, nil, a) if err != nil { - return 0 + return 0, err } + bEncoded, err := codec.EncodeKey(sc, nil, b) + if err != nil { + return 0, err + } + count := c.BetweenRowCount(sc, a, b, aEncoded, bEncoded) if a.IsNull() { count += float64(c.NullCount) } - return count * c.GetIncreaseFactor(t.Count) + return count * c.GetIncreaseFactor(t.Count), nil } // ColumnEqualRowCount estimates the row count where the column equals to value. @@ -298,7 +303,11 @@ func (t *Table) ColumnEqualRowCount(sc *stmtctx.StatementContext, value types.Da if !ok || c.IsInvalid(sc, t.Pseudo) { return float64(t.Count) / pseudoEqualRate, nil } - result, err := c.equalRowCount(sc, value, t.ModifyCount) + encodedVal, err := codec.EncodeKey(sc, nil, value) + if err != nil { + return 0, err + } + result, err := c.equalRowCount(sc, value, encodedVal, t.ModifyCount) result *= c.GetIncreaseFactor(t.Count) return result, errors.Trace(err) } diff --git a/statistics/testdata/stats_suite_in.json b/statistics/testdata/stats_suite_in.json index 631e2aa6c60e2..b20b6d8300433 100644 --- a/statistics/testdata/stats_suite_in.json +++ b/statistics/testdata/stats_suite_in.json @@ -72,9 +72,13 @@ "show stats_topn", "show stats_buckets", "explain select * from topn_before_hist where a = 1", + "explain select * from topn_before_hist where a = 2", + "explain select * from topn_after_hist where a = 7", "explain select * from topn_after_hist where a = 6", + "explain select * from topn_after_hist_no_index where a = 7", "explain select * from topn_after_hist_no_index where a = 6", - "explain select * from topn_before_hist_no_index where a = 1" + "explain select * from topn_before_hist_no_index where a = 1", + "explain select * from topn_before_hist_no_index where a = 2" ] }, { diff --git a/statistics/testdata/stats_suite_out.json b/statistics/testdata/stats_suite_out.json index c25f082455c2f..b60a351b6cead 100644 --- a/statistics/testdata/stats_suite_out.json +++ b/statistics/testdata/stats_suite_out.json @@ -427,29 +427,29 @@ "Cases": [ [ "test topn_before_hist a 0 1 4", - "test topn_before_hist a 0 2 2", + "test topn_before_hist a 0 3 2", "test topn_before_hist idx 1 1 4", - "test topn_before_hist idx 1 2 2", + "test topn_before_hist idx 1 3 2", "test topn_after_hist a 0 2 2", - "test topn_after_hist a 0 6 4", + "test topn_after_hist a 0 7 4", "test topn_after_hist idx 1 2 2", - "test topn_after_hist idx 1 6 4", + "test topn_after_hist idx 1 7 4", "test topn_before_hist_no_index a 0 1 4", - "test topn_before_hist_no_index a 0 2 2", + "test topn_before_hist_no_index a 0 3 2", "test topn_after_hist_no_index a 0 2 2", - "test topn_after_hist_no_index a 0 6 4" + "test topn_after_hist_no_index a 0 7 4" ], [ - "test topn_before_hist a 0 0 2 1 3 4 0", - "test topn_before_hist a 0 1 3 1 5 5 0", - "test topn_before_hist idx 1 0 2 1 3 4 0", - "test topn_before_hist idx 1 1 3 1 5 5 0", + "test topn_before_hist a 0 0 2 1 4 5 0", + "test topn_before_hist a 0 1 3 1 6 6 0", + "test topn_before_hist idx 1 0 2 1 4 5 0", + "test topn_before_hist idx 1 1 3 1 6 6 0", "test topn_after_hist a 0 0 2 1 3 4 0", "test topn_after_hist a 0 1 3 1 5 5 0", "test topn_after_hist idx 1 0 2 1 3 4 0", "test topn_after_hist idx 1 1 3 1 5 5 0", - "test topn_before_hist_no_index a 0 0 2 1 3 4 0", - "test topn_before_hist_no_index a 0 1 3 1 5 5 0", + "test topn_before_hist_no_index a 0 0 2 1 4 5 0", + "test topn_before_hist_no_index a 0 1 3 1 6 6 0", "test topn_after_hist_no_index a 0 0 2 1 3 4 0", "test topn_after_hist_no_index a 0 1 3 1 5 5 0" ], @@ -457,19 +457,37 @@ "IndexReader_6 4.00 root index:IndexRangeScan_5", "└─IndexRangeScan_5 4.00 cop[tikv] table:topn_before_hist, index:idx(a) range:[1,1], keep order:false" ], + [ + "IndexReader_6 0.00 root index:IndexRangeScan_5", + "└─IndexRangeScan_5 0.00 cop[tikv] table:topn_before_hist, index:idx(a) range:[2,2], keep order:false" + ], [ "IndexReader_6 4.00 root index:IndexRangeScan_5", - "└─IndexRangeScan_5 4.00 cop[tikv] table:topn_after_hist, index:idx(a) range:[6,6], keep order:false" + "└─IndexRangeScan_5 4.00 cop[tikv] table:topn_after_hist, index:idx(a) range:[7,7], keep order:false" + ], + [ + "IndexReader_6 0.00 root index:IndexRangeScan_5", + "└─IndexRangeScan_5 0.00 cop[tikv] table:topn_after_hist, index:idx(a) range:[6,6], keep order:false" ], [ "TableReader_7 4.00 root data:Selection_6", - "└─Selection_6 4.00 cop[tikv] eq(test.topn_after_hist_no_index.a, 6)", + "└─Selection_6 4.00 cop[tikv] eq(test.topn_after_hist_no_index.a, 7)", + " └─TableFullScan_5 9.00 cop[tikv] table:topn_after_hist_no_index keep order:false" + ], + [ + "TableReader_7 1.00 root data:Selection_6", + "└─Selection_6 1.00 cop[tikv] eq(test.topn_after_hist_no_index.a, 6)", " └─TableFullScan_5 9.00 cop[tikv] table:topn_after_hist_no_index keep order:false" ], [ "TableReader_7 4.00 root data:Selection_6", "└─Selection_6 4.00 cop[tikv] eq(test.topn_before_hist_no_index.a, 1)", " └─TableFullScan_5 9.00 cop[tikv] table:topn_before_hist_no_index keep order:false" + ], + [ + "TableReader_7 1.00 root data:Selection_6", + "└─Selection_6 1.00 cop[tikv] eq(test.topn_before_hist_no_index.a, 2)", + " └─TableFullScan_5 9.00 cop[tikv] table:topn_before_hist_no_index keep order:false" ] ] }, From 9fad132d1d892ade178d0614687660c29cc5af54 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 2 Jun 2021 23:26:26 +0800 Subject: [PATCH 14/14] case: make CTE case be stable (#25035) --- executor/cte.go | 4 +-- executor/cte_test.go | 85 +++++++++++++++++++++----------------------- 2 files changed, 42 insertions(+), 47 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 95921670cb5cf..055163dc17e4f 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -111,8 +111,6 @@ func (e *CTEExec) Open(ctx context.Context) (err error) { if err = e.iterOutTbl.OpenAndRef(); err != nil { return err } - - setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker) } if e.isDistinct { @@ -137,11 +135,13 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { defer e.resTbl.Unlock() resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) + iterOutAction := setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker) failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage { defer resAction.WaitForTest() defer iterInAction.WaitForTest() + defer iterOutAction.WaitForTest() } }) diff --git a/executor/cte_test.go b/executor/cte_test.go index e5789170627f7..d6e212484ad29 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -27,14 +27,16 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testkit" ) -var _ = check.Suite(&CTETestSuite{}) +var _ = check.Suite(&CTETestSuite{&baseCTETestSuite{}}) +var _ = check.SerialSuites(&CTESerialTestSuite{&baseCTETestSuite{}}) -type CTETestSuite struct { +type baseCTETestSuite struct { store kv.Storage dom *domain.Domain sessionCtx sessionctx.Context @@ -42,7 +44,15 @@ type CTETestSuite struct { ctx context.Context } -func (test *CTETestSuite) SetUpSuite(c *check.C) { +type CTETestSuite struct { + *baseCTETestSuite +} + +type CTESerialTestSuite struct { + *baseCTETestSuite +} + +func (test *baseCTETestSuite) SetUpSuite(c *check.C) { var err error test.store, err = mockstore.NewMockStore() c.Assert(err, check.IsNil) @@ -59,7 +69,7 @@ func (test *CTETestSuite) SetUpSuite(c *check.C) { test.ctx = context.Background() } -func (test *CTETestSuite) TearDownSuite(c *check.C) { +func (test *baseCTETestSuite) TearDownSuite(c *check.C) { test.dom.Close() test.store.Close() } @@ -107,70 +117,55 @@ func (test *CTETestSuite) TestBasicCTE(c *check.C) { rows.Check(testkit.Rows("1", "2")) } -func (test *CTETestSuite) TestSpillToDisk(c *check.C) { +func (test *CTESerialTestSuite) TestSpillToDisk(c *check.C) { + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMUseTmpStorage = true + }) + tk := testkit.NewTestKit(c, test.store) tk.MustExec("use test;") c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testCTEStorageSpill", "return(true)"), check.IsNil) defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testCTEStorageSpill"), check.IsNil) + tk.MustExec("set tidb_mem_quota_query = 1073741824;") + }() + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"), check.IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill"), check.IsNil) }() - - insertStr := "insert into t1 values(0, 0)" - for i := 1; i < 5000; i++ { - insertStr += fmt.Sprintf(", (%d, %d)", i, i) - } - - tk.MustExec("drop table if exists t1;") - tk.MustExec("create table t1(c1 int, c2 int);") - tk.MustExec(insertStr) - tk.MustExec("set tidb_mem_quota_query = 80000;") - rows := tk.MustQuery("with recursive cte1 as ( " + - "select c1 from t1 " + - "union " + - "select c1 + 1 c1 from cte1 where c1 < 5000) " + - "select c1 from cte1;") - - memTracker := tk.Se.GetSessionVars().StmtCtx.MemTracker - diskTracker := tk.Se.GetSessionVars().StmtCtx.DiskTracker - c.Assert(memTracker.MaxConsumed(), check.Greater, int64(0)) - c.Assert(diskTracker.MaxConsumed(), check.Greater, int64(0)) - - rowNum := 5000 - var resRows []string - for i := 0; i <= rowNum; i++ { - resRows = append(resRows, fmt.Sprintf("%d", i)) - } - rows.Check(testkit.Rows(resRows...)) // Use duplicated rows to test UNION DISTINCT. tk.MustExec("set tidb_mem_quota_query = 1073741824;") - insertStr = "insert into t1 values(0, 0)" + insertStr := "insert into t1 values(0)" + rowNum := 1000 vals := make([]int, rowNum) vals[0] = 0 for i := 1; i < rowNum; i++ { v := rand.Intn(100) vals[i] = v - insertStr += fmt.Sprintf(", (%d, %d)", v, v) + insertStr += fmt.Sprintf(", (%d)", v) } tk.MustExec("drop table if exists t1;") - tk.MustExec("create table t1(c1 int, c2 int);") + tk.MustExec("create table t1(c1 int);") tk.MustExec(insertStr) - tk.MustExec("set tidb_mem_quota_query = 80000;") + tk.MustExec("set tidb_mem_quota_query = 40000;") tk.MustExec("set cte_max_recursion_depth = 500000;") - rows = tk.MustQuery("with recursive cte1 as ( " + - "select c1 from t1 " + - "union " + - "select c1 + 1 c1 from cte1 where c1 < 5000) " + - "select c1 from cte1 order by c1;") - - memTracker = tk.Se.GetSessionVars().StmtCtx.MemTracker - diskTracker = tk.Se.GetSessionVars().StmtCtx.DiskTracker + sql := fmt.Sprintf("with recursive cte1 as ( "+ + "select c1 from t1 "+ + "union "+ + "select c1 + 1 c1 from cte1 where c1 < %d) "+ + "select c1 from cte1 order by c1;", rowNum) + rows := tk.MustQuery(sql) + + memTracker := tk.Se.GetSessionVars().StmtCtx.MemTracker + diskTracker := tk.Se.GetSessionVars().StmtCtx.DiskTracker c.Assert(memTracker.MaxConsumed(), check.Greater, int64(0)) c.Assert(diskTracker.MaxConsumed(), check.Greater, int64(0)) sort.Ints(vals) - resRows = make([]string, 0, rowNum) + resRows := make([]string, 0, rowNum) for i := vals[0]; i <= rowNum; i++ { resRows = append(resRows, fmt.Sprintf("%d", i)) }