Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Selection can control the conditions of the below scan plan. #2834

Merged
merged 30 commits into from
Mar 28, 2017
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5db58a9
*: add indexScan controlled by correlated column
winoros Mar 15, 2017
1fe024d
Merge branch 'master' into yiding/apply_optimize
winoros Mar 15, 2017
5ce9107
tiny change
winoros Mar 15, 2017
364e172
fix merge bug
winoros Mar 15, 2017
36283cf
remove log
winoros Mar 15, 2017
7451287
remove log
winoros Mar 15, 2017
053c2d4
fix bug and tiny change
winoros Mar 16, 2017
e663291
address some comments
winoros Mar 16, 2017
a25a3eb
manage cast in plan build phase
winoros Mar 17, 2017
6aadeaa
address comments and manage cast in executor phase
winoros Mar 17, 2017
ba29a95
address some comments
winoros Mar 20, 2017
02cbcf1
remove useless args
winoros Mar 20, 2017
9a9ae32
change some logic
winoros Mar 20, 2017
ef178a3
fix bug
winoros Mar 20, 2017
8029c1e
refactor
winoros Mar 21, 2017
9771129
address comments
winoros Mar 21, 2017
7251827
tiny change
winoros Mar 21, 2017
e45f669
address some comments
winoros Mar 22, 2017
4e0012b
add test
winoros Mar 22, 2017
f0ec2e0
remove redundant function
winoros Mar 22, 2017
02db63c
move function
winoros Mar 22, 2017
c791ff7
move code
winoros Mar 22, 2017
8674c3f
typo error && add explain
winoros Mar 22, 2017
107a042
Merge branch 'master' into yiding/inlj_1
winoros Mar 23, 2017
3fd0435
remove outdate comment
winoros Mar 23, 2017
8f29d17
add tests and some tiny change
winoros Mar 24, 2017
088e9e6
tiny change
winoros Mar 24, 2017
6351036
add test
winoros Mar 24, 2017
97998fc
address comments
winoros Mar 27, 2017
55f92e6
Merge branch 'master' into yiding/inlj_1
shenli Mar 28, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,11 @@ func (b *executorBuilder) buildAggregation(v *plan.PhysicalAggregation) Executor

func (b *executorBuilder) buildSelection(v *plan.Selection) Executor {
exec := &SelectionExec{
Src: b.build(v.Children()[0]),
Condition: expression.ComposeCNFCondition(b.ctx, v.Conditions...),
schema: v.Schema(),
ctx: b.ctx,
Src: b.build(v.Children()[0]),
schema: v.Schema(),
ctx: b.ctx,
scanController: v.ScanController,
Conditions: v.Conditions,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you store the conditions, you don't need to store condition any more. Use a for loop to check condition.

}
return exec
}
Expand Down
70 changes: 66 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,15 +473,67 @@ type SelectionExec struct {
Condition expression.Expression
ctx context.Context
schema *expression.Schema

// scanController will tell whether this selection need to
// control the condition of below scan executor.
scanController bool
controllerInit bool
Conditions []expression.Expression
}

// Schema implements the Executor Schema interface.
func (e *SelectionExec) Schema() *expression.Schema {
return e.schema
}

// initController will init the conditions of the below scan executor.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more comments for this function.

// It will first substitute the correlated column to constant, then build range and filter by new conditions.
func (e *SelectionExec) initController() error {
sc := e.ctx.GetSessionVars().StmtCtx
client := e.ctx.GetClient()
newConds := make([]expression.Expression, 0, len(e.Conditions))
for _, cond := range e.Conditions {
newCond, err := expression.SubstituteCorCol2Constant(cond.Clone())
if err != nil {
return errors.Trace(err)
}
newConds = append(newConds, newCond)
}

switch x := e.Src.(type) {
case *XSelectTableExec:
accessCondition, restCondtion := plan.DetachTableScanConditions(newConds, x.tableInfo)
x.where, _, _ = plan.ExpressionsToPB(sc, restCondtion, client)
ranges, err := plan.BuildTableRange(accessCondition, sc)
if err != nil {
return errors.Trace(err)
}
x.ranges = ranges
case *XSelectIndexExec:
x.indexPlan.AccessCondition, newConds = plan.DetachIndexScanConditions(newConds, x.indexPlan)
idxConds, tblConds := plan.DetachIndexFilterConditions(newConds, x.indexPlan.Index.Columns, x.indexPlan.Table)
x.indexPlan.IndexConditionPBExpr, _, _ = plan.ExpressionsToPB(sc, idxConds, client)
x.indexPlan.TableConditionPBExpr, _, _ = plan.ExpressionsToPB(sc, tblConds, client)
err := plan.BuildIndexRange(sc, x.indexPlan)
if err != nil {
return errors.Trace(err)
}
x.where = x.indexPlan.TableConditionPBExpr
default:
return errors.Errorf("Error type of Executor: %T", x)
}
return nil
}

// Next implements the Executor Next interface.
func (e *SelectionExec) Next() (*Row, error) {
if e.scanController && !e.controllerInit {
err := e.initController()
if err != nil {
return nil, errors.Trace(err)
}
e.controllerInit = true
}
for {
srcRow, err := e.Src.Next()
if err != nil {
Expand All @@ -490,18 +542,28 @@ func (e *SelectionExec) Next() (*Row, error) {
if srcRow == nil {
return nil, nil
}
match, err := expression.EvalBool(e.Condition, srcRow.Data, e.ctx)
if err != nil {
return nil, errors.Trace(err)
allMatch := true
for _, cond := range e.Conditions {
match, err := expression.EvalBool(cond, srcRow.Data, e.ctx)
if err != nil {
return nil, errors.Trace(err)
}
if !match {
allMatch = false
break
}
}
if match {
if allMatch {
return srcRow, nil
}
}
}

// Close implements the Executor Close interface.
func (e *SelectionExec) Close() error {
if e.scanController {
e.controllerInit = false
}
return e.Src.Close()
}

Expand Down
13 changes: 13 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,3 +1280,16 @@ func (s *testSuite) TestHistoryRead(c *C) {
tk.MustExec("set @@tidb_snapshot = ''")
tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2 <nil>", "4 <nil>", "8 8", "9 9"))
}

func (s *testSuite) TestScanControlSelection(c *C) {
defer func() {
s.cleanEnv(c)
testleak.AfterTest(c)()
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, c int, index idx_b(b))")
tk.MustExec("insert into t values (1, 1, 1), (2, 1, 1), (3, 1, 2), (4, 2, 3)")
tk.MustQuery("select (select count(1) k from t s where s.b = t1.c) from t t1").Check(testkit.Rows("3", "3", "1", "0"))
}
74 changes: 74 additions & 0 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,80 @@ func (s *testSuite) TestExplain(c *C) {
"index filter conditions": null,
"table filter conditions": null
}
}`,
},
},
{
"select (select count(1) k from t1 s where s.c1 = t1.c1 having k != 0) from t1",
[]string{
"TableScan_12", "TableScan_13", "Selection_4", "StreamAgg_15", "Selection_10", "Apply_16", "Projection_2",
},
[]string{
"Apply_16", "Selection_4", "StreamAgg_15", "Selection_10", "Apply_16", "Projection_2", "",
},
[]string{
`{
"db": "test",
"table": "t1",
"desc": false,
"keep order": false,
"push down info": {
"limit": 0,
"access conditions": null,
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
"db": "test",
"table": "t1",
"desc": false,
"keep order": false,
"push down info": {
"limit": 0,
"access conditions": null,
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
"condition": [
"eq(s.c1, test.t1.c1)"
],
"scanController": true,
"child": "TableScan_13"
}`,
`{
"AggFuncs": [
"count(1)"
],
"GroupByItems": null,
"child": "Selection_4"
}`,
`{
"condition": [
"ne(aggregation_5_col_0, 0)"
],
"scanController": false,
"child": "StreamAgg_15"
}`,
`{
"innerPlan": "Selection_10",
"outerPlan": "TableScan_12",
"join": {
"eqCond": null,
"leftCond": null,
"rightCond": null,
"otherCond": null,
"leftPlan": "TableScan_12",
"rightPlan": "Selection_10"
}
}`,
`{
"exprs": [
"k"
],
"child": "Apply_16"
}`,
},
},
Expand Down
37 changes: 37 additions & 0 deletions expression/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,40 @@ func (d *distinctChecker) Check(values []interface{}) (bool, error) {
d.existingKeys[key] = true
return true, nil
}

// SubstituteCorCol2Constant will substitute correlated column to constant value which it contains.
// If the args of one scalar function are all constant, we will substitute it to constant.
func SubstituteCorCol2Constant(expr Expression) (Expression, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not too hard to write a test for this function.

switch x := expr.(type) {
case *ScalarFunction:
allConstant := true
newArgs := make([]Expression, 0, len(x.GetArgs()))
for _, arg := range x.GetArgs() {
newArg, err := SubstituteCorCol2Constant(arg)
if err != nil {
return nil, errors.Trace(err)
}
_, ok := newArg.(*Constant)
newArgs = append(newArgs, newArg)
allConstant = allConstant && ok
}
if allConstant {
val, err := x.Eval(nil)
if err != nil {
return nil, errors.Trace(err)
}
return &Constant{Value: val}, nil
}
var newSf Expression
if x.FuncName.L == ast.Cast {
newSf = NewCastFunc(x.RetType, newArgs[0], x.GetCtx())
} else {
newSf, _ = NewFunction(x.GetCtx(), x.FuncName.L, x.GetType(), newArgs...)
}
return newSf, nil
case *CorrelatedColumn:
return &Constant{Value: *x.Data, RetType: x.GetType()}, nil
default:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case we will fall to default?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*Column will fall to default. i will move *Constant to default too.

return x.Clone(), nil
}
}
21 changes: 21 additions & 0 deletions expression/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ package expression

import (
"github.com/pingcap/check"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/types"
)

var _ = check.Suite(&testUtilSuite{})
Expand Down Expand Up @@ -43,3 +47,20 @@ func (s *testUtilSuite) TestDistinct(c *check.C) {
c.Assert(d, check.Equals, t.expect)
}
}

func (s *testUtilSuite) TestSubstituteCorCol2Constant(c *check.C) {
defer testleak.AfterTest(c)()
ctx := mock.NewContext()
corCol1 := &CorrelatedColumn{Data: &One.Value}
corCol2 := &CorrelatedColumn{Data: &One.Value}
cast := NewCastFunc(types.NewFieldType(mysql.TypeLonglong), corCol1, ctx)
plus := newFunction(ast.Plus, cast, corCol2)
plus2 := newFunction(ast.Plus, plus, One)
ans := &Constant{Value: types.NewIntDatum(3)}
ret, err := SubstituteCorCol2Constant(plus2)
c.Assert(err, check.IsNil)
c.Assert(ret.Equal(ans, ctx), check.IsTrue)
col1 := &Column{Index: 1}
newCol, err := SubstituteCorCol2Constant(col1)
c.Assert(newCol.Equal(col1, ctx), check.IsTrue)
}
67 changes: 67 additions & 0 deletions plan/decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/types"
)

Expand Down Expand Up @@ -158,6 +159,11 @@ func (s *decorrelateSolver) optimize(p LogicalPlan, _ context.Context, _ *idAllo
}
}
}
if sel, ok := p.(*Selection); ok {
if ds, ok := p.Children()[0].(*DataSource); ok {
sel.canControlScan = sel.hasUsableIndicesAndPk(ds)
}
}
newChildren := make([]Plan, 0, len(p.Children()))
for _, child := range p.Children() {
np, _ := s.optimize(child.(LogicalPlan), nil, nil)
Expand All @@ -167,3 +173,64 @@ func (s *decorrelateSolver) optimize(p LogicalPlan, _ context.Context, _ *idAllo
p.SetChildren(newChildren...)
return p, nil
}

// hasUsableIndicesAndPk will simply check whether the pk or one index could used in this situation by
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be used

// checking whether this index or pk is contained in one condition that has correlated column,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in any condition

// and whether this condition can be used as an access condition.
func (p *Selection) hasUsableIndicesAndPk(ds *DataSource) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is very complex, need to write a test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

indices, includeTableScan := availableIndices(ds.indexHints, ds.tableInfo)
var newConds []expression.Expression
for _, expr := range p.Conditions {
if !expr.IsCorrelated() {
continue
}
cond := pushDownNot(expr, false, nil)
corCols := extractCorColumns(cond)
for _, col := range corCols {
*col.Data = expression.One.Value
}
newCond, _ := expression.SubstituteCorCol2Constant(cond)
newConds = append(newConds, newCond)
}
if ds.tableInfo.PKIsHandle && includeTableScan {
var pkCol *expression.Column
for i, col := range ds.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider composed index?

pkCol = ds.schema.Columns[i]
break
}
}
if pkCol != nil {
checker := conditionChecker{
pkName: pkCol.ColName,
length: types.UnspecifiedLength,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use UnspecifiedLength?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same with DetachTableScanConditions() in refiner.go.

}
// Pk should satisfies the same property as the index.
for _, cond := range newConds {
if checker.check(cond) {
return true
}
}
}
}
for _, idx := range indices {
// TODO: Currently we don't consider composite index.
if len(idx.Columns) > 1 {
continue
}
checker := &conditionChecker{
idx: idx,
columnOffset: 0,
length: idx.Columns[0].Length,
}
// This idx column should occur in one condition which contains both column and correlated column.
// And conditionChecker.check(this condition) should be true.
for _, cond := range newConds {
// If one cond is ok, then this index is useful.
if checker.check(cond) {
return true
}
}
}
return false
}
3 changes: 2 additions & 1 deletion plan/expr_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

func expressionsToPB(sc *variable.StatementContext, exprs []expression.Expression, client kv.Client) (pbExpr *tipb.Expr, pushed []expression.Expression, remained []expression.Expression) {
// ExpressionsToPB converts expression to tipb.Expr.
func ExpressionsToPB(sc *variable.StatementContext, exprs []expression.Expression, client kv.Client) (pbExpr *tipb.Expr, pushed []expression.Expression, remained []expression.Expression) {
pc := pbConverter{client: client, sc: sc}
for _, expr := range exprs {
v := pc.exprToPB(expr)
Expand Down
8 changes: 8 additions & 0 deletions plan/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ type Selection struct {

// onTable means if this selection's child is a table scan or index scan.
onTable bool

// If ScanController is true, then the child of this selection is a scan,
// which use pk or index. we will record the accessConditions, idxConditions,
// and tblConditions to control the below plan.
ScanController bool

// We will check this at decorrelate phase.
canControlScan bool
}

func (p *Selection) extractCorrelatedCols() []*expression.CorrelatedColumn {
Expand Down
Loading