From 412e979a1ff5576f7c5436c0e02ce2f193aa95f4 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 17 Mar 2022 11:03:52 +0800 Subject: [PATCH] cherry pick #33085 to release-5.2 Signed-off-by: ti-srebot --- executor/cte.go | 3 - executor/cte_table_reader.go | 3 - executor/cte_test.go | 98 +++++++++++++++++++++++++++++ executor/index_lookup_hash_join.go | 9 ++- executor/index_lookup_join.go | 15 +++++ executor/index_lookup_merge_join.go | 7 ++- util/cteutil/storage.go | 10 --- 7 files changed, 126 insertions(+), 19 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 4fe3414b97154..6d1d79f8c1da8 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -214,8 +214,6 @@ func (e *CTEExec) Close() (err error) { func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { e.curIter = 0 e.iterInTbl.SetIter(e.curIter) - // This means iterInTbl's can be read. - defer close(e.iterInTbl.GetBegCh()) chks := make([]*chunk.Chunk, 0, 10) for { if e.limitDone(e.iterInTbl) { @@ -368,7 +366,6 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) { if err = e.iterInTbl.Reopen(); err != nil { return err } - defer close(e.iterInTbl.GetBegCh()) if e.isDistinct { // Already deduplicated by resTbl, adding directly is ok. for _, chk := range chks { diff --git a/executor/cte_table_reader.go b/executor/cte_table_reader.go index 94fedf01fd93e..898bcaf9353c2 100644 --- a/executor/cte_table_reader.go +++ b/executor/cte_table_reader.go @@ -40,9 +40,6 @@ func (e *CTETableReaderExec) Open(ctx context.Context) error { func (e *CTETableReaderExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { req.Reset() - // Wait until iterInTbl can be read. This is controlled by corresponding CTEExec. - <-e.iterInTbl.GetBegCh() - // We should read `iterInTbl` from the beginning when the next iteration starts. // Can not directly judge whether to start the next iteration based on e.chkIdx, // because some operators(Selection) may use forloop to read all data in `iterInTbl`. diff --git a/executor/cte_test.go b/executor/cte_test.go index d5ec86a03bb05..a80931ffea366 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -28,9 +28,16 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" +<<<<<<< HEAD "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testkit" +======= + "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/types" + "github.com/stretchr/testify/require" +>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085) ) var _ = check.Suite(&CTETestSuite{&baseCTETestSuite{}}) @@ -448,3 +455,94 @@ func (test *CTETestSuite) TestCTEWithLimit(c *check.C) { rows = tk.MustQuery("with recursive cte1(c1) as (select c1 from t1 union all select c1 + 1 from cte1 limit 4 offset 4) select * from cte1;") rows.Check(testkit.Rows("3", "4", "3", "4")) } +<<<<<<< HEAD +======= + +func TestSpillToDisk(t *testing.T) { + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMUseTmpStorage = true + }) + + store, clean := testkit.CreateMockStore(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testCTEStorageSpill", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testCTEStorageSpill")) + tk.MustExec("set tidb_mem_quota_query = 1073741824;") + }() + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill")) + }() + + // Use duplicated rows to test UNION DISTINCT. + tk.MustExec("set tidb_mem_quota_query = 1073741824;") + insertStr := "insert into t1 values(0)" + rowNum := 1000 + vals := make([]int, rowNum) + vals[0] = 0 + for i := 1; i < rowNum; i++ { + v := rand.Intn(100) + vals[i] = v + insertStr += fmt.Sprintf(", (%d)", v) + } + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 int);") + tk.MustExec(insertStr) + tk.MustExec("set tidb_mem_quota_query = 40000;") + tk.MustExec("set cte_max_recursion_depth = 500000;") + sql := fmt.Sprintf("with recursive cte1 as ( "+ + "select c1 from t1 "+ + "union "+ + "select c1 + 1 c1 from cte1 where c1 < %d) "+ + "select c1 from cte1 order by c1;", rowNum) + rows := tk.MustQuery(sql) + + memTracker := tk.Session().GetSessionVars().StmtCtx.MemTracker + diskTracker := tk.Session().GetSessionVars().StmtCtx.DiskTracker + require.Greater(t, memTracker.MaxConsumed(), int64(0)) + require.Greater(t, diskTracker.MaxConsumed(), int64(0)) + + sort.Ints(vals) + resRows := make([]string, 0, rowNum) + for i := vals[0]; i <= rowNum; i++ { + resRows = append(resRows, fmt.Sprintf("%d", i)) + } + rows.Check(testkit.Rows(resRows...)) +} + +func TestCTEExecError(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists src;") + tk.MustExec("create table src(first int, second int);") + + insertStr := fmt.Sprintf("insert into src values (%d, %d)", rand.Intn(1000), rand.Intn(1000)) + for i := 0; i < 1000; i++ { + insertStr += fmt.Sprintf(",(%d, %d)", rand.Intn(1000), rand.Intn(1000)) + } + insertStr += ";" + tk.MustExec(insertStr) + + // Increase projection concurrency and decrease chunk size + // to increase the probability of reproducing the problem. + tk.MustExec("set tidb_max_chunk_size = 32") + tk.MustExec("set tidb_projection_concurrency = 20") + for i := 0; i < 10; i++ { + err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " + + "(select 1, first, second, first+second from src " + + " union all " + + "select iter+1, second, result, second+result from cte where iter < 80 )" + + "select * from cte") + require.True(t, terror.ErrorEqual(err, types.ErrOverflow)) + } +} +>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index dfede97ee03c6..2c9debfde4544 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -69,7 +69,8 @@ type IndexNestedLoopHashJoin struct { // taskCh is only used when `keepOuterOrder` is true. taskCh chan *indexHashJoinTask - stats *indexLookUpJoinRuntimeStats + stats *indexLookUpJoinRuntimeStats + prepared bool } type indexHashJoinOuterWorker struct { @@ -153,7 +154,6 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } e.finished.Store(false) - e.startWorkers(ctx) return nil } @@ -230,6 +230,10 @@ func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() { // Next implements the IndexNestedLoopHashJoin Executor interface. func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error { + if !e.prepared { + e.startWorkers(ctx) + e.prepared = true + } req.Reset() if e.keepOuterOrder { return e.runInOrder(ctx, req) @@ -329,6 +333,7 @@ func (e *IndexNestedLoopHashJoin) Close() error { } e.joinChkResourceCh = nil e.finished.Store(false) + e.prepared = false return e.baseExecutor.Close() } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index db0ec2c3756d8..ba4ec47d41a67 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -82,9 +82,15 @@ type IndexLookUpJoin struct { memTracker *memory.Tracker // track memory usage. +<<<<<<< HEAD stats *indexLookUpJoinRuntimeStats ctxCancelReason atomic.Value finished *atomic.Value +======= + stats *indexLookUpJoinRuntimeStats + finished *atomic.Value + prepared bool +>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085) } type outerCtx struct { @@ -168,7 +174,11 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { e.stats = &indexLookUpJoinRuntimeStats{} e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } +<<<<<<< HEAD e.startWorkers(ctx) +======= + e.cancelFunc = nil +>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085) return nil } @@ -252,6 +262,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork // Next implements the Executor interface. func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error { + if !e.prepared { + e.startWorkers(ctx) + e.prepared = true + } if e.isOuterJoin { atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows())) } @@ -771,6 +785,7 @@ func (e *IndexLookUpJoin) Close() error { e.memTracker = nil e.task = nil e.finished.Store(false) + e.prepared = false return e.baseExecutor.Close() } diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index 83133eb1f6a00..2b23f171ecdf1 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -71,6 +71,7 @@ type IndexLookUpMergeJoin struct { lastColHelper *plannercore.ColWithCmpFuncManager memTracker *memory.Tracker // track memory usage + prepared bool } type outerMergeCtx struct { @@ -181,7 +182,6 @@ func (e *IndexLookUpMergeJoin) Open(ctx context.Context) error { } e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) - e.startWorkers(ctx) return nil } @@ -268,6 +268,10 @@ func (e *IndexLookUpMergeJoin) newInnerMergeWorker(taskCh chan *lookUpMergeJoinT // Next implements the Executor interface func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error { + if !e.prepared { + e.startWorkers(ctx) + e.prepared = true + } if e.isOuterJoin { atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows())) } @@ -750,5 +754,6 @@ func (e *IndexLookUpMergeJoin) Close() error { // cancelFunc control the outer worker and outer worker close the task channel. e.workerWg.Wait() e.memTracker = nil + e.prepared = false return e.baseExecutor.Close() } diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go index d2607892db62c..c9da934465c35 100644 --- a/util/cteutil/storage.go +++ b/util/cteutil/storage.go @@ -81,11 +81,6 @@ type Storage interface { SetIter(iter int) GetIter() int - // We use this channel to notify reader that Storage is ready to read. - // It exists only to solve the special implementation of IndexLookUpJoin. - // We will find a better way and remove this later. - GetBegCh() chan struct{} - GetMemTracker() *memory.Tracker GetDiskTracker() *disk.Tracker ActionSpill() *chunk.SpillDiskAction @@ -238,11 +233,6 @@ func (s *StorageRC) GetIter() int { return s.iter } -// GetBegCh impls Storage GetBegCh interface. -func (s *StorageRC) GetBegCh() chan struct{} { - return s.begCh -} - // GetMemTracker impls Storage GetMemTracker interface. func (s *StorageRC) GetMemTracker() *memory.Tracker { return s.rc.GetMemTracker()