Skip to content

Commit

Permalink
executor: fix CTE may be blocked when query report error (#33085)
Browse files Browse the repository at this point in the history
close #31302
  • Loading branch information
guo-shaoge authored Mar 17, 2022
1 parent 47e4b5b commit f12ad1e
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 20 deletions.
3 changes: 0 additions & 3 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ func (e *CTEExec) Close() (err error) {
func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
e.curIter = 0
e.iterInTbl.SetIter(e.curIter)
// This means iterInTbl's can be read.
defer close(e.iterInTbl.GetBegCh())
chks := make([]*chunk.Chunk, 0, 10)
for {
if e.limitDone(e.iterInTbl) {
Expand Down Expand Up @@ -384,7 +382,6 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) {
if err = e.iterInTbl.Reopen(); err != nil {
return err
}
defer close(e.iterInTbl.GetBegCh())
if e.isDistinct {
// Already deduplicated by resTbl, adding directly is ok.
for _, chk := range chks {
Expand Down
3 changes: 0 additions & 3 deletions executor/cte_table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ func (e *CTETableReaderExec) Open(ctx context.Context) error {
func (e *CTETableReaderExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
req.Reset()

// Wait until iterInTbl can be read. This is controlled by corresponding CTEExec.
<-e.iterInTbl.GetBegCh()

// We should read `iterInTbl` from the beginning when the next iteration starts.
// Can not directly judge whether to start the next iteration based on e.chkIdx,
// because some operators(Selection) may use forloop to read all data in `iterInTbl`.
Expand Down
32 changes: 32 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -408,3 +410,33 @@ func TestSpillToDisk(t *testing.T) {
}
rows.Check(testkit.Rows(resRows...))
}

func TestCTEExecError(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists src;")
tk.MustExec("create table src(first int, second int);")

insertStr := fmt.Sprintf("insert into src values (%d, %d)", rand.Intn(1000), rand.Intn(1000))
for i := 0; i < 1000; i++ {
insertStr += fmt.Sprintf(",(%d, %d)", rand.Intn(1000), rand.Intn(1000))
}
insertStr += ";"
tk.MustExec(insertStr)

// Increase projection concurrency and decrease chunk size
// to increase the probability of reproducing the problem.
tk.MustExec("set tidb_max_chunk_size = 32")
tk.MustExec("set tidb_projection_concurrency = 20")
for i := 0; i < 10; i++ {
err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " +
"(select 1, first, second, first+second from src " +
" union all " +
"select iter+1, second, result, second+result from cte where iter < 80 )" +
"select * from cte")
require.True(t, terror.ErrorEqual(err, types.ErrOverflow))
}
}
9 changes: 7 additions & 2 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type IndexNestedLoopHashJoin struct {
// taskCh is only used when `keepOuterOrder` is true.
taskCh chan *indexHashJoinTask

stats *indexLookUpJoinRuntimeStats
stats *indexLookUpJoinRuntimeStats
prepared bool
}

type indexHashJoinOuterWorker struct {
Expand Down Expand Up @@ -133,7 +134,6 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.finished.Store(false)
e.startWorkers(ctx)
return nil
}

Expand Down Expand Up @@ -207,6 +207,10 @@ func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() {

// Next implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.prepared {
e.startWorkers(ctx)
e.prepared = true
}
req.Reset()
if e.keepOuterOrder {
return e.runInOrder(ctx, req)
Expand Down Expand Up @@ -299,6 +303,7 @@ func (e *IndexNestedLoopHashJoin) Close() error {
}
e.joinChkResourceCh = nil
e.finished.Store(false)
e.prepared = false
return e.baseExecutor.Close()
}

Expand Down
7 changes: 6 additions & 1 deletion executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type IndexLookUpJoin struct {

stats *indexLookUpJoinRuntimeStats
finished *atomic.Value
prepared bool
}

type outerCtx struct {
Expand Down Expand Up @@ -174,7 +175,6 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.cancelFunc = nil
e.startWorkers(ctx)
return nil
}

Expand Down Expand Up @@ -258,6 +258,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork

// Next implements the Executor interface.
func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.prepared {
e.startWorkers(ctx)
e.prepared = true
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
Expand Down Expand Up @@ -764,6 +768,7 @@ func (e *IndexLookUpJoin) Close() error {
e.memTracker = nil
e.task = nil
e.finished.Store(false)
e.prepared = false
return e.baseExecutor.Close()
}

Expand Down
7 changes: 6 additions & 1 deletion executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type IndexLookUpMergeJoin struct {
lastColHelper *plannercore.ColWithCmpFuncManager

memTracker *memory.Tracker // track memory usage
prepared bool
}

type outerMergeCtx struct {
Expand Down Expand Up @@ -162,7 +163,6 @@ func (e *IndexLookUpMergeJoin) Open(ctx context.Context) error {
}
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.startWorkers(ctx)
return nil
}

Expand Down Expand Up @@ -249,6 +249,10 @@ func (e *IndexLookUpMergeJoin) newInnerMergeWorker(taskCh chan *lookUpMergeJoinT

// Next implements the Executor interface
func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.prepared {
e.startWorkers(ctx)
e.prepared = true
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
Expand Down Expand Up @@ -731,5 +735,6 @@ func (e *IndexLookUpMergeJoin) Close() error {
// cancelFunc control the outer worker and outer worker close the task channel.
e.workerWg.Wait()
e.memTracker = nil
e.prepared = false
return e.baseExecutor.Close()
}
10 changes: 0 additions & 10 deletions util/cteutil/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,6 @@ type Storage interface {
SetIter(iter int)
GetIter() int

// We use this channel to notify reader that Storage is ready to read.
// It exists only to solve the special implementation of IndexLookUpJoin.
// We will find a better way and remove this later.
GetBegCh() chan struct{}

GetMemTracker() *memory.Tracker
GetDiskTracker() *disk.Tracker
ActionSpill() *chunk.SpillDiskAction
Expand Down Expand Up @@ -239,11 +234,6 @@ func (s *StorageRC) GetIter() int {
return s.iter
}

// GetBegCh impls Storage GetBegCh interface.
func (s *StorageRC) GetBegCh() chan struct{} {
return s.begCh
}

// GetMemTracker impls Storage GetMemTracker interface.
func (s *StorageRC) GetMemTracker() *memory.Tracker {
return s.rc.GetMemTracker()
Expand Down

0 comments on commit f12ad1e

Please sign in to comment.