Skip to content

Commit

Permalink
executor: fix cte may hang because register OOMAction repeatedly (#43758
Browse files Browse the repository at this point in the history
)

close #36896, close #43749
  • Loading branch information
guo-shaoge authored May 12, 2023
1 parent a4926f8 commit cd334c4
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 29 deletions.
36 changes: 24 additions & 12 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ func (e *CTEExec) Close() (err error) {
}

func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
defer func() {
if r := recover(); r != nil && err == nil {
err = errors.Errorf("%v", r)
}
}()
failpoint.Inject("testCTESeedPanic", nil)
e.curIter = 0
e.iterInTbl.SetIter(e.curIter)
chks := make([]*chunk.Chunk, 0, 10)
Expand All @@ -240,50 +246,56 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
}
chk := tryNewCacheChunk(e.seedExec)
if err = Next(ctx, e.seedExec, chk); err != nil {
return err
return
}
if chk.NumRows() == 0 {
break
}
if chk, err = e.tryDedupAndAdd(chk, e.iterInTbl, e.hashTbl); err != nil {
return err
return
}
chks = append(chks, chk)
}
// Initial resTbl is empty, so no need to deduplicate chk using resTbl.
// Just adding is ok.
for _, chk := range chks {
if err = e.resTbl.Add(chk); err != nil {
return err
return
}
}
e.curIter++
e.iterInTbl.SetIter(e.curIter)

return nil
return
}

func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
defer func() {
if r := recover(); r != nil && err == nil {
err = errors.Errorf("%v", r)
}
}()
failpoint.Inject("testCTERecursivePanic", nil)
if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 {
return nil
return
}

if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth {
return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter)
}

if e.limitDone(e.resTbl) {
return nil
return
}

for {
chk := tryNewCacheChunk(e.recursiveExec)
if err = Next(ctx, e.recursiveExec, chk); err != nil {
return err
return
}
if chk.NumRows() == 0 {
if err = e.setupTblsForNewIteration(); err != nil {
return err
return
}
if e.limitDone(e.resTbl) {
break
Expand All @@ -300,18 +312,18 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
// Make sure iterInTbl is setup before Close/Open,
// because some executors will read iterInTbl in Open() (like IndexLookupJoin).
if err = e.recursiveExec.Close(); err != nil {
return err
return
}
if err = e.recursiveExec.Open(ctx); err != nil {
return err
return
}
} else {
if err = e.iterOutTbl.Add(chk); err != nil {
return err
return
}
}
}
return nil
return
}

// Get next chunk from resTbl for limit.
Expand Down
21 changes: 21 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,24 @@ func TestCTEsInView(t *testing.T) {
tk.MustExec("use test1;")
tk.MustQuery("select * from test.v;").Check(testkit.Rows("1"))
}

func TestCTEPanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("create table t1(c1 int)")
tk.MustExec("insert into t1 values(1), (2), (3)")

fpPathPrefix := "github.com/pingcap/tidb/executor/"
fp := "testCTESeedPanic"
require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp)))
err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1")
require.Contains(t, err.Error(), fp)
require.NoError(t, failpoint.Disable(fpPathPrefix+fp))

fp = "testCTERecursivePanic"
require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp)))
err = tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1")
require.Contains(t, err.Error(), fp)
require.NoError(t, failpoint.Disable(fpPathPrefix+fp))
}
23 changes: 6 additions & 17 deletions util/cteutil/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,14 @@ func (s *StorageRC) DerefAndClose() (err error) {
if s.refCnt < 0 {
return errors.New("Storage ref count is less than zero")
} else if s.refCnt == 0 {
// TODO: unreg memtracker
s.refCnt = -1
s.done = false
s.err = nil
s.iter = 0
if err = s.rc.Close(); err != nil {
return err
}
if err = s.resetAll(); err != nil {
return err
}
s.rc = nil
}
return nil
}
Expand All @@ -155,7 +156,7 @@ func (s *StorageRC) SwapData(other Storage) (err error) {

// Reopen impls Storage Reopen interface.
func (s *StorageRC) Reopen() (err error) {
if err = s.rc.Reset(); err != nil {
if err = s.rc.Close(); err != nil {
return err
}
s.iter = 0
Expand Down Expand Up @@ -265,18 +266,6 @@ func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction {
return s.rc.ActionSpillForTest()
}

func (s *StorageRC) resetAll() error {
s.refCnt = -1
s.done = false
s.err = nil
s.iter = 0
if err := s.rc.Reset(); err != nil {
return err
}
s.rc = nil
return nil
}

func (s *StorageRC) valid() bool {
return s.refCnt > 0 && s.rc != nil
}

0 comments on commit cd334c4

Please sign in to comment.