From d9717825a60e5cbb4b6d7137992f9d9b8df55835 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:01:40 +0800 Subject: [PATCH 1/8] executor: fix cte may hang because register OOMAction repeatedly Signed-off-by: guo-shaoge --- executor/cte.go | 36 ++++++++++++++++++++++++++++++++---- executor/cte_test.go | 13 +++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 5530ffc560f8a..65b32d315efb4 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -30,6 +30,9 @@ import ( var _ Executor = &CTEExec{} +// Only for test. +var oomTestFlag = 0 + // CTEExec implements CTE. // Following diagram describes how CTEExec works. // @@ -159,6 +162,11 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { return e.resTbl.Error() } resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) + failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { + if oomTestFlag == 1 { + oomTestFlag = 2 + } + }) iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) var iterOutAction *chunk.SpillDiskAction if e.iterOutTbl != nil { @@ -230,6 +238,15 @@ func (e *CTEExec) Close() (err error) { } func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { + defer func() { + if r := recover(); r != nil && err == nil { + err = errors.Errorf("%v", r) + } + }() + failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { + oomTestFlag = 1 + panic("testCTEPanic") + }) e.curIter = 0 e.iterInTbl.SetIter(e.curIter) chks := make([]*chunk.Chunk, 0, 10) @@ -239,13 +256,13 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { } chk := tryNewCacheChunk(e.seedExec) if err = Next(ctx, e.seedExec, chk); err != nil { - return err + return } if chk.NumRows() == 0 { break } if chk, err = e.tryDedupAndAdd(chk, e.iterInTbl, e.hashTbl); err != nil { - return err + return } chks = append(chks, chk) } @@ -253,16 +270,21 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { // Just adding is ok. for _, chk := range chks { if err = e.resTbl.Add(chk); err != nil { - return err + return } } e.curIter++ e.iterInTbl.SetIter(e.curIter) - return nil + return } func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { + defer func() { + if r := recover(); r != nil && err == nil { + err = errors.Errorf("%v", r) + } + }() if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 { return nil } @@ -428,6 +450,12 @@ func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentM parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) { memTracker := tbl.GetMemTracker() memTracker.SetLabel(memory.LabelForCTEStorage) + failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { + if oomTestFlag == 2 { + memTracker.Consume(1) + ctx.GetSessionVars().MemTracker.NeedKill.Store(true) + } + }) memTracker.AttachTo(parentMemTracker) diskTracker := tbl.GetDiskTracker() diff --git a/executor/cte_test.go b/executor/cte_test.go index 368d4bfd07796..81ed7b08d9a40 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -449,3 +449,16 @@ func TestCTEsInView(t *testing.T) { tk.MustExec("use test1;") tk.MustQuery("select * from test.v;").Check(testkit.Rows("1")) } + +func TestCTESeedPanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("create table t1(c1 int)") + tk.MustExec("insert into t1 values(1), (2), (3)") + + fpPathPrefix := "github.com/pingcap/tidb/executor/" + require.NoError(t, failpoint.Enable(fpPathPrefix + "testCTEPanic", "return(true)")) + tk.MustQuery("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1").Check(testkit.Rows("1 2 3 4 5")) + require.NoError(t, failpoint.Disable(fpPathPrefix + "testCTEPanic")) +} From b935b24c18e1ca7d44607cc226b6174c4d34f518 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:14:13 +0800 Subject: [PATCH 2/8] fix Close Signed-off-by: guo-shaoge --- util/cteutil/storage.go | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go index 02d82cef9e660..9583b901d8e64 100644 --- a/util/cteutil/storage.go +++ b/util/cteutil/storage.go @@ -129,13 +129,14 @@ func (s *StorageRC) DerefAndClose() (err error) { if s.refCnt < 0 { return errors.New("Storage ref count is less than zero") } else if s.refCnt == 0 { - // TODO: unreg memtracker + s.refCnt = -1 + s.done = false + s.err = nil + s.iter = 0 + s.rc = nil if err = s.rc.Close(); err != nil { return err } - if err = s.resetAll(); err != nil { - return err - } } return nil } @@ -155,7 +156,7 @@ func (s *StorageRC) SwapData(other Storage) (err error) { // Reopen impls Storage Reopen interface. func (s *StorageRC) Reopen() (err error) { - if err = s.rc.Reset(); err != nil { + if err = s.rc.Close(); err != nil { return err } s.iter = 0 @@ -265,18 +266,6 @@ func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction { return s.rc.ActionSpillForTest() } -func (s *StorageRC) resetAll() error { - s.refCnt = -1 - s.done = false - s.err = nil - s.iter = 0 - if err := s.rc.Reset(); err != nil { - return err - } - s.rc = nil - return nil -} - func (s *StorageRC) valid() bool { return s.refCnt > 0 && s.rc != nil } From 3b6b2ba3939f450cbf2b0db78772c4c4a7e1607a Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:14:59 +0800 Subject: [PATCH 3/8] fmt fmt Signed-off-by: guo-shaoge --- executor/cte_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/cte_test.go b/executor/cte_test.go index 81ed7b08d9a40..8fe3f203455d7 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -458,7 +458,7 @@ func TestCTESeedPanic(t *testing.T) { tk.MustExec("insert into t1 values(1), (2), (3)") fpPathPrefix := "github.com/pingcap/tidb/executor/" - require.NoError(t, failpoint.Enable(fpPathPrefix + "testCTEPanic", "return(true)")) + require.NoError(t, failpoint.Enable(fpPathPrefix+"testCTEPanic", "return(true)")) tk.MustQuery("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1").Check(testkit.Rows("1 2 3 4 5")) - require.NoError(t, failpoint.Disable(fpPathPrefix + "testCTEPanic")) + require.NoError(t, failpoint.Disable(fpPathPrefix+"testCTEPanic")) } From 91e0e9b75000effb4c0535d498856d387035196e Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:23:15 +0800 Subject: [PATCH 4/8] fix storage Signed-off-by: guo-shaoge --- executor/cte.go | 20 ++++---------------- util/cteutil/storage.go | 2 +- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 65b32d315efb4..613b09cf0790d 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -30,9 +30,6 @@ import ( var _ Executor = &CTEExec{} -// Only for test. -var oomTestFlag = 0 - // CTEExec implements CTE. // Following diagram describes how CTEExec works. // @@ -162,11 +159,6 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { return e.resTbl.Error() } resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) - failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { - if oomTestFlag == 1 { - oomTestFlag = 2 - } - }) iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) var iterOutAction *chunk.SpillDiskAction if e.iterOutTbl != nil { @@ -244,8 +236,10 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { } }() failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { - oomTestFlag = 1 - panic("testCTEPanic") + // panic("testCTEPanic") + for i := 0; i < 100; i++ { + e.memTracker.Consume(1 * 1024 * 1024 * 1024) + } }) e.curIter = 0 e.iterInTbl.SetIter(e.curIter) @@ -450,12 +444,6 @@ func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentM parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) { memTracker := tbl.GetMemTracker() memTracker.SetLabel(memory.LabelForCTEStorage) - failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { - if oomTestFlag == 2 { - memTracker.Consume(1) - ctx.GetSessionVars().MemTracker.NeedKill.Store(true) - } - }) memTracker.AttachTo(parentMemTracker) diskTracker := tbl.GetDiskTracker() diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go index 9583b901d8e64..dea6fd632e42b 100644 --- a/util/cteutil/storage.go +++ b/util/cteutil/storage.go @@ -133,10 +133,10 @@ func (s *StorageRC) DerefAndClose() (err error) { s.done = false s.err = nil s.iter = 0 - s.rc = nil if err = s.rc.Close(); err != nil { return err } + s.rc = nil } return nil } From adc04b5a50a887c36ea4537c850020cf0431af2c Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:25:39 +0800 Subject: [PATCH 5/8] fix case Signed-off-by: guo-shaoge --- executor/cte_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/cte_test.go b/executor/cte_test.go index 8fe3f203455d7..0b9c33d32ceac 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -459,6 +459,6 @@ func TestCTESeedPanic(t *testing.T) { fpPathPrefix := "github.com/pingcap/tidb/executor/" require.NoError(t, failpoint.Enable(fpPathPrefix+"testCTEPanic", "return(true)")) - tk.MustQuery("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1").Check(testkit.Rows("1 2 3 4 5")) - require.NoError(t, failpoint.Disable(fpPathPrefix+"testCTEPanic")) + err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") + require.Contains(t, err.Error(), "testCTEPanic") } From 63c6c0eaff8dc22eb1810cb5c01ed70e9715ca7c Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:27:01 +0800 Subject: [PATCH 6/8] fix Signed-off-by: guo-shaoge --- executor/cte.go | 7 +------ executor/cte_test.go | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 613b09cf0790d..c79036f6c5ac2 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -235,12 +235,7 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { err = errors.Errorf("%v", r) } }() - failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { - // panic("testCTEPanic") - for i := 0; i < 100; i++ { - e.memTracker.Consume(1 * 1024 * 1024 * 1024) - } - }) + failpoint.Inject("testCTEPanic", nil) e.curIter = 0 e.iterInTbl.SetIter(e.curIter) chks := make([]*chunk.Chunk, 0, 10) diff --git a/executor/cte_test.go b/executor/cte_test.go index 0b9c33d32ceac..9cd62aadb24ba 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -458,7 +458,7 @@ func TestCTESeedPanic(t *testing.T) { tk.MustExec("insert into t1 values(1), (2), (3)") fpPathPrefix := "github.com/pingcap/tidb/executor/" - require.NoError(t, failpoint.Enable(fpPathPrefix+"testCTEPanic", "return(true)")) + require.NoError(t, failpoint.Enable(fpPathPrefix+"testCTEPanic", `panic("testCTEPanic")`)) err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") require.Contains(t, err.Error(), "testCTEPanic") } From 2991fdb5bd01688946271dbb4b403ab482a2a57e Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:30:49 +0800 Subject: [PATCH 7/8] fix err Signed-off-by: guo-shaoge --- executor/cte.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index c79036f6c5ac2..04d39646e95a6 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -275,7 +275,7 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { } }() if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 { - return nil + return } if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth { @@ -283,17 +283,17 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { } if e.limitDone(e.resTbl) { - return nil + return } for { chk := tryNewCacheChunk(e.recursiveExec) if err = Next(ctx, e.recursiveExec, chk); err != nil { - return err + return } if chk.NumRows() == 0 { if err = e.setupTblsForNewIteration(); err != nil { - return err + return } if e.limitDone(e.resTbl) { break @@ -310,18 +310,18 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { // Make sure iterInTbl is setup before Close/Open, // because some executors will read iterInTbl in Open() (like IndexLookupJoin). if err = e.recursiveExec.Close(); err != nil { - return err + return } if err = e.recursiveExec.Open(ctx); err != nil { - return err + return } } else { if err = e.iterOutTbl.Add(chk); err != nil { - return err + return } } } - return nil + return } // Get next chunk from resTbl for limit. From 687894d8400da56fc20eb800f59ec6cf4922469a Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:30:49 +0800 Subject: [PATCH 8/8] fix err Signed-off-by: guo-shaoge --- executor/cte.go | 3 ++- executor/cte_test.go | 14 +++++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 04d39646e95a6..07fa23a0a10e7 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -235,7 +235,7 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { err = errors.Errorf("%v", r) } }() - failpoint.Inject("testCTEPanic", nil) + failpoint.Inject("testCTESeedPanic", nil) e.curIter = 0 e.iterInTbl.SetIter(e.curIter) chks := make([]*chunk.Chunk, 0, 10) @@ -274,6 +274,7 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { err = errors.Errorf("%v", r) } }() + failpoint.Inject("testCTERecursivePanic", nil) if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 { return } diff --git a/executor/cte_test.go b/executor/cte_test.go index 9cd62aadb24ba..9d4214dce9438 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -450,7 +450,7 @@ func TestCTEsInView(t *testing.T) { tk.MustQuery("select * from test.v;").Check(testkit.Rows("1")) } -func TestCTESeedPanic(t *testing.T) { +func TestCTEPanic(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") @@ -458,7 +458,15 @@ func TestCTESeedPanic(t *testing.T) { tk.MustExec("insert into t1 values(1), (2), (3)") fpPathPrefix := "github.com/pingcap/tidb/executor/" - require.NoError(t, failpoint.Enable(fpPathPrefix+"testCTEPanic", `panic("testCTEPanic")`)) + fp := "testCTESeedPanic" + require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") - require.Contains(t, err.Error(), "testCTEPanic") + require.Contains(t, err.Error(), fp) + require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) + + fp = "testCTERecursivePanic" + require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) + err = tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") + require.Contains(t, err.Error(), fp) + require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) }