Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ranger: add a more common method to calc range. #3489

Merged
merged 19 commits into from
Jun 21, 2017
Merged
38 changes: 38 additions & 0 deletions expression/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,41 @@ func Column2Exprs(cols []*Column) []Expression {
}
return result
}

// ColInfo2Col finds the corresponding column of the ColumnInfo in a column slice.
func ColInfo2Col(cols []*Column, col *model.ColumnInfo) *Column {
for _, c := range cols {
if c.ColName.L == col.Name.L {
return c
}
}
return nil
}

// indexCol2Col finds the corresponding column of the IndexColumn in a column slice.
func indexCol2Col(cols []*Column, col *model.IndexColumn) *Column {
for _, c := range cols {
if c.ColName.L == col.Name.L {
return c
}
}
return nil
}

// IndexInfo2Cols gets the corresponding []*Column of the indexInfo's []*IndexColumn,
// together with a []int containing their lengths.
// If this index has three IndexColumn that the 1st and 3rd IndexColumn has corresponding *Column,
// the return value will be only the 1st corresponding *Column and its length.
func IndexInfo2Cols(cols []*Column, index *model.IndexInfo) ([]*Column, []int) {
retCols := make([]*Column, 0, len(index.Columns))
lengths := make([]int, 0, len(index.Columns))
for _, c := range index.Columns {
col := indexCol2Col(cols, c)
if col == nil {
return retCols, lengths
}
retCols = append(retCols, col)
lengths = append(lengths, c.Length)
}
return retCols, lengths
}
11 changes: 11 additions & 0 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ func (t *TableInfo) GetPkName() CIStr {
return CIStr{}
}

// GetPkColInfo gets the ColumnInfo of pk if exists.
// Make sure PkIsHandle checked before call this method.
func (t *TableInfo) GetPkColInfo() *ColumnInfo {
for _, colInfo := range t.Columns {
if mysql.HasPriKeyFlag(colInfo.Flag) {
return colInfo
}
}
return nil
}

// ColumnIsInIndex checks whether c is included in any indices of t.
func (t *TableInfo) ColumnIsInIndex(c *ColumnInfo) bool {
for _, index := range t.Indices {
Expand Down
67 changes: 37 additions & 30 deletions plan/new_physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"math"

"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -684,22 +685,27 @@ func (p *DataSource) convertToIndexScan(prop *requiredProp, idx *model.IndexInfo
statsTbl := p.statisticTable
rowCount := float64(statsTbl.Count)
sc := p.ctx.GetSessionVars().StmtCtx
idxCols, colLengths := expression.IndexInfo2Cols(p.Schema().Columns, idx)
is.Ranges = ranger.FullIndexRange()
if len(p.pushedDownConds) > 0 {
conds := make([]expression.Expression, 0, len(p.pushedDownConds))
for _, cond := range p.pushedDownConds {
conds = append(conds, cond.Clone())
}
is.AccessCondition, is.filterCondition, is.accessEqualCount, is.accessInAndEqCount = ranger.DetachIndexScanConditions(conds, idx)
is.Ranges, err = ranger.BuildIndexRange(sc, is.Table, is.Index, is.accessInAndEqCount, is.AccessCondition)
if err != nil {
return nil, errors.Trace(err)
}
rowCount, err = statsTbl.GetRowCountByIndexRanges(sc, is.Index.ID, is.Ranges)
if err != nil {
return nil, errors.Trace(err)
if len(idxCols) > 0 {
var ranges []types.Range
ranges, is.AccessCondition, is.filterCondition, err = ranger.BuildRange(sc, conds, ranger.IndexRangeType, idxCols, colLengths)
if err != nil {
return nil, errors.Trace(err)
}
is.Ranges = ranger.Ranges2IndexRanges(ranges)
rowCount, err = statsTbl.GetRowCountByIndexRanges(sc, is.Index.ID, is.Ranges)
if err != nil {
return nil, errors.Trace(err)
}
} else {
is.filterCondition = conds
}
} else {
is.Ranges = ranger.FullIndexRange()
}
cop := &copTask{
cnt: rowCount,
Expand Down Expand Up @@ -739,7 +745,7 @@ func (p *DataSource) convertToIndexScan(prop *requiredProp, idx *model.IndexInfo
if col.Name.L == prop.cols[0].ColName.L {
matchProperty = matchIndicesProp(idx.Columns[i:], prop.cols)
break
} else if i >= is.accessEqualCount {
} else if i >= len(is.AccessCondition) || is.AccessCondition[i].(*expression.ScalarFunction).FuncName.L != ast.EQ {
matchProperty = false
break
}
Expand Down Expand Up @@ -818,34 +824,35 @@ func (p *DataSource) convertToTableScan(prop *requiredProp) (task task, err erro
}.init(p.allocator, p.ctx)
ts.SetSchema(p.schema)
sc := p.ctx.GetSessionVars().StmtCtx
ts.Ranges = ranger.FullIntRange()
var pkCol *expression.Column
if ts.Table.PKIsHandle {
if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil {
pkCol = expression.ColInfo2Col(ts.schema.Columns, pkColInfo)
}
}
if len(p.pushedDownConds) > 0 {
conds := make([]expression.Expression, 0, len(p.pushedDownConds))
for _, cond := range p.pushedDownConds {
conds = append(conds, cond.Clone())
}
ts.AccessCondition, ts.filterCondition = ranger.DetachColumnConditions(conds, p.tableInfo.GetPkName())
ts.Ranges, err = ranger.BuildTableRange(ts.AccessCondition, sc)
if err != nil {
return nil, errors.Trace(err)
}
} else {
ts.Ranges = []types.IntColumnRange{{math.MinInt64, math.MaxInt64}}
}
statsTbl := p.statisticTable
rowCount := float64(statsTbl.Count)
var pkCol *expression.Column
if p.tableInfo.PKIsHandle {
for i, colInfo := range ts.Columns {
if mysql.HasPriKeyFlag(colInfo.Flag) {
pkCol = p.Schema().Columns[i]
break
}
}
if pkCol != nil {
rowCount, err = statsTbl.GetRowCountByIntColumnRanges(sc, pkCol.ID, ts.Ranges)
var ranges []types.Range
ranges, ts.AccessCondition, ts.filterCondition, err = ranger.BuildRange(sc, conds, ranger.IntRangeType, []*expression.Column{pkCol}, nil)
ts.Ranges = ranger.Ranges2IntRanges(ranges)
if err != nil {
return nil, errors.Trace(err)
}
} else {
ts.filterCondition = conds
}
}
statsTbl := p.statisticTable
rowCount := float64(statsTbl.Count)
if pkCol != nil {
rowCount, err = statsTbl.GetRowCountByIntColumnRanges(sc, pkCol.ID, ts.Ranges)
if err != nil {
return nil, errors.Trace(err)
}
}
cost := rowCount * scanFactor
Expand Down
Loading