Skip to content

Commit

Permalink
Optimizer: Add partition by support for derived TopN(filter on row_nu…
Browse files Browse the repository at this point in the history
…mber)
  • Loading branch information
ghazalfamilyusa committed Feb 15, 2023
1 parent 55c8358 commit a10b52c
Show file tree
Hide file tree
Showing 10 changed files with 3,673 additions and 111 deletions.
3,253 changes: 3,253 additions & 0 deletions planner/core/diff

Large diffs are not rendered by default.

32 changes: 23 additions & 9 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2374,9 +2374,10 @@ func (lt *LogicalTopN) getPhysTopN(_ *property.PhysicalProperty) []PhysicalPlan
for _, tp := range allTaskTypes {
resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: math.MaxFloat64}
topN := PhysicalTopN{
ByItems: lt.ByItems,
Count: lt.Count,
Offset: lt.Offset,
ByItems: lt.ByItems,
PartitionBy: lt.PartitionBy,
Count: lt.Count,
Offset: lt.Offset,
}.Init(lt.ctx, lt.stats, lt.blockOffset, resultProp)
ret = append(ret, topN)
}
Expand All @@ -2397,8 +2398,9 @@ func (lt *LogicalTopN) getPhysLimits(_ *property.PhysicalProperty) []PhysicalPla
for _, tp := range allTaskTypes {
resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(lt.Count + lt.Offset), SortItems: p.SortItems}
limit := PhysicalLimit{
Count: lt.Count,
Offset: lt.Offset,
Count: lt.Count,
Offset: lt.Offset,
PartitionBy: lt.GetPartitionBy(),
}.Init(lt.ctx, lt.stats, lt.blockOffset, resultProp)
limit.SetSchema(lt.Schema())
ret = append(ret, limit)
Expand Down Expand Up @@ -2634,6 +2636,7 @@ func (p *baseLogicalPlan) canPushToCopImpl(storeTp kv.StoreType, considerDual bo
switch c := ch.(type) {
case *DataSource:
validDs := false
partitionBy := false
considerIndexMerge := false
for _, path := range c.possibleAccessPaths {
if path.StoreType == storeTp {
Expand All @@ -2645,11 +2648,21 @@ func (p *baseLogicalPlan) canPushToCopImpl(storeTp kv.StoreType, considerDual bo
}
ret = ret && validDs

_, isTopN := p.self.(*LogicalTopN)
_, isLimit := p.self.(*LogicalLimit)
topN, isTopN := p.self.(*LogicalTopN)
limit, isLimit := p.self.(*LogicalLimit)
if isTopN {
partitionBy = len(topN.GetPartitionBy()) != 0
}
if isLimit {
partitionBy = len(limit.GetPartitionBy()) != 0
}
if (isTopN || isLimit) && considerIndexMerge {
return false // TopN and Limit cannot be pushed down to IndexMerge
}
if (isTopN || isLimit) && storeTp == kv.TiFlash && partitionBy {
return false // Do not push limit/topN with partition by for TiFlash
}

if c.tableInfo.TableCacheStatusType != model.TableCacheStatusDisable {
// Don't push to cop for cached table, it brings more harm than good:
// 1. Those tables are small enough, push to cop can't utilize several TiKV to accelerate computation.
Expand Down Expand Up @@ -3112,8 +3125,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]
for _, tp := range allTaskTypes {
resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(p.Count + p.Offset)}
limit := PhysicalLimit{
Offset: p.Offset,
Count: p.Count,
Offset: p.Offset,
Count: p.Count,
PartitionBy: p.GetPartitionBy(),
}.Init(p.ctx, p.stats, p.blockOffset, resultProp)
limit.SetSchema(p.Schema())
ret = append(ret, limit)
Expand Down
32 changes: 25 additions & 7 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1637,10 +1637,17 @@ func (ls *LogicalSort) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
type LogicalTopN struct {
baseLogicalPlan

ByItems []*util.ByItems
Offset uint64
Count uint64
limitHints limitHintInfo
ByItems []*util.ByItems
// PartitionBy is used for extended TopN to consider K heaps. Used by rule_derive_topn_from_window
PartitionBy []property.SortItem // This is used for enhanced topN optimization
Offset uint64
Count uint64
limitHints limitHintInfo
}

// GetPartitionBy returns partition by fields
func (lt *LogicalTopN) GetPartitionBy() []property.SortItem {
return lt.PartitionBy
}

// ExtractCorrelatedCols implements LogicalPlan interface.
Expand All @@ -1661,9 +1668,15 @@ func (lt *LogicalTopN) isLimit() bool {
type LogicalLimit struct {
logicalSchemaProducer

Offset uint64
Count uint64
limitHints limitHintInfo
PartitionBy []property.SortItem // This is used for enhanced topN optimization
Offset uint64
Count uint64
limitHints limitHintInfo
}

// GetPartitionBy returns partition by fields
func (lt *LogicalLimit) GetPartitionBy() []property.SortItem {
return lt.PartitionBy
}

// LogicalLock represents a select lock plan.
Expand Down Expand Up @@ -1734,6 +1747,11 @@ type LogicalWindow struct {
Frame *WindowFrame
}

// GetPartitionBy returns partition by fields.
func (p *LogicalWindow) GetPartitionBy() []property.SortItem {
return p.PartitionBy
}

// EqualPartitionBy checks whether two LogicalWindow.Partitions are equal.
func (p *LogicalWindow) EqualPartitionBy(_ sessionctx.Context, newWindow *LogicalWindow) bool {
if len(p.PartitionBy) != len(newWindow.PartitionBy) {
Expand Down
33 changes: 28 additions & 5 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,9 +1034,15 @@ func (p *PhysicalProjection) MemoryUsage() (sum int64) {
type PhysicalTopN struct {
basePhysicalPlan

ByItems []*util.ByItems
Offset uint64
Count uint64
ByItems []*util.ByItems
PartitionBy []property.SortItem
Offset uint64
Count uint64
}

// GetPartitionBy returns partition by fields
func (lt *PhysicalTopN) GetPartitionBy() []property.SortItem {
return lt.PartitionBy
}

// Clone implements PhysicalPlan interface.
Expand All @@ -1052,6 +1058,10 @@ func (lt *PhysicalTopN) Clone() (PhysicalPlan, error) {
for _, it := range lt.ByItems {
cloned.ByItems = append(cloned.ByItems, it.Clone())
}
cloned.PartitionBy = make([]property.SortItem, 0, len(lt.PartitionBy))
for _, it := range lt.PartitionBy {
cloned.PartitionBy = append(cloned.PartitionBy, it.Clone())
}
return cloned, nil
}

Expand All @@ -1074,6 +1084,9 @@ func (lt *PhysicalTopN) MemoryUsage() (sum int64) {
for _, byItem := range lt.ByItems {
sum += byItem.MemoryUsage()
}
for _, item := range lt.PartitionBy {
sum += item.MemoryUsage()
}
return
}

Expand Down Expand Up @@ -1585,8 +1598,14 @@ func (pl *PhysicalLock) MemoryUsage() (sum int64) {
type PhysicalLimit struct {
physicalSchemaProducer

Offset uint64
Count uint64
PartitionBy []property.SortItem
Offset uint64
Count uint64
}

// GetPartitionBy returns partition by fields
func (p *PhysicalLimit) GetPartitionBy() []property.SortItem {
return p.PartitionBy
}

// Clone implements PhysicalPlan interface.
Expand All @@ -1597,6 +1616,10 @@ func (p *PhysicalLimit) Clone() (PhysicalPlan, error) {
if err != nil {
return nil, err
}
cloned.PartitionBy = make([]property.SortItem, 0, len(p.PartitionBy))
for _, it := range p.PartitionBy {
cloned.PartitionBy = append(cloned.PartitionBy, it.Clone())
}
cloned.physicalSchemaProducer = *base
return cloned, nil
}
Expand Down
36 changes: 31 additions & 5 deletions planner/core/rule_derive_topn_from_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
)

// deriveTopNFromWindow pushes down the topN or limit. In the future we will remove the limit from `requiredProperty` in CBO phase.
Expand All @@ -38,13 +39,37 @@ func appendDerivedTopNTrace(topN LogicalPlan, opt *logicalOptimizeOp) {
opt.appendStepToCurrent(topN.ID(), topN.TP(), reason, action)
}

// checkPartitionBy mainly checks if partition by of window function is a prefix of
// data order (clustered index) of the data source. TiFlash is allowed only for empty partition by.
func checkPartitionBy(ctx sessionctx.Context, p *LogicalWindow, d *DataSource) bool {
// No window partition by. We are OK.
if len(p.PartitionBy) == 0 {
return true
}

// Table not clustered and window has partition by. Can not do the TopN piush down.
if d.handleCols == nil {
return false
}

if len(p.PartitionBy) > d.handleCols.NumCols() {
return false
}

for i, col := range p.PartitionBy {
if !(col.Col.Equal(nil, d.handleCols.GetCol(i))) {
return false
}
}
return true
}

/*
Check the following pattern of filter over row number window function:
- Filter is simple condition of row_number < value or row_number <= value
- The window function is a simple row number
- With default frame: rows between current row and current row. Check is not necessary since
current row is only frame applicable to row number
- No partition
- Child is a data source.
*/
func windowIsTopN(p *LogicalSelection) (bool, uint64) {
Expand All @@ -71,12 +96,13 @@ func windowIsTopN(p *LogicalSelection) (bool, uint64) {
}

grandChild := child.Children()[0]
_, isDataSource := grandChild.(*DataSource)
dataSource, isDataSource := grandChild.(*DataSource)
if !isDataSource {
return false, 0
}
if len(child.WindowFuncDescs) == 1 && child.WindowFuncDescs[0].Name == "row_number" && len(child.PartitionBy) == 0 &&
child.Frame.Type == ast.Rows && child.Frame.Start.Type == ast.CurrentRow && child.Frame.End.Type == ast.CurrentRow {
if len(child.WindowFuncDescs) == 1 && child.WindowFuncDescs[0].Name == "row_number" &&
child.Frame.Type == ast.Rows && child.Frame.Start.Type == ast.CurrentRow && child.Frame.End.Type == ast.CurrentRow &&
checkPartitionBy(p.ctx, child, dataSource) {
return true, uint64(limitValue)
}
return false, 0
Expand Down Expand Up @@ -107,7 +133,7 @@ func (s *LogicalSelection) deriveTopN(opt *logicalOptimizeOp) LogicalPlan {
byItems = append(byItems, &util.ByItems{Expr: col.Col, Desc: col.Desc})
}
// Build derived Limit
derivedTopN := LogicalTopN{Count: limitValue, ByItems: byItems}.Init(grandChild.ctx, grandChild.blockOffset)
derivedTopN := LogicalTopN{Count: limitValue, ByItems: byItems, PartitionBy: child.GetPartitionBy()}.Init(grandChild.ctx, grandChild.blockOffset)
derivedTopN.SetChildren(grandChild)
/* return datasource->topN->window */
child.SetChildren(derivedTopN)
Expand Down
73 changes: 62 additions & 11 deletions planner/core/rule_derive_topn_from_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,68 @@ package core_test
import (
"testing"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/parser/model"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/testdata"
"github.com/stretchr/testify/require"
)

// Rule should bot be applied
func setTiFlashReplica(t *testing.T, dom *domain.Domain, dbName, tableName string) {
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr(dbName))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == tableName {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}
}

// Rule should bot be applied for TiKV.
func TestPushDerivedTopnNegative(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists employee")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("insert into t values(1,1)")
tk.MustExec("insert into t values(2,1)")
tk.MustExec("insert into t values(3,2)")
tk.MustExec("insert into t values(4,2)")
tk.MustExec("insert into t values(5,2)")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int, primary key(b,a))")
tk.MustExec("drop table if exists tt")
tk.MustExec("create table tt(a int, b int, c int, primary key(b,a) nonclustered)")
tk.MustExec("drop table if exists ti")
tk.MustExec("create table ti(a int, b int, c int unique)")
var input Input
var output []struct {
SQL string
Plan []string
}
suiteData := plannercore.GetDerivedTopNSuiteData()
suiteData.LoadTestCases(t, &input, &output)
for i, sql := range input {
plan := tk.MustQuery("explain format = 'brief' " + sql)
testdata.OnRecord(func() {
output[i].SQL = sql
output[i].Plan = testdata.ConvertRowsToStrings(plan.Rows())
})
plan.Check(testkit.Rows(output[i].Plan...))
}
}

// TiFlash cases. TopN pushed down to storage only when no partition by.
func TestPushDerivedTopnFlash(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
dom := domain.GetDomain(tk.Session())

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, primary key(b,a))")
SetTiFlashReplica(t, dom, "test", "t")
tk.MustExec("set tidb_enforce_mpp=1")
tk.MustExec("set @@session.tidb_allow_mpp=ON;")
var input Input
var output []struct {
SQL string
Expand All @@ -51,18 +96,24 @@ func TestPushDerivedTopnNegative(t *testing.T) {
}
}

// Rule should be applied
// Rule should be applied for TiKV.
func TestPushDerivedTopnPositive(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists employee")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, primary key(b,a))")
tk.MustExec("drop table if exists tt")
tk.MustExec("create table tt(a int, b int, c int, primary key(b,a) nonclustered)")
tk.MustExec("drop table if exists ti")
tk.MustExec("create table ti(a int, b int, c int unique)")
tk.MustExec("insert into t values(1,1)")
tk.MustExec("insert into t values(2,1)")
tk.MustExec("insert into t values(3,2)")
tk.MustExec("insert into t values(4,2)")
tk.MustExec("insert into t values(5,2)")
tk.MustExec("insert into tt select *,55 from t")
tk.MustExec("insert into ti select *,a from t")
var input Input
var output []struct {
SQL string
Expand Down
7 changes: 4 additions & 3 deletions planner/core/rule_topn_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func (lt *LogicalTopN) setChild(p LogicalPlan, opt *logicalOptimizeOp) LogicalPl

if lt.isLimit() {
limit := LogicalLimit{
Count: lt.Count,
Offset: lt.Offset,
limitHints: lt.limitHints,
Count: lt.Count,
Offset: lt.Offset,
limitHints: lt.limitHints,
PartitionBy: lt.GetPartitionBy(),
}.Init(lt.ctx, lt.blockOffset)
limit.SetChildren(p)
appendTopNPushDownTraceStep(limit, p, opt)
Expand Down
Loading

0 comments on commit a10b52c

Please sign in to comment.