Skip to content

Commit

Permalink
executor: add limit implementation for CTEExec (#24870)
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Jun 3, 2021
1 parent 83a9a12 commit 50b6da0
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 13 deletions.
87 changes: 81 additions & 6 deletions cmd/explaintest/r/explain_cte.result
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ insert into t2 values(1, 0), (2, 1);
explain with cte(a) as (select 1) select * from cte;
id estRows task access object operator info
CTEFullScan_8 1.00 root CTE:cte data:CTE_0
CTE_0 1.00 root None Recursive CTE
CTE_0 1.00 root Non-Recursive CTE
└─Projection_6(Seed Part) 1.00 root 1->Column#1
└─TableDual_7 1.00 root rows:1
explain with cte(a) as (select c1 from t1) select * from cte;
id estRows task access object operator info
CTEFullScan_11 1.00 root CTE:cte data:CTE_0
CTE_0 1.00 root None Recursive CTE
CTE_0 1.00 root Non-Recursive CTE
└─TableReader_8(Seed Part) 10000.00 root data:TableFullScan_7
└─TableFullScan_7 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain with cte(a,b,c,d) as (select * from t1, t2) select * from cte;
id estRows task access object operator info
CTEFullScan_18 1.00 root CTE:cte data:CTE_0
CTE_0 1.00 root None Recursive CTE
CTE_0 1.00 root Non-Recursive CTE
└─HashJoin_10(Seed Part) 100000000.00 root CARTESIAN inner join
├─TableReader_17(Build) 10000.00 root data:TableFullScan_16
│ └─TableFullScan_16 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo
Expand All @@ -46,7 +46,7 @@ CTE_0 1.00 root Recursive CTE
explain with cte(a) as (with recursive cte1(a) as (select 1 union select a + 1 from cte1 where a < 10) select * from cte1) select * from cte;
id estRows task access object operator info
CTEFullScan_21 1.00 root CTE:cte data:CTE_0
CTE_0 1.00 root None Recursive CTE
CTE_0 1.00 root Non-Recursive CTE
└─CTEFullScan_20(Seed Part) 1.00 root CTE:cte1 data:CTE_1
CTE_1 1.00 root Recursive CTE
├─Projection_15(Seed Part) 1.00 root 1->Column#2
Expand All @@ -70,7 +70,7 @@ id estRows task access object operator info
HashJoin_17 1.00 root CARTESIAN inner join
├─CTEFullScan_27(Build) 1.00 root CTE:t2 data:CTE_0
└─CTEFullScan_26(Probe) 1.00 root CTE:t1 data:CTE_0
CTE_0 1.00 root None Recursive CTE
CTE_0 1.00 root Non-Recursive CTE
└─CTEFullScan_25(Seed Part) 1.00 root CTE:cte1 data:CTE_1
CTE_1 1.00 root Recursive CTE
├─Projection_20(Seed Part) 1.00 root 1->Column#2
Expand Down Expand Up @@ -102,7 +102,7 @@ HashJoin_12 0.64 root CARTESIAN inner join
│ └─CTEFullScan_22 1.00 root CTE:q1 data:CTE_0
└─Selection_14(Probe) 0.80 root eq(test.t1.c1, 1)
└─CTEFullScan_20 1.00 root CTE:q data:CTE_0
CTE_0 1.00 root None Recursive CTE
CTE_0 1.00 root Non-Recursive CTE
└─TableReader_17(Seed Part) 10000.00 root data:TableFullScan_16
└─TableFullScan_16 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain with recursive cte(a,b) as (select 1, concat('a', 1) union select a+1, concat(b, 1) from cte where a < 5) select * from cte;
Expand All @@ -127,3 +127,78 @@ CTE_0 1.00 root Recursive CTE
└─Projection_27(Recursive Part) 0.80 root plus(Column#5, 1)->Column#7
└─Selection_28 0.80 root eq(Column#5, 0)
└─CTETable_29 1.00 root Scan on CTE_0
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 1) select * from cte1;
id estRows task access object operator info
CTEFullScan_19 1.00 root CTE:cte1 data:CTE_0
CTE_0 1.00 root Recursive CTE, limit(offset:0, count:1)
├─TableReader_14(Seed Part) 10000.00 root data:TableFullScan_13
│ └─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─Projection_17(Recursive Part) 1.00 root cast(plus(test.t1.c1, 1), int(11))->test.t1.c1
└─CTETable_18 1.00 root Scan on CTE_0
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 100 offset 100) select * from cte1;
id estRows task access object operator info
CTEFullScan_19 1.00 root CTE:cte1 data:CTE_0
CTE_0 1.00 root Recursive CTE, limit(offset:100, count:100)
├─TableReader_14(Seed Part) 10000.00 root data:TableFullScan_13
│ └─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─Projection_17(Recursive Part) 1.00 root cast(plus(test.t1.c1, 1), int(11))->test.t1.c1
└─CTETable_18 1.00 root Scan on CTE_0
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 0 offset 0) select * from cte1;
id estRows task access object operator info
CTEFullScan_19 1.00 root CTE:cte1 data:CTE_0
CTE_0 1.00 root Recursive CTE, limit(offset:0, count:0)
├─TableReader_14(Seed Part) 10000.00 root data:TableFullScan_13
│ └─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─Projection_17(Recursive Part) 1.00 root cast(plus(test.t1.c1, 1), int(11))->test.t1.c1
└─CTETable_18 1.00 root Scan on CTE_0
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 1) select * from cte1 dt1 join cte1 dt2 on dt1.c1 = dt2.c1;
id estRows task access object operator info
HashJoin_18 0.64 root inner join, equal:[eq(test.t1.c1, test.t1.c1)]
├─Selection_29(Build) 0.80 root not(isnull(test.t1.c1))
│ └─CTEFullScan_30 1.00 root CTE:dt2 data:CTE_0
└─Selection_20(Probe) 0.80 root not(isnull(test.t1.c1))
└─CTEFullScan_28 1.00 root CTE:dt1 data:CTE_0
CTE_0 1.00 root Recursive CTE, limit(offset:0, count:1)
├─TableReader_23(Seed Part) 10000.00 root data:TableFullScan_22
│ └─TableFullScan_22 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─Projection_26(Recursive Part) 1.00 root cast(plus(test.t1.c1, 1), int(11))->test.t1.c1
└─CTETable_27 1.00 root Scan on CTE_0
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 0 offset 0) select * from cte1 dt1 join cte1 dt2 on dt1.c1 = dt2.c1;
id estRows task access object operator info
HashJoin_18 0.64 root inner join, equal:[eq(test.t1.c1, test.t1.c1)]
├─Selection_29(Build) 0.80 root not(isnull(test.t1.c1))
│ └─CTEFullScan_30 1.00 root CTE:dt2 data:CTE_0
└─Selection_20(Probe) 0.80 root not(isnull(test.t1.c1))
└─CTEFullScan_28 1.00 root CTE:dt1 data:CTE_0
CTE_0 1.00 root Recursive CTE, limit(offset:0, count:0)
├─TableReader_23(Seed Part) 10000.00 root data:TableFullScan_22
│ └─TableFullScan_22 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─Projection_26(Recursive Part) 1.00 root cast(plus(test.t1.c1, 1), int(11))->test.t1.c1
└─CTETable_27 1.00 root Scan on CTE_0
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 1) select * from cte1;
id estRows task access object operator info
CTEFullScan_34 1.00 root CTE:cte1 data:CTE_0
CTE_0 1.00 root Non-Recursive CTE
└─Limit_21(Seed Part) 1.00 root offset:0, count:1
└─HashAgg_22 1.00 root group by:Column#11, funcs:firstrow(Column#11)->Column#11
└─Union_23 20000.00 root
├─TableReader_26 10000.00 root data:TableFullScan_25
│ └─TableFullScan_25 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─IndexReader_33 10000.00 root index:IndexFullScan_32
└─IndexFullScan_32 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 100 offset 100) select * from cte1;
id estRows task access object operator info
CTEFullScan_34 1.00 root CTE:cte1 data:CTE_0
CTE_0 1.00 root Non-Recursive CTE
└─Limit_21(Seed Part) 100.00 root offset:100, count:100
└─HashAgg_22 200.00 root group by:Column#11, funcs:firstrow(Column#11)->Column#11
└─Union_23 20000.00 root
├─TableReader_26 10000.00 root data:TableFullScan_25
│ └─TableFullScan_25 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─IndexReader_33 10000.00 root index:IndexFullScan_32
└─IndexFullScan_32 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 0 offset 0) select * from cte1;
id estRows task access object operator info
CTEFullScan_18 1.00 root CTE:cte1 data:CTE_0
CTE_0 1.00 root Non-Recursive CTE
└─TableDual_17(Seed Part) 0.00 root rows:0
13 changes: 13 additions & 0 deletions cmd/explaintest/t/explain_cte.test
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,16 @@ explain with q(a,b) as (select * from t1) select /*+ merge(q) no_merge(q1) */ *
# explain with cte(a,b) as (select * from t1) select (select 1 from cte limit 1) from cte;
explain with recursive cte(a,b) as (select 1, concat('a', 1) union select a+1, concat(b, 1) from cte where a < 5) select * from cte;
explain select * from t1 dt where exists(with recursive qn as (select c1*0+1 as b union all select b+1 from qn where b=0) select * from qn where b=1);

# recursive limit
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 1) select * from cte1;
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 100 offset 100) select * from cte1;
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 0 offset 0) select * from cte1;

explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 1) select * from cte1 dt1 join cte1 dt2 on dt1.c1 = dt2.c1;
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 0 offset 0) select * from cte1 dt1 join cte1 dt2 on dt1.c1 = dt2.c1;

# non-recursive limit
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 1) select * from cte1;
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 100 offset 100) select * from cte1;
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 0 offset 0) select * from cte1;
3 changes: 3 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4192,6 +4192,9 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
chkIdx: 0,
isDistinct: v.CTE.IsDistinct,
sel: sel,
hasLimit: v.CTE.HasLimit,
limitBeg: v.CTE.LimitBeg,
limitEnd: v.CTE.LimitEnd,
}
}

Expand Down
76 changes: 73 additions & 3 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ type CTEExec struct {
hCtx *hashContext
sel []int

// Limit related info.
hasLimit bool
limitBeg uint64
limitEnd uint64
cursor uint64
meetFirstBatch bool

memTracker *memory.Tracker
diskTracker *disk.Tracker
}
Expand Down Expand Up @@ -131,8 +138,8 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
req.Reset()
e.resTbl.Lock()
defer e.resTbl.Unlock()
if !e.resTbl.Done() {
defer e.resTbl.Unlock()
resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker)
iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker)
iterOutAction := setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker)
Expand Down Expand Up @@ -160,10 +167,11 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
return err
}
e.resTbl.SetDone()
} else {
e.resTbl.Unlock()
}

if e.hasLimit {
return e.nextChunkLimit(req)
}
if e.chkIdx < e.resTbl.NumChunks() {
res, err := e.resTbl.GetChunk(e.chkIdx)
if err != nil {
Expand Down Expand Up @@ -205,6 +213,9 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
defer close(e.iterInTbl.GetBegCh())
chks := make([]*chunk.Chunk, 0, 10)
for {
if e.limitDone(e.iterInTbl) {
break
}
chk := newFirstChunk(e.seedExec)
if err = Next(ctx, e.seedExec, chk); err != nil {
return err
Expand Down Expand Up @@ -239,6 +250,10 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter)
}

if e.limitDone(e.resTbl) {
return nil
}

for {
chk := newFirstChunk(e.recursiveExec)
if err = Next(ctx, e.recursiveExec, chk); err != nil {
Expand All @@ -248,6 +263,9 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
if err = e.setupTblsForNewIteration(); err != nil {
return err
}
if e.limitDone(e.resTbl) {
break
}
if e.iterInTbl.NumChunks() == 0 {
break
}
Expand All @@ -274,6 +292,51 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
return nil
}

// nextChunkLimit appends to req the next batch of rows from resTbl that lies
// inside the limit window [limitBeg, limitEnd).
//
// e.chkIdx is the index of the next resTbl chunk to read, and e.cursor is the
// row position reached so far within resTbl (rows skipped plus rows returned).
// Nothing is appended to req once resTbl is exhausted or the cursor has
// reached limitEnd. Any error from resTbl.GetChunk is returned unchanged.
func (e *CTEExec) nextChunkLimit(req *chunk.Chunk) error {
	if !e.meetFirstBatch {
		// Skip whole chunks until reaching the one that contains limitBeg.
		for e.chkIdx < e.resTbl.NumChunks() {
			res, err := e.resTbl.GetChunk(e.chkIdx)
			if err != nil {
				return err
			}
			e.chkIdx++
			numRows := uint64(res.NumRows())
			if newCursor := e.cursor + numRows; newCursor >= e.limitBeg {
				e.meetFirstBatch = true
				// Rows [begInChk, endInChk) of this chunk are inside the window.
				begInChk, endInChk := e.limitBeg-e.cursor, numRows
				if newCursor > e.limitEnd {
					endInChk = e.limitEnd - e.cursor
				}
				e.cursor += endInChk
				if begInChk == endInChk {
					// This chunk contributes no rows; fall through and try the
					// following chunk below.
					break
				}
				tmpChk := res.CopyConstructSel()
				req.Append(tmpChk, int(begInChk), int(endInChk))
				return nil
			}
			e.cursor += numRows
		}
	}
	if e.chkIdx < e.resTbl.NumChunks() && e.cursor < e.limitEnd {
		res, err := e.resTbl.GetChunk(e.chkIdx)
		if err != nil {
			return err
		}
		e.chkIdx++
		numRows := uint64(res.NumRows())
		if e.cursor+numRows > e.limitEnd {
			// The chunk straddles limitEnd: keep only the rows still allowed.
			// The end index passed to Append is exclusive (as in the first
			// batch above), so it must be exactly the remaining row count;
			// adding one would return a row beyond the limit.
			numRows = e.limitEnd - e.cursor
			req.Append(res.CopyConstructSel(), 0, int(numRows))
		} else {
			req.SwapColumns(res.CopyConstructSel())
		}
		e.cursor += numRows
	}
	return nil
}

func (e *CTEExec) setupTblsForNewIteration() (err error) {
num := e.iterOutTbl.NumChunks()
chks := make([]*chunk.Chunk, 0, num)
Expand Down Expand Up @@ -322,6 +385,8 @@ func (e *CTEExec) reset() {
e.curIter = 0
e.chkIdx = 0
e.hashTbl = nil
e.cursor = 0
e.meetFirstBatch = false
}

func (e *CTEExec) reopenTbls() (err error) {
Expand All @@ -332,6 +397,11 @@ func (e *CTEExec) reopenTbls() (err error) {
return e.iterInTbl.Reopen()
}

// limitDone reports whether tbl already holds at least limitEnd rows, i.e.
// enough rows to satisfy the limit. It is always false when the CTE has no
// limit.
func (e *CTEExec) limitDone(tbl cteutil.Storage) bool {
	if !e.hasLimit {
		return false
	}
	rowCnt := uint64(tbl.NumRows())
	return rowCnt >= e.limitEnd
}

func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker,
parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) {
memTracker := tbl.GetMemTracker()
Expand Down
Loading

0 comments on commit 50b6da0

Please sign in to comment.