Skip to content

Commit

Permalink
plan: improve row count estimation using column order correlation (#9839
Browse files Browse the repository at this point in the history
)
  • Loading branch information
eurekaka authored and zz-jason committed Apr 24, 2019
1 parent fa2d6f0 commit f9c82b5
Show file tree
Hide file tree
Showing 14 changed files with 350 additions and 65 deletions.
15 changes: 0 additions & 15 deletions cmd/explaintest/r/access_path_selection.result
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,3 @@ IndexLookUp_11 0.00 root
├─IndexScan_8 1.00 cop table:outdated_statistics, index:a, b, range:[1 1,1 1], keep order:false
└─Selection_10 0.00 cop eq(test.outdated_statistics.c, 1)
└─TableScan_9 1.00 cop table:outdated_statistics, keep order:false
CREATE TABLE `unknown_correlation` (
id int,
a int,
PRIMARY KEY (`id`),
INDEX idx_a(a)
);
INSERT INTO unknown_correlation values (1, 1),(2, 1),(3, 1),(4, 1),(5, 1),(6, 1),(7, 1),(8, 1),(9, 1),(10, 1),(11, 1),(12, 1),(13, 1),(14, 1),(15, 1),(16, 1),(17, 1),(18, 1),(19, 1),(20, 2),(21, 2),(22, 2),(23, 2),(24, 2),(25, 2);
ANALYZE TABLE unknown_correlation;
EXPLAIN SELECT * FROM unknown_correlation WHERE a = 2 ORDER BY id limit 1;
id count task operator info
Limit_11 1.00 root offset:0, count:1
└─TableReader_22 1.00 root data:Limit_21
└─Limit_21 1.00 cop offset:0, count:1
└─Selection_20 1.00 cop eq(test.unknown_correlation.a, 2)
└─TableScan_19 4.17 cop table:unknown_correlation, range:[-inf,+inf], keep order:true
19 changes: 0 additions & 19 deletions cmd/explaintest/t/access_path_selection.test
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,3 @@ insert into outdated_statistics values (1, 3, 3);
# result after Skyline Pruning is introduced.
analyze table outdated_statistics index idx_ab;
explain select * from outdated_statistics where a=1 and b=1 and c=1;

CREATE TABLE `unknown_correlation` (
id int,
a int,
PRIMARY KEY (`id`),
INDEX idx_a(a)
);
INSERT INTO unknown_correlation values (1, 1),(2, 1),(3, 1),(4, 1),(5, 1),(6, 1),(7, 1),(8, 1),(9, 1),(10, 1),(11, 1),(12, 1),(13, 1),(14, 1),(15, 1),(16, 1),(17, 1),(18, 1),(19, 1),(20, 2),(21, 2),(22, 2),(23, 2),(24, 2),(25, 2);
ANALYZE TABLE unknown_correlation;
# Estimated row count on idx_a is 6,
# while the estimated row count on TableScan is 4.17, which is computed as below:
# selectivity := ds.stats.RowCount / rowCount ==> selectivity = 6 / 25
# rowCount := prop.ExpectedCnt / selectivity ==> rowCount = 1 / (6 / 25) = 4.17
# Then the planner will pick TableScan instead of IndexScan to execute.
# But actually the cost of TableScan is 25-6+1=20 under this correlation between `id` and `a`.
# So IndexScan is better than TableScan for this query, but the planner makes the wrong choice.
# This problem can be solved by introducing correlation between columns,
# so we may update this case later.
EXPLAIN SELECT * FROM unknown_correlation WHERE a = 2 ORDER BY id limit 1;
112 changes: 112 additions & 0 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,3 +987,115 @@ func (s *testAnalyzeSuite) TestIssue9805(c *C) {
}
c.Assert(hasIndexLookUp12, IsTrue)
}

// TestLimitCrossEstimation checks the planner's row count estimation for
// `... ORDER BY handle LIMIT n` queries when a filtered column is
// order-correlated with the handle column: with high correlation the planner
// should keep the index access path (TopN over IndexScan) instead of falling
// back to a full-range TableScan with a Limit pushed down.
func (s *testAnalyzeSuite) TestLimitCrossEstimation(c *C) {
	defer testleak.AfterTest(c)()
	store, dom, err := newStoreWithBootstrap()
	c.Assert(err, IsNil)
	tk := testkit.NewTestKit(c, store)
	defer func() {
		dom.Close()
		store.Close()
	}()

	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t(a int primary key, b int not null, index idx_b(b))")
	// Pseudo stats: no analyze yet, so cross estimation is disabled and the
	// index path is costed with default (stats:pseudo) row counts.
	tk.MustQuery("EXPLAIN SELECT * FROM t WHERE b = 2 ORDER BY a limit 1;").Check(testkit.Rows(
		"TopN_8 1.00 root test.t.a:asc, offset:0, count:1",
		"└─IndexReader_16 1.00 root index:TopN_15",
		"  └─TopN_15 1.00 cop test.t.a:asc, offset:0, count:1",
		"    └─IndexScan_14 10.00 cop table:t, index:b, range:[2,2], keep order:false, stats:pseudo",
	))
	// Positive correlation: `b` increases with `a`, so the index path wins.
	tk.MustExec("insert into t values (1, 1),(2, 1),(3, 1),(4, 1),(5, 1),(6, 1),(7, 1),(8, 1),(9, 1),(10, 1),(11, 1),(12, 1),(13, 1),(14, 1),(15, 1),(16, 1),(17, 1),(18, 1),(19, 1),(20, 2),(21, 2),(22, 2),(23, 2),(24, 2),(25, 2)")
	tk.MustExec("analyze table t")
	tk.MustQuery("EXPLAIN SELECT * FROM t WHERE b = 2 ORDER BY a limit 1;").Check(testkit.Rows(
		"TopN_8 1.00 root test.t.a:asc, offset:0, count:1",
		"└─IndexReader_16 1.00 root index:TopN_15",
		"  └─TopN_15 1.00 cop test.t.a:asc, offset:0, count:1",
		"    └─IndexScan_14 6.00 cop table:t, index:b, range:[2,2], keep order:false",
	))
	// Negative correlation: `b` decreases with `a`; the scan direction is
	// flipped internally and the index path should still be chosen.
	tk.MustExec("truncate table t")
	tk.MustExec("insert into t values (1, 25),(2, 24),(3, 23),(4, 23),(5, 21),(6, 20),(7, 19),(8, 18),(9, 17),(10, 16),(11, 15),(12, 14),(13, 13),(14, 12),(15, 11),(16, 10),(17, 9),(18, 8),(19, 7),(20, 6),(21, 5),(22, 4),(23, 3),(24, 2),(25, 1)")
	tk.MustExec("analyze table t")
	tk.MustQuery("EXPLAIN SELECT * FROM t WHERE b <= 6 ORDER BY a limit 1").Check(testkit.Rows(
		"TopN_8 1.00 root test.t.a:asc, offset:0, count:1",
		"└─IndexReader_16 1.00 root index:TopN_15",
		"  └─TopN_15 1.00 cop test.t.a:asc, offset:0, count:1",
		"    └─IndexScan_14 6.00 cop table:t, index:b, range:[-inf,6], keep order:false",
	))
	// Outer plan of index join (to test that correct column ID is used).
	tk.MustQuery("EXPLAIN SELECT *, t1.a IN (SELECT t2.b FROM t t2) FROM t t1 WHERE t1.b <= 6 ORDER BY t1.a limit 1").Check(testkit.Rows(
		"Limit_17 1.00 root offset:0, count:1",
		"└─IndexJoin_58 1.00 root left outer semi join, inner:IndexReader_57, outer key:t1.a, inner key:t2.b",
		"  ├─TopN_23 1.00 root t1.a:asc, offset:0, count:1",
		"  │ └─IndexReader_31 1.00 root index:TopN_30",
		"  │   └─TopN_30 1.00 cop t1.a:asc, offset:0, count:1",
		"  │     └─IndexScan_29 6.00 cop table:t1, index:b, range:[-inf,6], keep order:false",
		"  └─IndexReader_57 1.04 root index:IndexScan_56",
		"    └─IndexScan_56 1.04 cop table:t2, index:b, range: decided by [t1.a], keep order:false",
	))
	// Desc TableScan: descending order on the handle with positive correlation.
	tk.MustExec("truncate table t")
	tk.MustExec("insert into t values (1, 1),(2, 1),(3, 1),(4, 1),(5, 1),(6, 1),(7, 2),(8, 2),(9, 2),(10, 2),(11, 2),(12, 2),(13, 2),(14, 2),(15, 2),(16, 2),(17, 2),(18, 2),(19, 2),(20, 2),(21, 2),(22, 2),(23, 2),(24, 2),(25, 2)")
	tk.MustExec("analyze table t")
	tk.MustQuery("EXPLAIN SELECT * FROM t WHERE b = 1 ORDER BY a desc limit 1").Check(testkit.Rows(
		"TopN_8 1.00 root test.t.a:desc, offset:0, count:1",
		"└─IndexReader_16 1.00 root index:TopN_15",
		"  └─TopN_15 1.00 cop test.t.a:desc, offset:0, count:1",
		"    └─IndexScan_14 6.00 cop table:t, index:b, range:[1,1], keep order:false",
	))
	// Correlation threshold not met: cross estimation is skipped and the old
	// TableScan plan is produced, unless tidb_opt_correlation_exp_factor
	// enables the heuristic fallback.
	tk.MustExec("truncate table t")
	tk.MustExec("insert into t values (1, 1),(2, 1),(3, 1),(4, 1),(5, 1),(6, 1),(7, 1),(8, 1),(9, 2),(10, 1),(11, 1),(12, 1),(13, 1),(14, 2),(15, 2),(16, 1),(17, 2),(18, 1),(19, 2),(20, 1),(21, 2),(22, 1),(23, 1),(24, 1),(25, 1)")
	tk.MustExec("analyze table t")
	tk.MustQuery("EXPLAIN SELECT * FROM t WHERE b = 2 ORDER BY a limit 1").Check(testkit.Rows(
		"Limit_11 1.00 root offset:0, count:1",
		"└─TableReader_22 1.00 root data:Limit_21",
		"  └─Limit_21 1.00 cop offset:0, count:1",
		"    └─Selection_20 1.00 cop eq(test.t.b, 2)",
		"      └─TableScan_19 4.17 cop table:t, range:[-inf,+inf], keep order:true",
	))
	tk.MustExec("set @@tidb_opt_correlation_exp_factor = 1")
	tk.MustQuery("EXPLAIN SELECT * FROM t WHERE b = 2 ORDER BY a limit 1").Check(testkit.Rows(
		"TopN_8 1.00 root test.t.a:asc, offset:0, count:1",
		"└─IndexReader_16 1.00 root index:TopN_15",
		"  └─TopN_15 1.00 cop test.t.a:asc, offset:0, count:1",
		"    └─IndexScan_14 6.00 cop table:t, index:b, range:[2,2], keep order:false",
	))
	tk.MustExec("set @@tidb_opt_correlation_exp_factor = 0")
	// TableScan has access conditions, but correlation is 1.
	tk.MustExec("truncate table t")
	tk.MustExec("insert into t values (1, 1),(2, 1),(3, 1),(4, 1),(5, 1),(6, 1),(7, 1),(8, 1),(9, 1),(10, 1),(11, 1),(12, 1),(13, 1),(14, 1),(15, 1),(16, 1),(17, 1),(18, 1),(19, 1),(20, 2),(21, 2),(22, 2),(23, 2),(24, 2),(25, 2)")
	tk.MustExec("analyze table t")
	tk.MustQuery("EXPLAIN SELECT * FROM t WHERE b = 2 and a > 0 ORDER BY a limit 1").Check(testkit.Rows(
		"TopN_8 1.00 root test.t.a:asc, offset:0, count:1",
		"└─IndexReader_19 1.00 root index:TopN_18",
		"  └─TopN_18 1.00 cop test.t.a:asc, offset:0, count:1",
		"    └─Selection_17 6.00 cop gt(test.t.a, 0)",
		"      └─IndexScan_16 6.00 cop table:t, index:b, range:[2,2], keep order:false",
	))
	// Multi-column filter: AND-connected filters can still use cross
	// estimation; an OR-connected filter cannot be detached for one column,
	// so the TableScan plan is kept.
	tk.MustExec("drop table t")
	tk.MustExec("create table t(a int primary key, b int, c int, index idx_b(b))")
	tk.MustExec("insert into t values (1, 1, 1),(2, 1, 2),(3, 1, 1),(4, 1, 2),(5, 1, 1),(6, 1, 2),(7, 1, 1),(8, 1, 2),(9, 1, 1),(10, 1, 2),(11, 1, 1),(12, 1, 2),(13, 1, 1),(14, 1, 2),(15, 1, 1),(16, 1, 2),(17, 1, 1),(18, 1, 2),(19, 1, 1),(20, 2, 2),(21, 2, 1),(22, 2, 2),(23, 2, 1),(24, 2, 2),(25, 2, 1)")
	tk.MustExec("analyze table t")
	tk.MustQuery("EXPLAIN SELECT * FROM t WHERE b = 2 and c > 0 ORDER BY a limit 1").Check(testkit.Rows(
		"TopN_9 1.00 root test.t.a:asc, offset:0, count:1",
		"└─IndexLookUp_22 1.00 root ",
		"  ├─IndexScan_18 6.00 cop table:t, index:b, range:[2,2], keep order:false",
		"  └─TopN_21 1.00 cop test.t.a:asc, offset:0, count:1",
		"    └─Selection_20 6.00 cop gt(test.t.c, 0)",
		"      └─TableScan_19 6.00 cop table:t, keep order:false",
	))
	tk.MustQuery("EXPLAIN SELECT * FROM t WHERE b = 2 or c > 0 ORDER BY a limit 1").Check(testkit.Rows(
		"Limit_11 1.00 root offset:0, count:1",
		"└─TableReader_24 1.00 root data:Limit_23",
		"  └─Limit_23 1.00 cop offset:0, count:1",
		"    └─Selection_22 1.00 cop or(eq(test.t.b, 2), gt(test.t.c, 0))",
		"      └─TableScan_21 1.25 cop table:t, range:[-inf,+inf], keep order:true",
	))
}
166 changes: 160 additions & 6 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/ranger"
"golang.org/x/tools/container/intsets"
)

Expand Down Expand Up @@ -617,6 +619,154 @@ func splitIndexFilterConditions(conditions []expression.Expression, indexColumns
return indexConditions, tableConditions
}

// getMostCorrColFromExprs returns the column referenced by exprs whose absolute
// order correlation with the handle column is highest and at least threshold,
// plus a composite factor obtained by multiplying the absolute correlations of
// every referenced column. It returns (nil, 0) when exprs reference no columns
// or when any referenced column has no histogram in histColl.
func getMostCorrColFromExprs(exprs []expression.Expression, histColl *statistics.Table, threshold float64) (*expression.Column, float64) {
	columns := expression.ExtractColumnsFromExpressions(nil, exprs, nil)
	if len(columns) == 0 {
		return nil, 0
	}
	var (
		bestCol  *expression.Column
		bestCorr float64
	)
	composite := 1.0
	for _, candidate := range columns {
		hist, found := histColl.Columns[candidate.ID]
		if !found {
			// Missing stats for any filter column makes the composite factor unreliable.
			return nil, 0
		}
		absCorr := math.Abs(hist.Correlation)
		composite *= absCorr
		// Track the best column among those meeting the threshold.
		if absCorr >= threshold && (bestCol == nil || absCorr > bestCorr) {
			bestCol = candidate
			bestCorr = absCorr
		}
	}
	return bestCol, composite
}

// getColumnRangeCounts estimates the row count of each range in ranges
// independently, using the index histogram identified by idxID when idxID >= 0
// and the column histogram for colID otherwise. The bool result is false when
// the needed histogram is missing/invalid or any estimation fails.
func getColumnRangeCounts(sc *stmtctx.StatementContext, colID int64, ranges []*ranger.Range, histColl *statistics.Table, idxID int64) ([]float64, bool) {
	counts := make([]float64, 0, len(ranges))
	for _, rg := range ranges {
		var (
			cnt float64
			err error
		)
		if idxID >= 0 {
			hist := histColl.Indices[idxID]
			if hist == nil || hist.IsInvalid(false) {
				return nil, false
			}
			cnt, err = histColl.GetRowCountByIndexRanges(sc, idxID, []*ranger.Range{rg})
		} else {
			hist, ok := histColl.Columns[colID]
			if !ok || hist.IsInvalid(sc, false) {
				return nil, false
			}
			cnt, err = histColl.GetRowCountByColumnRanges(sc, colID, []*ranger.Range{rg})
		}
		if err != nil {
			return nil, false
		}
		counts = append(counts, cnt)
	}
	return counts, true
}

// convertRangeFromExpectedCnt builds the range of rows that a table scan must
// read before it finds expectedCnt tuples falling into the input ranges. It
// walks ranges (back-to-front when desc) accumulating per-range counts until
// the accumulated total would reach expectedCnt; the first range not fully
// consumed becomes the boundary of the returned "rows before" range. It
// returns (nil, 0, true) when all ranges together hold fewer than expectedCnt
// rows, i.e. the scan is effectively full.
func convertRangeFromExpectedCnt(ranges []*ranger.Range, rangeCounts []float64, expectedCnt float64, desc bool) ([]*ranger.Range, float64, bool) {
	accumulated := float64(0)
	if desc {
		idx := len(ranges) - 1
		for ; idx >= 0; idx-- {
			if accumulated+rangeCounts[idx] >= expectedCnt {
				break
			}
			accumulated += rangeCounts[idx]
		}
		if idx < 0 {
			// Every range was consumed without reaching expectedCnt.
			return nil, 0, true
		}
		// Rows above the boundary range's upper bound, scanned first in desc order.
		converted := []*ranger.Range{{
			LowVal:     ranges[idx].HighVal,
			HighVal:    []types.Datum{types.MaxValueDatum()},
			LowExclude: !ranges[idx].HighExclude,
		}}
		return converted, accumulated, false
	}
	idx := 0
	for ; idx < len(ranges); idx++ {
		if accumulated+rangeCounts[idx] >= expectedCnt {
			break
		}
		accumulated += rangeCounts[idx]
	}
	if idx == len(ranges) {
		// Every range was consumed without reaching expectedCnt.
		return nil, 0, true
	}
	// Rows below the boundary range's lower bound (zero Datum = minimum value).
	converted := []*ranger.Range{{
		LowVal:      []types.Datum{{}},
		HighVal:     ranges[idx].LowVal,
		HighExclude: !ranges[idx].LowExclude,
	}}
	return converted, accumulated, false
}

// crossEstimateRowCount estimates row count of table scan using histogram of another column which is in tableFilters
// and has high order correlation with handle column. For example, if the query is like:
// `select * from tbl where a = 1 order by pk limit 1`
// if order of column `a` is strictly correlated with column `pk`, the row count of table scan should be:
// `1 + row_count(a < 1 or a is null)`
// Returns: the estimated scan row count, whether the estimation is usable, and the composite
// correlation factor of the filter columns (used by the caller as a heuristic when estimation is unusable).
func (ds *DataSource) crossEstimateRowCount(path *accessPath, expectedCnt float64, desc bool) (float64, bool, float64) {
	// Pseudo stats carry no real correlation information, and without table
	// filters there is no correlated column to estimate from.
	if ds.statisticTable.Pseudo || len(path.tableFilters) == 0 {
		return 0, false, 0
	}
	col, corr := getMostCorrColFromExprs(path.tableFilters, ds.statisticTable, ds.ctx.GetSessionVars().CorrelationThreshold)
	// If table scan is not full range scan, we cannot use histogram of other columns for estimation, because
	// the histogram reflects value distribution in the whole table level.
	if col == nil || len(path.accessConds) > 0 {
		return 0, false, corr
	}
	colInfoID := col.ID
	colID := col.UniqueID
	colHist := ds.statisticTable.Columns[colInfoID]
	// Negative correlation: the column's order is reversed relative to the
	// handle, so flip the scan direction for the estimation below.
	if colHist.Correlation < 0 {
		desc = !desc
	}
	// Split the table filters into single-column conditions on `col` (usable
	// for range building) and the remaining conditions.
	accessConds, remained := ranger.DetachCondsForColumn(ds.ctx, path.tableFilters, col)
	if len(accessConds) == 0 {
		return 0, false, corr
	}
	sc := ds.ctx.GetSessionVars().StmtCtx
	ranges, err := ranger.BuildColumnRange(accessConds, sc, col.RetType)
	if len(ranges) == 0 || err != nil {
		// On build error the estimation is unusable (err != nil => false);
		// an empty range set without error is usable with count 0.
		return 0, err == nil, corr
	}
	// Prefer an index histogram on this column when one exists; idxID == -1
	// falls back to the column histogram.
	idxID, idxExists := ds.stats.HistColl.ColID2IdxID[colID]
	if !idxExists {
		idxID = -1
	}
	rangeCounts, ok := getColumnRangeCounts(sc, colInfoID, ranges, ds.statisticTable, idxID)
	if !ok {
		return 0, false, corr
	}
	convertedRanges, count, isFull := convertRangeFromExpectedCnt(ranges, rangeCounts, expectedCnt, desc)
	// expectedCnt cannot be satisfied within the filter ranges: the scan must
	// read everything matching the access conditions.
	if isFull {
		return path.countAfterAccess, true, 0
	}
	// Count the rows that precede the boundary range in scan order.
	var rangeCount float64
	if idxExists {
		rangeCount, err = ds.statisticTable.GetRowCountByIndexRanges(sc, idxID, convertedRanges)
	} else {
		rangeCount, err = ds.statisticTable.GetRowCountByColumnRanges(sc, colInfoID, convertedRanges)
	}
	if err != nil {
		return 0, false, corr
	}
	// Rows to scan = rows before the boundary range + rows still needed inside it.
	scanCount := rangeCount + expectedCnt - count
	if len(remained) > 0 {
		// Filters not covered by the single-column range lower selectivity,
		// so more rows must be scanned to produce expectedCnt results.
		scanCount = scanCount / selectionFactor
	}
	// Never exceed the count implied by the access conditions alone.
	scanCount = math.Min(scanCount, path.countAfterAccess)
	return scanCount, true, 0
}

// convertToTableScan converts the DataSource to table scan.
func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candidate *candidatePath) (task task, err error) {
// It will be handled in convertToIndexScan.
Expand Down Expand Up @@ -651,12 +801,16 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
indexPlanFinished: true,
}
task = copTask
// Only use expectedCnt when it's smaller than the count we calculated.
// e.g. IndexScan(count1)->After Filter(count2). The `ds.stats.RowCount` is count2. count1 is the one we need to calculate
// If expectedCnt and count2 are both zero and we go into the below `if` block, the count1 will be set to zero though it's shouldn't be.
if (candidate.isMatchProp || prop.IsEmpty()) && prop.ExpectedCnt < ds.stats.RowCount {
selectivity := ds.stats.RowCount / rowCount
rowCount = math.Min(prop.ExpectedCnt/selectivity, rowCount)
// Adjust number of rows we actually need to scan if prop.ExpectedCnt is smaller than the count we calculated.
if prop.ExpectedCnt < ds.stats.RowCount {
count, ok, corr := ds.crossEstimateRowCount(path, prop.ExpectedCnt, candidate.isMatchProp && prop.Items[0].Desc)
if ok {
rowCount = count
} else if corr < 1 {
correlationFactor := math.Pow(1-corr, float64(ds.ctx.GetSessionVars().CorrelationExpFactor))
selectivity := ds.stats.RowCount / rowCount
rowCount = math.Min(prop.ExpectedCnt/selectivity/correlationFactor, rowCount)
}
}
ts.stats = property.NewSimpleStats(rowCount)
ts.stats.StatsVersion = ds.statisticTable.Version
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (ds *DataSource) deriveTablePathStats(path *accessPath) (bool, error) {
if len(ds.pushedDownConds) == 0 {
return false, nil
}
path.accessConds, path.tableFilters = ranger.DetachCondsForTableRange(ds.ctx, ds.pushedDownConds, pkCol)
path.accessConds, path.tableFilters = ranger.DetachCondsForColumn(ds.ctx, ds.pushedDownConds, pkCol)
// If there's no access cond, we try to find that whether there's expression containing correlated column that
// can be used to access data.
corColInAccessConds := false
Expand Down
2 changes: 2 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,8 @@ var builtinGlobalVariable = []string{
variable.TiDBDDLReorgBatchSize,
variable.TiDBDDLErrorCountLimit,
variable.TiDBOptInSubqToJoinAndAgg,
variable.TiDBOptCorrelationThreshold,
variable.TiDBOptCorrelationExpFactor,
variable.TiDBDistSQLScanConcurrency,
variable.TiDBInitChunkSize,
variable.TiDBMaxChunkSize,
Expand Down
12 changes: 12 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ type SessionVars struct {
// AllowInSubqToJoinAndAgg can be set to false to forbid rewriting the semi join to inner join with agg.
AllowInSubqToJoinAndAgg bool

// CorrelationThreshold is the guard to enable row count estimation using column order correlation.
CorrelationThreshold float64

// CorrelationExpFactor is used to control the heuristic approach of row count estimation when CorrelationThreshold is not met.
CorrelationExpFactor int

// CurrInsertValues is used to record current ValuesExpr's values.
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
CurrInsertValues chunk.Row
Expand Down Expand Up @@ -386,6 +392,8 @@ func NewSessionVars() *SessionVars {
DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry,
DDLReorgPriority: kv.PriorityLow,
AllowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg,
CorrelationThreshold: DefOptCorrelationThreshold,
CorrelationExpFactor: DefOptCorrelationExpFactor,
EnableRadixJoin: false,
L2CacheSize: cpuid.CPU.Cache.L2,
CommandValue: uint32(mysql.ComSleep),
Expand Down Expand Up @@ -657,6 +665,10 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.AllowWriteRowID = TiDBOptOn(val)
case TiDBOptInSubqToJoinAndAgg:
s.AllowInSubqToJoinAndAgg = TiDBOptOn(val)
case TiDBOptCorrelationThreshold:
s.CorrelationThreshold = tidbOptFloat64(val, DefOptCorrelationThreshold)
case TiDBOptCorrelationExpFactor:
s.CorrelationExpFactor = tidbOptPositiveInt32(val, DefOptCorrelationExpFactor)
case TiDBIndexLookupConcurrency:
s.IndexLookupConcurrency = tidbOptPositiveInt32(val, DefIndexLookupConcurrency)
case TiDBIndexLookupJoinConcurrency:
Expand Down
Loading

0 comments on commit f9c82b5

Please sign in to comment.