diff --git a/executor/cte.go b/executor/cte.go index a9f083fa70479..7ee4a78dc417b 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -230,8 +230,6 @@ func (e *CTEExec) Close() (err error) { func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { e.curIter = 0 e.iterInTbl.SetIter(e.curIter) - // This means iterInTbl's can be read. - defer close(e.iterInTbl.GetBegCh()) chks := make([]*chunk.Chunk, 0, 10) for { if e.limitDone(e.iterInTbl) { @@ -384,7 +382,6 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) { if err = e.iterInTbl.Reopen(); err != nil { return err } - defer close(e.iterInTbl.GetBegCh()) if e.isDistinct { // Already deduplicated by resTbl, adding directly is ok. for _, chk := range chks { diff --git a/executor/cte_table_reader.go b/executor/cte_table_reader.go index efd5a0387e6cb..4afd8aabbb79f 100644 --- a/executor/cte_table_reader.go +++ b/executor/cte_table_reader.go @@ -41,9 +41,6 @@ func (e *CTETableReaderExec) Open(ctx context.Context) error { func (e *CTETableReaderExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { req.Reset() - // Wait until iterInTbl can be read. This is controlled by corresponding CTEExec. - <-e.iterInTbl.GetBegCh() - // We should read `iterInTbl` from the beginning when the next iteration starts. // Can not directly judge whether to start the next iteration based on e.chkIdx, // because some operators(Selection) may use forloop to read all data in `iterInTbl`. diff --git a/executor/cte_test.go b/executor/cte_test.go index 52c2e957a2e21..bfc99c0ae89e6 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -22,7 +22,9 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/types" "github.com/stretchr/testify/require" ) @@ -408,3 +410,33 @@ func TestSpillToDisk(t *testing.T) { } rows.Check(testkit.Rows(resRows...)) } + +func TestCTEExecError(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists src;") + tk.MustExec("create table src(first int, second int);") + + insertStr := fmt.Sprintf("insert into src values (%d, %d)", rand.Intn(1000), rand.Intn(1000)) + for i := 0; i < 1000; i++ { + insertStr += fmt.Sprintf(",(%d, %d)", rand.Intn(1000), rand.Intn(1000)) + } + insertStr += ";" + tk.MustExec(insertStr) + + // Increase projection concurrency and decrease chunk size + // to increase the probability of reproducing the problem. + tk.MustExec("set tidb_max_chunk_size = 32") + tk.MustExec("set tidb_projection_concurrency = 20") + for i := 0; i < 10; i++ { + err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " + + "(select 1, first, second, first+second from src " + + " union all " + + "select iter+1, second, result, second+result from cte where iter < 80 )" + + "select * from cte") + require.True(t, terror.ErrorEqual(err, types.ErrOverflow)) + } +} diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 4121b21ccd348..51f21c2491e77 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -70,7 +70,8 @@ type IndexNestedLoopHashJoin struct { // taskCh is only used when `keepOuterOrder` is true. taskCh chan *indexHashJoinTask - stats *indexLookUpJoinRuntimeStats + stats *indexLookUpJoinRuntimeStats + prepared bool } type indexHashJoinOuterWorker struct { @@ -133,7 +134,6 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } e.finished.Store(false) - e.startWorkers(ctx) return nil } @@ -207,6 +207,10 @@ func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() { // Next implements the IndexNestedLoopHashJoin Executor interface. func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error { + if !e.prepared { + e.startWorkers(ctx) + e.prepared = true + } req.Reset() if e.keepOuterOrder { return e.runInOrder(ctx, req) @@ -299,6 +303,7 @@ func (e *IndexNestedLoopHashJoin) Close() error { } e.joinChkResourceCh = nil e.finished.Store(false) + e.prepared = false return e.baseExecutor.Close() } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 44a7bdb4b9ca1..994385566dde3 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -86,6 +86,7 @@ type IndexLookUpJoin struct { stats *indexLookUpJoinRuntimeStats finished *atomic.Value + prepared bool } type outerCtx struct { @@ -174,7 +175,6 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } e.cancelFunc = nil - e.startWorkers(ctx) return nil } @@ -258,6 +258,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork // Next implements the Executor interface. func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error { + if !e.prepared { + e.startWorkers(ctx) + e.prepared = true + } if e.isOuterJoin { atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows())) } @@ -764,6 +768,7 @@ func (e *IndexLookUpJoin) Close() error { e.memTracker = nil e.task = nil e.finished.Store(false) + e.prepared = false return e.baseExecutor.Close() } diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index db1c27a5dfdf7..d0ff14924e90a 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -73,6 +73,7 @@ type IndexLookUpMergeJoin struct { lastColHelper *plannercore.ColWithCmpFuncManager memTracker *memory.Tracker // track memory usage + prepared bool } type outerMergeCtx struct { @@ -162,7 +163,6 @@ func (e *IndexLookUpMergeJoin) Open(ctx context.Context) error { } e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) - e.startWorkers(ctx) return nil } @@ -249,6 +249,10 @@ func (e *IndexLookUpMergeJoin) newInnerMergeWorker(taskCh chan *lookUpMergeJoinT // Next implements the Executor interface func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error { + if !e.prepared { + e.startWorkers(ctx) + e.prepared = true + } if e.isOuterJoin { atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows())) } @@ -731,5 +735,6 @@ func (e *IndexLookUpMergeJoin) Close() error { // cancelFunc control the outer worker and outer worker close the task channel. e.workerWg.Wait() e.memTracker = nil + e.prepared = false return e.baseExecutor.Close() } diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go index 19b1bd5151fdc..a629398000898 100644 --- a/util/cteutil/storage.go +++ b/util/cteutil/storage.go @@ -82,11 +82,6 @@ type Storage interface { SetIter(iter int) GetIter() int - // We use this channel to notify reader that Storage is ready to read. - // It exists only to solve the special implementation of IndexLookUpJoin. - // We will find a better way and remove this later. - GetBegCh() chan struct{} - GetMemTracker() *memory.Tracker GetDiskTracker() *disk.Tracker ActionSpill() *chunk.SpillDiskAction @@ -239,11 +234,6 @@ func (s *StorageRC) GetIter() int { return s.iter } -// GetBegCh impls Storage GetBegCh interface. -func (s *StorageRC) GetBegCh() chan struct{} { - return s.begCh -} - // GetMemTracker impls Storage GetMemTracker interface. func (s *StorageRC) GetMemTracker() *memory.Tracker { return s.rc.GetMemTracker()