From 606fe476ca115e9083c6cc8127201a7bb5fbc727 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:01:40 +0800 Subject: [PATCH 1/9] executor: fix cte may hang because register OOMAction repeatedly Signed-off-by: guo-shaoge --- executor/cte.go | 36 ++++++++++++++++++++++++++++++++---- executor/cte_test.go | 13 +++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 7e98064b1d8bd..8d9ee65683d11 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -30,6 +30,9 @@ import ( var _ Executor = &CTEExec{} +// Only for test. +var oomTestFlag = 0 + // CTEExec implements CTE. // Following diagram describes how CTEExec works. // @@ -156,6 +159,11 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { return e.resTbl.Error() } resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) + failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { + if oomTestFlag == 1 { + oomTestFlag = 2 + } + }) iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) var iterOutAction *chunk.SpillDiskAction if e.iterOutTbl != nil { @@ -228,6 +236,15 @@ func (e *CTEExec) Close() (err error) { } func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { + defer func() { + if r := recover(); r != nil && err == nil { + err = errors.Errorf("%v", r) + } + }() + failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { + oomTestFlag = 1 + panic("testCTEPanic") + }) e.curIter = 0 e.iterInTbl.SetIter(e.curIter) chks := make([]*chunk.Chunk, 0, 10) @@ -237,13 +254,13 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { } chk := newFirstChunk(e.seedExec) if err = Next(ctx, e.seedExec, chk); err != nil { - return err + return } if chk.NumRows() == 0 { break } if chk, err = e.tryDedupAndAdd(chk, e.iterInTbl, e.hashTbl); err != nil { - return err + return } chks = append(chks, chk) } @@ -251,16 +268,21 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { // Just adding is ok. for _, chk := range chks { if err = e.resTbl.Add(chk); err != nil { - return err + return } } e.curIter++ e.iterInTbl.SetIter(e.curIter) - return nil + return } func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { + defer func() { + if r := recover(); r != nil && err == nil { + err = errors.Errorf("%v", r) + } + }() if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 { return nil } @@ -426,6 +448,12 @@ func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentM parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) { memTracker := tbl.GetMemTracker() memTracker.SetLabel(memory.LabelForCTEStorage) + failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { + if oomTestFlag == 2 { + memTracker.Consume(1) + ctx.GetSessionVars().MemTracker.NeedKill.Store(true) + } + }) memTracker.AttachTo(parentMemTracker) diskTracker := tbl.GetDiskTracker() diff --git a/executor/cte_test.go b/executor/cte_test.go index 5f68f140fed5e..1b6dd27d6a07c 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -460,3 +460,16 @@ func TestCTEsInView(t *testing.T) { tk.MustExec("use test1;") tk.MustQuery("select * from test.v;").Check(testkit.Rows("1")) } + +func TestCTESeedPanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("create table t1(c1 int)") + tk.MustExec("insert into t1 values(1), (2), (3)") + + fpPathPrefix := "github.com/pingcap/tidb/executor/" + require.NoError(t, failpoint.Enable(fpPathPrefix + "testCTEPanic", "return(true)")) + tk.MustQuery("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1").Check(testkit.Rows("1 2 3 4 5")) + require.NoError(t, failpoint.Disable(fpPathPrefix + "testCTEPanic")) +} From 6c14d04c2bae515b07f0639ccaed868e13f6112d Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:14:13 +0800 Subject: [PATCH 2/9] fix Close Signed-off-by: guo-shaoge --- util/cteutil/storage.go | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go index 5fdaf4424db8c..c09bb857def8d 100644 --- a/util/cteutil/storage.go +++ b/util/cteutil/storage.go @@ -131,13 +131,14 @@ func (s *StorageRC) DerefAndClose() (err error) { if s.refCnt < 0 { return errors.New("Storage ref count is less than zero") } else if s.refCnt == 0 { - // TODO: unreg memtracker + s.refCnt = -1 + s.done = false + s.err = nil + s.iter = 0 + s.rc = nil if err = s.rc.Close(); err != nil { return err } - if err = s.resetAll(); err != nil { - return err - } } return nil } @@ -157,7 +158,7 @@ func (s *StorageRC) SwapData(other Storage) (err error) { // Reopen impls Storage Reopen interface. func (s *StorageRC) Reopen() (err error) { - if err = s.rc.Reset(); err != nil { + if err = s.rc.Close(); err != nil { return err } s.iter = 0 @@ -267,18 +268,6 @@ func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction { return s.rc.ActionSpillForTest() } -func (s *StorageRC) resetAll() error { - s.refCnt = -1 - s.done = false - s.err = nil - s.iter = 0 - if err := s.rc.Reset(); err != nil { - return err - } - s.rc = nil - return nil -} - func (s *StorageRC) valid() bool { return s.refCnt > 0 && s.rc != nil } From e101ba6fb7e17a7dcee429dd74beb604a27cd5b8 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:14:59 +0800 Subject: [PATCH 3/9] fmt fmt Signed-off-by: guo-shaoge --- executor/cte_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/cte_test.go b/executor/cte_test.go index 1b6dd27d6a07c..4d903ae1f2637 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -469,7 +469,7 @@ func TestCTESeedPanic(t *testing.T) { tk.MustExec("insert into t1 values(1), (2), (3)") fpPathPrefix := "github.com/pingcap/tidb/executor/" - require.NoError(t, failpoint.Enable(fpPathPrefix + "testCTEPanic", "return(true)")) + require.NoError(t, failpoint.Enable(fpPathPrefix+"testCTEPanic", "return(true)")) tk.MustQuery("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1").Check(testkit.Rows("1 2 3 4 5")) - require.NoError(t, failpoint.Disable(fpPathPrefix + "testCTEPanic")) + require.NoError(t, failpoint.Disable(fpPathPrefix+"testCTEPanic")) } From 500dd784a00220506c1f8aedff91b9410cb1bc9a Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:23:15 +0800 Subject: [PATCH 4/9] fix storage Signed-off-by: guo-shaoge --- executor/cte.go | 20 ++++---------------- util/cteutil/storage.go | 2 +- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 8d9ee65683d11..821ea5f4fda79 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -30,9 +30,6 @@ import ( var _ Executor = &CTEExec{} -// Only for test. -var oomTestFlag = 0 - // CTEExec implements CTE. // Following diagram describes how CTEExec works. // @@ -159,11 +156,6 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { return e.resTbl.Error() } resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) - failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { - if oomTestFlag == 1 { - oomTestFlag = 2 - } - }) iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) var iterOutAction *chunk.SpillDiskAction if e.iterOutTbl != nil { @@ -242,8 +234,10 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { } }() failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { - oomTestFlag = 1 - panic("testCTEPanic") + // panic("testCTEPanic") + for i := 0; i < 100; i++ { + e.memTracker.Consume(1 * 1024 * 1024 * 1024) + } }) e.curIter = 0 e.iterInTbl.SetIter(e.curIter) @@ -448,12 +442,6 @@ func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentM parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) { memTracker := tbl.GetMemTracker() memTracker.SetLabel(memory.LabelForCTEStorage) - failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { - if oomTestFlag == 2 { - memTracker.Consume(1) - ctx.GetSessionVars().MemTracker.NeedKill.Store(true) - } - }) memTracker.AttachTo(parentMemTracker) diskTracker := tbl.GetDiskTracker() diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go index c09bb857def8d..1e6109eeb5715 100644 --- a/util/cteutil/storage.go +++ b/util/cteutil/storage.go @@ -135,10 +135,10 @@ func (s *StorageRC) DerefAndClose() (err error) { s.done = false s.err = nil s.iter = 0 - s.rc = nil if err = s.rc.Close(); err != nil { return err } + s.rc = nil } return nil } From 1cb824198cef71cb39c5ec84d982095b8694d4ad Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:25:39 +0800 Subject: [PATCH 5/9] fix case Signed-off-by: guo-shaoge --- executor/cte_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/cte_test.go b/executor/cte_test.go index 4d903ae1f2637..3859a656255d9 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -470,6 +470,6 @@ func TestCTESeedPanic(t *testing.T) { fpPathPrefix := "github.com/pingcap/tidb/executor/" require.NoError(t, failpoint.Enable(fpPathPrefix+"testCTEPanic", "return(true)")) - tk.MustQuery("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1").Check(testkit.Rows("1 2 3 4 5")) - require.NoError(t, failpoint.Disable(fpPathPrefix+"testCTEPanic")) + err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") + require.Contains(t, err.Error(), "testCTEPanic") } From 03b4f5a85409f3a62294d9acb7e5bd906d310135 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:27:01 +0800 Subject: [PATCH 6/9] fix Signed-off-by: guo-shaoge --- executor/cte.go | 7 +------ executor/cte_test.go | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 821ea5f4fda79..b65548653bb02 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -233,12 +233,7 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { err = errors.Errorf("%v", r) } }() - failpoint.Inject("testCTEPanic", func(_ failpoint.Value) { - // panic("testCTEPanic") - for i := 0; i < 100; i++ { - e.memTracker.Consume(1 * 1024 * 1024 * 1024) - } - }) + failpoint.Inject("testCTEPanic", nil) e.curIter = 0 e.iterInTbl.SetIter(e.curIter) chks := make([]*chunk.Chunk, 0, 10) diff --git a/executor/cte_test.go b/executor/cte_test.go index 3859a656255d9..0c703169570c3 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -469,7 +469,7 @@ func TestCTESeedPanic(t *testing.T) { tk.MustExec("insert into t1 values(1), (2), (3)") fpPathPrefix := "github.com/pingcap/tidb/executor/" - require.NoError(t, failpoint.Enable(fpPathPrefix+"testCTEPanic", "return(true)")) + require.NoError(t, failpoint.Enable(fpPathPrefix+"testCTEPanic", `panic("testCTEPanic")`)) err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") require.Contains(t, err.Error(), "testCTEPanic") } From e664465448197d82dbbda1593c8cef623768db06 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:30:49 +0800 Subject: [PATCH 7/9] fix err Signed-off-by: guo-shaoge --- executor/cte.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index b65548653bb02..ff3121be362ab 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -273,7 +273,7 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { } }() if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 { - return nil + return } if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth { @@ -281,17 +281,17 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { } if e.limitDone(e.resTbl) { - return nil + return } for { chk := newFirstChunk(e.recursiveExec) if err = Next(ctx, e.recursiveExec, chk); err != nil { - return err + return } if chk.NumRows() == 0 { if err = e.setupTblsForNewIteration(); err != nil { - return err + return } if e.limitDone(e.resTbl) { break @@ -308,18 +308,18 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { // Make sure iterInTbl is setup before Close/Open, // because some executors will read iterInTbl in Open() (like IndexLookupJoin). if err = e.recursiveExec.Close(); err != nil { - return err + return } if err = e.recursiveExec.Open(ctx); err != nil { - return err + return } } else { if err = e.iterOutTbl.Add(chk); err != nil { - return err + return } } } - return nil + return } // Get next chunk from resTbl for limit. From f8763f0ed0a01d837a29fd2677e3783a4526489f Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 12 May 2023 16:30:49 +0800 Subject: [PATCH 8/9] fix err Signed-off-by: guo-shaoge --- executor/cte.go | 3 ++- executor/cte_test.go | 14 +++++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index ff3121be362ab..49848e4e75ce1 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -233,7 +233,7 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { err = errors.Errorf("%v", r) } }() - failpoint.Inject("testCTEPanic", nil) + failpoint.Inject("testCTESeedPanic", nil) e.curIter = 0 e.iterInTbl.SetIter(e.curIter) chks := make([]*chunk.Chunk, 0, 10) @@ -272,6 +272,7 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { err = errors.Errorf("%v", r) } }() + failpoint.Inject("testCTERecursivePanic", nil) if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 { return } diff --git a/executor/cte_test.go b/executor/cte_test.go index 0c703169570c3..a166fabee02c4 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -461,7 +461,7 @@ func TestCTEsInView(t *testing.T) { tk.MustQuery("select * from test.v;").Check(testkit.Rows("1")) } -func TestCTESeedPanic(t *testing.T) { +func TestCTEPanic(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") @@ -469,7 +469,15 @@ func TestCTESeedPanic(t *testing.T) { tk.MustExec("insert into t1 values(1), (2), (3)") fpPathPrefix := "github.com/pingcap/tidb/executor/" - require.NoError(t, failpoint.Enable(fpPathPrefix+"testCTEPanic", `panic("testCTEPanic")`)) + fp := "testCTESeedPanic" + require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") - require.Contains(t, err.Error(), "testCTEPanic") + require.Contains(t, err.Error(), fp) + require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) + + fp = "testCTERecursivePanic" + require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) + err = tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") + require.Contains(t, err.Error(), fp) + require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) } From 1197885591957359dba0913883e8de55da040786 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 23 May 2023 19:39:44 +0800 Subject: [PATCH 9/9] fix case Signed-off-by: guo-shaoge --- executor/cte_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/executor/cte_test.go b/executor/cte_test.go index a166fabee02c4..d7dd2c6f9ae3a 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -462,7 +462,8 @@ func TestCTEsInView(t *testing.T) { } func TestCTEPanic(t *testing.T) { - store := testkit.CreateMockStore(t) + store, clean := testkit.CreateMockStore(t) + defer clean() tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") tk.MustExec("create table t1(c1 int)")