Skip to content

Commit

Permalink
cherry pick pingcap#33085 to release-5.2
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
guo-shaoge authored and ti-srebot committed Mar 17, 2022
1 parent ae76eac commit 412e979
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 19 deletions.
3 changes: 0 additions & 3 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,6 @@ func (e *CTEExec) Close() (err error) {
func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
e.curIter = 0
e.iterInTbl.SetIter(e.curIter)
// This means iterInTbl's can be read.
defer close(e.iterInTbl.GetBegCh())
chks := make([]*chunk.Chunk, 0, 10)
for {
if e.limitDone(e.iterInTbl) {
Expand Down Expand Up @@ -368,7 +366,6 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) {
if err = e.iterInTbl.Reopen(); err != nil {
return err
}
defer close(e.iterInTbl.GetBegCh())
if e.isDistinct {
// Already deduplicated by resTbl, adding directly is ok.
for _, chk := range chks {
Expand Down
3 changes: 0 additions & 3 deletions executor/cte_table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ func (e *CTETableReaderExec) Open(ctx context.Context) error {
func (e *CTETableReaderExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
req.Reset()

// Wait until iterInTbl can be read. This is controlled by corresponding CTEExec.
<-e.iterInTbl.GetBegCh()

// We should read `iterInTbl` from the beginning when the next iteration starts.
// Can not directly judge whether to start the next iteration based on e.chkIdx,
// because some operators(Selection) may use forloop to read all data in `iterInTbl`.
Expand Down
98 changes: 98 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,16 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
<<<<<<< HEAD
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testkit"
=======
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085)
)

var _ = check.Suite(&CTETestSuite{&baseCTETestSuite{}})
Expand Down Expand Up @@ -448,3 +455,94 @@ func (test *CTETestSuite) TestCTEWithLimit(c *check.C) {
rows = tk.MustQuery("with recursive cte1(c1) as (select c1 from t1 union all select c1 + 1 from cte1 limit 4 offset 4) select * from cte1;")
rows.Check(testkit.Rows("3", "4", "3", "4"))
}
<<<<<<< HEAD
=======

// TestSpillToDisk verifies that a recursive CTE's intermediate storage spills
// to disk when the query memory quota is exceeded, and that the spilled
// result is still correct. Spilling is forced via failpoints rather than real
// memory pressure.
func TestSpillToDisk(t *testing.T) {
	defer config.RestoreFunc()()
	config.UpdateGlobal(func(conf *config.Config) {
		// Spill-to-disk is only possible when tmp storage is enabled.
		conf.OOMUseTmpStorage = true
	})

	store, clean := testkit.CreateMockStore(t)
	defer clean()

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test;")

	// Force both the CTE storage and the sorted row container to spill
	// unconditionally, regardless of actual memory consumption.
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testCTEStorageSpill", "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testCTEStorageSpill"))
		// Restore a large quota so later statements in this session are unaffected.
		tk.MustExec("set tidb_mem_quota_query = 1073741824;")
	}()
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill"))
	}()

	// Use duplicated rows to test UNION DISTINCT.
	tk.MustExec("set tidb_mem_quota_query = 1073741824;")
	insertStr := "insert into t1 values(0)"
	rowNum := 1000
	vals := make([]int, rowNum)
	vals[0] = 0
	for i := 1; i < rowNum; i++ {
		v := rand.Intn(100)
		vals[i] = v
		insertStr += fmt.Sprintf(", (%d)", v)
	}
	tk.MustExec("drop table if exists t1;")
	tk.MustExec("create table t1(c1 int);")
	tk.MustExec(insertStr)
	// Shrink the quota before running the CTE so its storage exceeds it.
	tk.MustExec("set tidb_mem_quota_query = 40000;")
	tk.MustExec("set cte_max_recursion_depth = 500000;")
	sql := fmt.Sprintf("with recursive cte1 as ( "+
		"select c1 from t1 "+
		"union "+
		"select c1 + 1 c1 from cte1 where c1 < %d) "+
		"select c1 from cte1 order by c1;", rowNum)
	rows := tk.MustQuery(sql)

	// Both memory and disk must have been consumed, proving the spill happened.
	memTracker := tk.Session().GetSessionVars().StmtCtx.MemTracker
	diskTracker := tk.Session().GetSessionVars().StmtCtx.DiskTracker
	require.Greater(t, memTracker.MaxConsumed(), int64(0))
	require.Greater(t, diskTracker.MaxConsumed(), int64(0))

	// Expected UNION DISTINCT result: every integer from min(vals) (always 0,
	// since vals[0] = 0 and rand.Intn is non-negative) up to rowNum inclusive.
	sort.Ints(vals)
	resRows := make([]string, 0, rowNum)
	for i := vals[0]; i <= rowNum; i++ {
		resRows = append(resRows, fmt.Sprintf("%d", i))
	}
	rows.Check(testkit.Rows(resRows...))
}

// TestCTEExecError checks that a recursive CTE query which hits a runtime
// error (integer overflow while accumulating second+result) reports the error
// to the client instead of hanging. Repeated runs with high projection
// concurrency make the original blocking bug likely to reproduce.
func TestCTEExecError(t *testing.T) {
	store, clean := testkit.CreateMockStore(t)
	defer clean()

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test;")
	tk.MustExec("drop table if exists src;")
	tk.MustExec("create table src(first int, second int);")

	// Seed 1001 random rows with a single INSERT statement.
	var insertStmt string
	for rowIdx := 0; rowIdx <= 1000; rowIdx++ {
		sep := ","
		if rowIdx == 0 {
			sep = "insert into src values "
		}
		insertStmt += fmt.Sprintf("%s(%d, %d)", sep, rand.Intn(1000), rand.Intn(1000))
	}
	insertStmt += ";"
	tk.MustExec(insertStmt)

	// Increase projection concurrency and decrease chunk size
	// to increase the probability of reproducing the problem.
	tk.MustExec("set tidb_max_chunk_size = 32")
	tk.MustExec("set tidb_projection_concurrency = 20")
	for attempt := 0; attempt < 10; attempt++ {
		err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " +
			"(select 1, first, second, first+second from src " +
			" union all " +
			"select iter+1, second, result, second+result from cte where iter < 80 )" +
			"select * from cte")
		require.True(t, terror.ErrorEqual(err, types.ErrOverflow))
	}
}
>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085)
9 changes: 7 additions & 2 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type IndexNestedLoopHashJoin struct {
// taskCh is only used when `keepOuterOrder` is true.
taskCh chan *indexHashJoinTask

stats *indexLookUpJoinRuntimeStats
stats *indexLookUpJoinRuntimeStats
prepared bool
}

type indexHashJoinOuterWorker struct {
Expand Down Expand Up @@ -153,7 +154,6 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.finished.Store(false)
e.startWorkers(ctx)
return nil
}

Expand Down Expand Up @@ -230,6 +230,10 @@ func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() {

// Next implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.prepared {
e.startWorkers(ctx)
e.prepared = true
}
req.Reset()
if e.keepOuterOrder {
return e.runInOrder(ctx, req)
Expand Down Expand Up @@ -329,6 +333,7 @@ func (e *IndexNestedLoopHashJoin) Close() error {
}
e.joinChkResourceCh = nil
e.finished.Store(false)
e.prepared = false
return e.baseExecutor.Close()
}

Expand Down
15 changes: 15 additions & 0 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,15 @@ type IndexLookUpJoin struct {

memTracker *memory.Tracker // track memory usage.

<<<<<<< HEAD
stats *indexLookUpJoinRuntimeStats
ctxCancelReason atomic.Value
finished *atomic.Value
=======
stats *indexLookUpJoinRuntimeStats
finished *atomic.Value
prepared bool
>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085)
}

type outerCtx struct {
Expand Down Expand Up @@ -168,7 +174,11 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
<<<<<<< HEAD
e.startWorkers(ctx)
=======
e.cancelFunc = nil
>>>>>>> f12ad1e6c... executor: fix CTE may be blocked when query report error (#33085)
return nil
}

Expand Down Expand Up @@ -252,6 +262,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork

// Next implements the Executor interface.
func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.prepared {
e.startWorkers(ctx)
e.prepared = true
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
Expand Down Expand Up @@ -771,6 +785,7 @@ func (e *IndexLookUpJoin) Close() error {
e.memTracker = nil
e.task = nil
e.finished.Store(false)
e.prepared = false
return e.baseExecutor.Close()
}

Expand Down
7 changes: 6 additions & 1 deletion executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type IndexLookUpMergeJoin struct {
lastColHelper *plannercore.ColWithCmpFuncManager

memTracker *memory.Tracker // track memory usage
prepared bool
}

type outerMergeCtx struct {
Expand Down Expand Up @@ -181,7 +182,6 @@ func (e *IndexLookUpMergeJoin) Open(ctx context.Context) error {
}
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.startWorkers(ctx)
return nil
}

Expand Down Expand Up @@ -268,6 +268,10 @@ func (e *IndexLookUpMergeJoin) newInnerMergeWorker(taskCh chan *lookUpMergeJoinT

// Next implements the Executor interface
func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.prepared {
e.startWorkers(ctx)
e.prepared = true
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
Expand Down Expand Up @@ -750,5 +754,6 @@ func (e *IndexLookUpMergeJoin) Close() error {
// cancelFunc control the outer worker and outer worker close the task channel.
e.workerWg.Wait()
e.memTracker = nil
e.prepared = false
return e.baseExecutor.Close()
}
10 changes: 0 additions & 10 deletions util/cteutil/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ type Storage interface {
SetIter(iter int)
GetIter() int

// We use this channel to notify reader that Storage is ready to read.
// It exists only to solve the special implementation of IndexLookUpJoin.
// We will find a better way and remove this later.
GetBegCh() chan struct{}

GetMemTracker() *memory.Tracker
GetDiskTracker() *disk.Tracker
ActionSpill() *chunk.SpillDiskAction
Expand Down Expand Up @@ -238,11 +233,6 @@ func (s *StorageRC) GetIter() int {
return s.iter
}

// GetBegCh impls Storage GetBegCh interface.
// The returned channel is closed by the writing CTEExec to signal readers
// (CTETableReaderExec blocks on a receive from it) that the storage is ready
// to be read.
func (s *StorageRC) GetBegCh() chan struct{} {
	return s.begCh
}

// GetMemTracker impls Storage GetMemTracker interface.
func (s *StorageRC) GetMemTracker() *memory.Tracker {
return s.rc.GetMemTracker()
Expand Down

0 comments on commit 412e979

Please sign in to comment.