Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix CTE may be blocked when query report error (#33085) #33187

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,6 @@ func (e *CTEExec) Close() (err error) {
func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
e.curIter = 0
e.iterInTbl.SetIter(e.curIter)
// This means iterInTbl's can be read.
defer close(e.iterInTbl.GetBegCh())
chks := make([]*chunk.Chunk, 0, 10)
for {
if e.limitDone(e.iterInTbl) {
Expand Down Expand Up @@ -368,7 +366,6 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) {
if err = e.iterInTbl.Reopen(); err != nil {
return err
}
defer close(e.iterInTbl.GetBegCh())
if e.isDistinct {
// Already deduplicated by resTbl, adding directly is ok.
for _, chk := range chks {
Expand Down
3 changes: 0 additions & 3 deletions executor/cte_table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ func (e *CTETableReaderExec) Open(ctx context.Context) error {
func (e *CTETableReaderExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
req.Reset()

// Wait until iterInTbl can be read. This is controlled by corresponding CTEExec.
<-e.iterInTbl.GetBegCh()

// We should read `iterInTbl` from the beginning when the next iteration starts.
// Can not directly judge whether to start the next iteration based on e.chkIdx,
// because some operators(Selection) may use forloop to read all data in `iterInTbl`.
Expand Down
98 changes: 98 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,16 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
<<<<<<< HEAD
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testkit"
=======
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085)
)

var _ = check.Suite(&CTETestSuite{&baseCTETestSuite{}})
Expand Down Expand Up @@ -448,3 +455,94 @@ func (test *CTETestSuite) TestCTEWithLimit(c *check.C) {
rows = tk.MustQuery("with recursive cte1(c1) as (select c1 from t1 union all select c1 + 1 from cte1 limit 4 offset 4) select * from cte1;")
rows.Check(testkit.Rows("3", "4", "3", "4"))
}
<<<<<<< HEAD
=======

func TestSpillToDisk(t *testing.T) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMUseTmpStorage = true
})

store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testCTEStorageSpill", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testCTEStorageSpill"))
tk.MustExec("set tidb_mem_quota_query = 1073741824;")
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill"))
}()

// Use duplicated rows to test UNION DISTINCT.
tk.MustExec("set tidb_mem_quota_query = 1073741824;")
insertStr := "insert into t1 values(0)"
rowNum := 1000
vals := make([]int, rowNum)
vals[0] = 0
for i := 1; i < rowNum; i++ {
v := rand.Intn(100)
vals[i] = v
insertStr += fmt.Sprintf(", (%d)", v)
}
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int);")
tk.MustExec(insertStr)
tk.MustExec("set tidb_mem_quota_query = 40000;")
tk.MustExec("set cte_max_recursion_depth = 500000;")
sql := fmt.Sprintf("with recursive cte1 as ( "+
"select c1 from t1 "+
"union "+
"select c1 + 1 c1 from cte1 where c1 < %d) "+
"select c1 from cte1 order by c1;", rowNum)
rows := tk.MustQuery(sql)

memTracker := tk.Session().GetSessionVars().StmtCtx.MemTracker
diskTracker := tk.Session().GetSessionVars().StmtCtx.DiskTracker
require.Greater(t, memTracker.MaxConsumed(), int64(0))
require.Greater(t, diskTracker.MaxConsumed(), int64(0))

sort.Ints(vals)
resRows := make([]string, 0, rowNum)
for i := vals[0]; i <= rowNum; i++ {
resRows = append(resRows, fmt.Sprintf("%d", i))
}
rows.Check(testkit.Rows(resRows...))
}

func TestCTEExecError(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists src;")
tk.MustExec("create table src(first int, second int);")

insertStr := fmt.Sprintf("insert into src values (%d, %d)", rand.Intn(1000), rand.Intn(1000))
for i := 0; i < 1000; i++ {
insertStr += fmt.Sprintf(",(%d, %d)", rand.Intn(1000), rand.Intn(1000))
}
insertStr += ";"
tk.MustExec(insertStr)

// Increase projection concurrency and decrease chunk size
// to increase the probability of reproducing the problem.
tk.MustExec("set tidb_max_chunk_size = 32")
tk.MustExec("set tidb_projection_concurrency = 20")
for i := 0; i < 10; i++ {
err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " +
"(select 1, first, second, first+second from src " +
" union all " +
"select iter+1, second, result, second+result from cte where iter < 80 )" +
"select * from cte")
require.True(t, terror.ErrorEqual(err, types.ErrOverflow))
}
}
>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085)
9 changes: 7 additions & 2 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type IndexNestedLoopHashJoin struct {
// taskCh is only used when `keepOuterOrder` is true.
taskCh chan *indexHashJoinTask

stats *indexLookUpJoinRuntimeStats
stats *indexLookUpJoinRuntimeStats
prepared bool
}

type indexHashJoinOuterWorker struct {
Expand Down Expand Up @@ -153,7 +154,6 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.finished.Store(false)
e.startWorkers(ctx)
return nil
}

Expand Down Expand Up @@ -230,6 +230,10 @@ func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() {

// Next implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.prepared {
e.startWorkers(ctx)
e.prepared = true
}
req.Reset()
if e.keepOuterOrder {
return e.runInOrder(ctx, req)
Expand Down Expand Up @@ -329,6 +333,7 @@ func (e *IndexNestedLoopHashJoin) Close() error {
}
e.joinChkResourceCh = nil
e.finished.Store(false)
e.prepared = false
return e.baseExecutor.Close()
}

Expand Down
15 changes: 15 additions & 0 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,15 @@ type IndexLookUpJoin struct {

memTracker *memory.Tracker // track memory usage.

<<<<<<< HEAD
stats *indexLookUpJoinRuntimeStats
ctxCancelReason atomic.Value
finished *atomic.Value
=======
stats *indexLookUpJoinRuntimeStats
finished *atomic.Value
prepared bool
>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085)
}

type outerCtx struct {
Expand Down Expand Up @@ -168,7 +174,11 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
<<<<<<< HEAD
e.startWorkers(ctx)
=======
e.cancelFunc = nil
>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085)
return nil
}

Expand Down Expand Up @@ -252,6 +262,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork

// Next implements the Executor interface.
func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.prepared {
e.startWorkers(ctx)
e.prepared = true
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
Expand Down Expand Up @@ -771,6 +785,7 @@ func (e *IndexLookUpJoin) Close() error {
e.memTracker = nil
e.task = nil
e.finished.Store(false)
e.prepared = false
return e.baseExecutor.Close()
}

Expand Down
7 changes: 6 additions & 1 deletion executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type IndexLookUpMergeJoin struct {
lastColHelper *plannercore.ColWithCmpFuncManager

memTracker *memory.Tracker // track memory usage
prepared bool
}

type outerMergeCtx struct {
Expand Down Expand Up @@ -181,7 +182,6 @@ func (e *IndexLookUpMergeJoin) Open(ctx context.Context) error {
}
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.startWorkers(ctx)
return nil
}

Expand Down Expand Up @@ -268,6 +268,10 @@ func (e *IndexLookUpMergeJoin) newInnerMergeWorker(taskCh chan *lookUpMergeJoinT

// Next implements the Executor interface
func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.prepared {
e.startWorkers(ctx)
e.prepared = true
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
Expand Down Expand Up @@ -750,5 +754,6 @@ func (e *IndexLookUpMergeJoin) Close() error {
// cancelFunc control the outer worker and outer worker close the task channel.
e.workerWg.Wait()
e.memTracker = nil
e.prepared = false
return e.baseExecutor.Close()
}
10 changes: 0 additions & 10 deletions util/cteutil/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ type Storage interface {
SetIter(iter int)
GetIter() int

// We use this channel to notify reader that Storage is ready to read.
// It exists only to solve the special implementation of IndexLookUpJoin.
// We will find a better way and remove this later.
GetBegCh() chan struct{}

GetMemTracker() *memory.Tracker
GetDiskTracker() *disk.Tracker
ActionSpill() *chunk.SpillDiskAction
Expand Down Expand Up @@ -238,11 +233,6 @@ func (s *StorageRC) GetIter() int {
return s.iter
}

// GetBegCh impls Storage GetBegCh interface.
func (s *StorageRC) GetBegCh() chan struct{} {
return s.begCh
}

// GetMemTracker impls Storage GetMemTracker interface.
func (s *StorageRC) GetMemTracker() *memory.Tracker {
return s.rc.GetMemTracker()
Expand Down