Skip to content

Commit

Permalink
planner, executor: enable inline projection for Limit (pingcap#20288)
Browse files Browse the repository at this point in the history
  • Loading branch information
pingyu authored Oct 19, 2020
1 parent 937949f commit 4501f6d
Show file tree
Hide file tree
Showing 18 changed files with 338 additions and 77 deletions.
11 changes: 5 additions & 6 deletions cmd/explaintest/r/topn_push_down.result
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,11 @@ Apply_17 9990.00 root semi join, equal:[eq(test.t1.a, test.t2.a)]
│ └─Selection_19 9990.00 cop[tikv] not(isnull(test.t1.a))
│ └─TableFullScan_18 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─Selection_21(Probe) 0.80 root not(isnull(test.t2.a))
└─Projection_22 1.00 root test.t2.a
└─Limit_23 1.00 root offset:0, count:1
└─TableReader_29 1.00 root data:Limit_28
└─Limit_28 1.00 cop[tikv] offset:0, count:1
└─Selection_27 1.00 cop[tikv] gt(test.t2.b, test.t1.b)
└─TableFullScan_26 1.25 cop[tikv] table:t2 keep order:false, stats:pseudo
└─Limit_23 1.00 root offset:0, count:1
└─TableReader_29 1.00 root data:Limit_28
└─Limit_28 1.00 cop[tikv] offset:0, count:1
└─Selection_27 1.00 cop[tikv] gt(test.t2.b, test.t1.b)
└─TableFullScan_26 1.25 cop[tikv] table:t2 keep order:false, stats:pseudo
drop table if exists t;
create table t(a int not null, index idx(a));
explain select /*+ TIDB_INLJ(t2) */ * from t t1 join t t2 on t1.a = t2.a limit 5;
Expand Down
117 changes: 117 additions & 0 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1709,3 +1709,120 @@ func BenchmarkSortExec(b *testing.B) {
})
}
}

type limitCase struct {
rows int
offset int
count int
childUsedSchema []bool
usingInlineProjection bool
ctx sessionctx.Context
}

func (tc limitCase) columns() []*expression.Column {
return []*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
}
}

func (tc limitCase) String() string {
return fmt.Sprintf("(rows:%v, offset:%v, count:%v, inline_projection:%v)",
tc.rows, tc.offset, tc.count, tc.usingInlineProjection)
}

func defaultLimitTestCase() *limitCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
tc := &limitCase{
rows: 30000,
offset: 10000,
count: 10000,
childUsedSchema: []bool{false, true},
usingInlineProjection: false,
ctx: ctx,
}
return tc
}

func benchmarkLimitExec(b *testing.B, cas *limitCase) {
opt := mockDataSourceParameters{
schema: expression.NewSchema(cas.columns()...),
rows: cas.rows,
ctx: cas.ctx,
}
dataSource := buildMockDataSource(opt)
var exec Executor
limit := &LimitExec{
baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, 4, dataSource),
begin: uint64(cas.offset),
end: uint64(cas.offset + cas.count),
}
if cas.usingInlineProjection {
if len(cas.childUsedSchema) > 0 {
limit.columnIdxsUsedByChild = make([]int, 0, len(cas.childUsedSchema))
for i, used := range cas.childUsedSchema {
if used {
limit.columnIdxsUsedByChild = append(limit.columnIdxsUsedByChild, i)
}
}
}
exec = limit
} else {
columns := cas.columns()
usedCols := make([]*expression.Column, 0, len(columns))
exprs := make([]expression.Expression, 0, len(columns))
for i, used := range cas.childUsedSchema {
if used {
usedCols = append(usedCols, columns[i])
exprs = append(exprs, columns[i])
}
}
proj := &ProjectionExec{
baseExecutor: newBaseExecutor(cas.ctx, expression.NewSchema(usedCols...), 0, limit),
numWorkers: 1,
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
}
exec = proj
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
tmpCtx := context.Background()
chk := newFirstChunk(exec)
dataSource.prepareChunks()

b.StartTimer()
if err := exec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
for {
if err := exec.Next(tmpCtx, chk); err != nil {
b.Fatal(err)
}
if chk.NumRows() == 0 {
break
}
}

if err := exec.Close(); err != nil {
b.Fatal(err)
}
b.StopTimer()
}
}

func BenchmarkLimitExec(b *testing.B) {
b.ReportAllocs()
cas := defaultLimitTestCase()
usingInlineProjection := []bool{false, true}
for _, inlineProjection := range usingInlineProjection {
cas.usingInlineProjection = inlineProjection
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkLimitExec(b, cas)
})
}
}
11 changes: 11 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,17 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
begin: v.Offset,
end: v.Offset + v.Count,
}

childUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema())[0]
e.columnIdxsUsedByChild = make([]int, 0, len(childUsedSchema))
for i, used := range childUsedSchema {
if used {
e.columnIdxsUsedByChild = append(e.columnIdxsUsedByChild, i)
}
}
if len(e.columnIdxsUsedByChild) == len(childUsedSchema) {
e.columnIdxsUsedByChild = nil // indicates that all columns are used. LimitExec will improve performance for this condition.
}
return e
}

Expand Down
29 changes: 24 additions & 5 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,9 @@ type LimitExec struct {
meetFirstBatch bool

childResult *chunk.Chunk

// columnIdxsUsedByChild keep column indexes of child executor used for inline projection
columnIdxsUsedByChild []int
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -1025,26 +1028,42 @@ func (e *LimitExec) Next(ctx context.Context, req *chunk.Chunk) error {
if begin == end {
break
}
req.Append(e.childResult, int(begin), int(end))
if e.columnIdxsUsedByChild != nil {
req.Append(e.childResult.Prune(e.columnIdxsUsedByChild), int(begin), int(end))
} else {
req.Append(e.childResult, int(begin), int(end))
}
return nil
}
e.cursor += batchSize
}
e.adjustRequiredRows(req)
err := Next(ctx, e.children[0], req)
e.childResult.Reset()
e.childResult = e.childResult.SetRequiredRows(req.RequiredRows(), e.maxChunkSize)
e.adjustRequiredRows(e.childResult)
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return err
}
batchSize := uint64(req.NumRows())
batchSize := uint64(e.childResult.NumRows())
// no more data.
if batchSize == 0 {
return nil
}
if e.cursor+batchSize > e.end {
req.TruncateTo(int(e.end - e.cursor))
e.childResult.TruncateTo(int(e.end - e.cursor))
batchSize = e.end - e.cursor
}
e.cursor += batchSize

if e.columnIdxsUsedByChild != nil {
for i, childIdx := range e.columnIdxsUsedByChild {
if err = req.SwapColumn(i, e.childResult, childIdx); err != nil {
return err
}
}
} else {
req.SwapColumns(e.childResult)
}
return nil
}

Expand Down
76 changes: 59 additions & 17 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3691,44 +3691,86 @@ func (s *testSuite) TestLimit(c *C) {
tk.MustExec(`use test;`)
tk.MustExec(`drop table if exists t;`)
tk.MustExec(`create table t(a bigint, b bigint);`)
tk.MustExec(`insert into t values(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6);`)
tk.MustExec(`insert into t values(1, 1), (2, 2), (3, 30), (4, 40), (5, 5), (6, 6);`)
tk.MustQuery(`select * from t order by a limit 1, 1;`).Check(testkit.Rows(
"2 2",
))
tk.MustQuery(`select * from t order by a limit 1, 2;`).Check(testkit.Rows(
"2 2",
"3 3",
"3 30",
))
tk.MustQuery(`select * from t order by a limit 1, 3;`).Check(testkit.Rows(
"2 2",
"3 3",
"4 4",
"3 30",
"4 40",
))
tk.MustQuery(`select * from t order by a limit 1, 4;`).Check(testkit.Rows(
"2 2",
"3 3",
"4 4",
"3 30",
"4 40",
"5 5",
))

// test inline projection
tk.MustQuery(`select a from t where a > 0 limit 1, 1;`).Check(testkit.Rows(
"2",
))
tk.MustQuery(`select a from t where a > 0 limit 1, 2;`).Check(testkit.Rows(
"2",
"3",
))
tk.MustQuery(`select b from t where a > 0 limit 1, 3;`).Check(testkit.Rows(
"2",
"30",
"40",
))
tk.MustQuery(`select b from t where a > 0 limit 1, 4;`).Check(testkit.Rows(
"2",
"30",
"40",
"5",
))

// test @@tidb_init_chunk_size=2
tk.MustExec(`set @@tidb_init_chunk_size=2;`)
tk.MustQuery(`select * from t order by a limit 2, 1;`).Check(testkit.Rows(
"3 3",
tk.MustQuery(`select * from t where a > 0 limit 2, 1;`).Check(testkit.Rows(
"3 30",
))
tk.MustQuery(`select * from t order by a limit 2, 2;`).Check(testkit.Rows(
"3 3",
"4 4",
tk.MustQuery(`select * from t where a > 0 limit 2, 2;`).Check(testkit.Rows(
"3 30",
"4 40",
))
tk.MustQuery(`select * from t order by a limit 2, 3;`).Check(testkit.Rows(
"3 3",
"4 4",
tk.MustQuery(`select * from t where a > 0 limit 2, 3;`).Check(testkit.Rows(
"3 30",
"4 40",
"5 5",
))
tk.MustQuery(`select * from t order by a limit 2, 4;`).Check(testkit.Rows(
"3 3",
"4 4",
tk.MustQuery(`select * from t where a > 0 limit 2, 4;`).Check(testkit.Rows(
"3 30",
"4 40",
"5 5",
"6 6",
))

// test inline projection
tk.MustQuery(`select a from t order by a limit 2, 1;`).Check(testkit.Rows(
"3",
))
tk.MustQuery(`select b from t order by a limit 2, 2;`).Check(testkit.Rows(
"30",
"40",
))
tk.MustQuery(`select a from t order by a limit 2, 3;`).Check(testkit.Rows(
"3",
"4",
"5",
))
tk.MustQuery(`select b from t order by a limit 2, 4;`).Check(testkit.Rows(
"30",
"40",
"5",
"6",
))
}

func (s *testSuite) TestCoprocessorStreamingWarning(c *C) {
Expand Down
14 changes: 6 additions & 8 deletions planner/cascades/testdata/transformation_rules_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@
"Result": [
"Group#0 Schema:[test.t.b]",
" Projection_5 input:[Group#1], test.t.b",
"Group#1 Schema:[test.t.b,test.t.a]",
"Group#1 Schema:[test.t.b]",
" Projection_2 input:[Group#2], test.t.b, test.t.a",
"Group#2 Schema:[test.t.a,test.t.b]",
" TopN_7 input:[Group#3], test.t.a, offset:0, count:2",
Expand Down Expand Up @@ -384,7 +384,7 @@
"Result": [
"Group#0 Schema:[Column#14]",
" Projection_5 input:[Group#1], Column#13",
"Group#1 Schema:[Column#13,test.t.a]",
"Group#1 Schema:[Column#13]",
" Projection_2 input:[Group#2], plus(test.t.a, test.t.b)->Column#13, test.t.a",
"Group#2 Schema:[test.t.a,test.t.b]",
" TopN_7 input:[Group#3], test.t.a, offset:2, count:1",
Expand All @@ -401,7 +401,7 @@
"Result": [
"Group#0 Schema:[test.t.c]",
" Projection_5 input:[Group#1], test.t.c",
"Group#1 Schema:[test.t.c,test.t.a]",
"Group#1 Schema:[test.t.c]",
" Projection_2 input:[Group#2], test.t.c, test.t.a",
"Group#2 Schema:[test.t.a,test.t.c]",
" TopN_7 input:[Group#3], test.t.a, offset:0, count:1",
Expand All @@ -425,7 +425,7 @@
"Result": [
"Group#0 Schema:[test.t.c]",
" Projection_5 input:[Group#1], test.t.c",
"Group#1 Schema:[test.t.c,test.t.a,test.t.b]",
"Group#1 Schema:[test.t.c]",
" Projection_2 input:[Group#2], test.t.c, test.t.a, test.t.b",
"Group#2 Schema:[test.t.a,test.t.b,test.t.c]",
" TopN_7 input:[Group#3], plus(test.t.a, test.t.b), offset:0, count:1",
Expand Down Expand Up @@ -473,7 +473,7 @@
" TableScan_15 table:t1, pk col:test.t.a",
"Group#3 Schema:[test.t.a]",
" Projection_9 input:[Group#5], test.t.a",
"Group#5 Schema:[test.t.a,Column#25]",
"Group#5 Schema:[test.t.a]",
" Projection_6 input:[Group#6], test.t.a, Column#25",
"Group#6 Schema:[test.t.a,Column#25]",
" Projection_5 input:[Group#7], test.t.a, test.t.b",
Expand Down Expand Up @@ -1645,10 +1645,8 @@
"SQL": "select a from (select a, b from t order by b limit 10) as t1",
"Result": [
"Group#0 Schema:[test.t.a]",
" Projection_5 input:[Group#1], test.t.a",
" TopN_6 input:[Group#1], test.t.b, offset:0, count:10",
"Group#1 Schema:[test.t.a,test.t.b]",
" TopN_6 input:[Group#2], test.t.b, offset:0, count:10",
"Group#2 Schema:[test.t.a,test.t.b]",
" DataSource_1 table:t"
]
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2134,6 +2134,7 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]
Offset: p.Offset,
Count: p.Count,
}.Init(p.ctx, p.stats, p.blockOffset, resultProp)
limit.SetSchema(p.Schema())
ret = append(ret, limit)
}
return ret, true
Expand Down
Loading

0 comments on commit 4501f6d

Please sign in to comment.