From 9fad132d1d892ade178d0614687660c29cc5af54 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 2 Jun 2021 23:26:26 +0800 Subject: [PATCH] case: make CTE case be stable (#25035) --- executor/cte.go | 4 +-- executor/cte_test.go | 85 +++++++++++++++++++++----------------------- 2 files changed, 42 insertions(+), 47 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 95921670cb5cf..055163dc17e4f 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -111,8 +111,6 @@ func (e *CTEExec) Open(ctx context.Context) (err error) { if err = e.iterOutTbl.OpenAndRef(); err != nil { return err } - - setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker) } if e.isDistinct { @@ -137,11 +135,13 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { defer e.resTbl.Unlock() resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) + iterOutAction := setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker) failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage { defer resAction.WaitForTest() defer iterInAction.WaitForTest() + defer iterOutAction.WaitForTest() } }) diff --git a/executor/cte_test.go b/executor/cte_test.go index e5789170627f7..d6e212484ad29 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -27,14 +27,16 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testkit" ) -var _ = check.Suite(&CTETestSuite{}) +var _ = check.Suite(&CTETestSuite{&baseCTETestSuite{}}) +var _ = check.SerialSuites(&CTESerialTestSuite{&baseCTETestSuite{}}) -type CTETestSuite struct { +type baseCTETestSuite struct { store kv.Storage dom *domain.Domain sessionCtx sessionctx.Context @@ -42,7 +44,15 @@ type CTETestSuite struct { ctx context.Context } -func (test *CTETestSuite) SetUpSuite(c *check.C) { +type CTETestSuite struct { + *baseCTETestSuite +} + +type CTESerialTestSuite struct { + *baseCTETestSuite +} + +func (test *baseCTETestSuite) SetUpSuite(c *check.C) { var err error test.store, err = mockstore.NewMockStore() c.Assert(err, check.IsNil) @@ -59,7 +69,7 @@ func (test *CTETestSuite) SetUpSuite(c *check.C) { test.ctx = context.Background() } -func (test *CTETestSuite) TearDownSuite(c *check.C) { +func (test *baseCTETestSuite) TearDownSuite(c *check.C) { test.dom.Close() test.store.Close() } @@ -107,70 +117,55 @@ func (test *CTETestSuite) TestBasicCTE(c *check.C) { rows.Check(testkit.Rows("1", "2")) } -func (test *CTETestSuite) TestSpillToDisk(c *check.C) { +func (test *CTESerialTestSuite) TestSpillToDisk(c *check.C) { + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMUseTmpStorage = true + }) + tk := testkit.NewTestKit(c, test.store) tk.MustExec("use test;") c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testCTEStorageSpill", "return(true)"), check.IsNil) defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testCTEStorageSpill"), check.IsNil) + tk.MustExec("set tidb_mem_quota_query = 1073741824;") + }() + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"), check.IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill"), check.IsNil) }() - - insertStr := "insert into t1 values(0, 0)" - for i := 1; i < 5000; i++ { - insertStr += fmt.Sprintf(", (%d, %d)", i, i) - } - - tk.MustExec("drop table if exists t1;") - tk.MustExec("create table t1(c1 int, c2 int);") - tk.MustExec(insertStr) - tk.MustExec("set tidb_mem_quota_query = 80000;") - rows := tk.MustQuery("with recursive cte1 as ( " + - "select c1 from t1 " + - "union " + - "select c1 + 1 c1 from cte1 where c1 < 5000) " + - "select c1 from cte1;") - - memTracker := tk.Se.GetSessionVars().StmtCtx.MemTracker - diskTracker := tk.Se.GetSessionVars().StmtCtx.DiskTracker - c.Assert(memTracker.MaxConsumed(), check.Greater, int64(0)) - c.Assert(diskTracker.MaxConsumed(), check.Greater, int64(0)) - - rowNum := 5000 - var resRows []string - for i := 0; i <= rowNum; i++ { - resRows = append(resRows, fmt.Sprintf("%d", i)) - } - rows.Check(testkit.Rows(resRows...)) // Use duplicated rows to test UNION DISTINCT. tk.MustExec("set tidb_mem_quota_query = 1073741824;") - insertStr = "insert into t1 values(0, 0)" + insertStr := "insert into t1 values(0)" + rowNum := 1000 vals := make([]int, rowNum) vals[0] = 0 for i := 1; i < rowNum; i++ { v := rand.Intn(100) vals[i] = v - insertStr += fmt.Sprintf(", (%d, %d)", v, v) + insertStr += fmt.Sprintf(", (%d)", v) } tk.MustExec("drop table if exists t1;") - tk.MustExec("create table t1(c1 int, c2 int);") + tk.MustExec("create table t1(c1 int);") tk.MustExec(insertStr) - tk.MustExec("set tidb_mem_quota_query = 80000;") + tk.MustExec("set tidb_mem_quota_query = 40000;") tk.MustExec("set cte_max_recursion_depth = 500000;") - rows = tk.MustQuery("with recursive cte1 as ( " + - "select c1 from t1 " + - "union " + - "select c1 + 1 c1 from cte1 where c1 < 5000) " + - "select c1 from cte1 order by c1;") - - memTracker = tk.Se.GetSessionVars().StmtCtx.MemTracker - diskTracker = tk.Se.GetSessionVars().StmtCtx.DiskTracker + sql := fmt.Sprintf("with recursive cte1 as ( "+ + "select c1 from t1 "+ + "union "+ + "select c1 + 1 c1 from cte1 where c1 < %d) "+ + "select c1 from cte1 order by c1;", rowNum) + rows := tk.MustQuery(sql) + + memTracker := tk.Se.GetSessionVars().StmtCtx.MemTracker + diskTracker := tk.Se.GetSessionVars().StmtCtx.DiskTracker c.Assert(memTracker.MaxConsumed(), check.Greater, int64(0)) c.Assert(diskTracker.MaxConsumed(), check.Greater, int64(0)) sort.Ints(vals) - resRows = make([]string, 0, rowNum) + resRows := make([]string, 0, rowNum) for i := vals[0]; i <= rowNum; i++ { resRows = append(resRows, fmt.Sprintf("%d", i)) }