From ab2a29f2a8bc2fd9c1a136e7298b2d2b6bd7e06e Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Mon, 17 May 2021 15:13:18 +0800 Subject: [PATCH 1/3] add some cases about apply --- executor/partition_table_test.go | 125 ++++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 2 deletions(-) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index e87e1044278e3..6b1eec68830b8 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -239,8 +239,8 @@ func (s *partitionTableSuite) TestOrderByandLimit(c *C) { // range partition table tk.MustExec(`create table trange(a int, b int, index idx_a(a)) partition by range(a) ( - partition p0 values less than(300), - partition p1 values less than (500), + partition p0 values less than(300), + partition p1 values less than (500), partition p2 values less than(1100));`) // hash partition table @@ -949,6 +949,127 @@ func (s *partitionTableSuite) TestSplitRegion(c *C) { tk.MustPartition(`select * from thash where a in (1, 10001, 20001)`, "p1").Sort().Check(result) } +func (s *partitionTableSuite) TestParallelApply(c *C) { + if israce.RaceEnabled { + c.Skip("exhaustive types test, skip race test") + } + + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create database test_parallel_apply") + tk.MustExec("use test_parallel_apply") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + tk.MustExec("set tidb_enable_parallel_apply=true") + + tk.MustExec(`create table touter (a int, b int)`) + tk.MustExec(`create table tinner (a int, b int, key(a))`) + tk.MustExec(`create table thash (a int, b int, key(a)) partition by hash(a) partitions 4`) + tk.MustExec(`create table trange (a int, b int, key(a)) partition by range(a) ( + partition p0 values less than(10000), + partition p1 values less than(20000), + partition p2 values less than(30000), + partition p3 values less than(40000))`) + + vouter := make([]string, 0, 100) + for i := 0; i < 100; i++{ + vouter = append(vouter, fmt.Sprintf("(%v, %v)", rand.Intn(40000), rand.Intn(40000))) + } + tk.MustExec("insert into touter values " + strings.Join(vouter, ", ")) + + vals := make([]string, 0, 4000) + for i := 0; i < 4000; i++ { + vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(40000), rand.Intn(40000))) + } + tk.MustExec("insert into tinner values " + strings.Join(vals, ", ")) + tk.MustExec("insert into thash values " + strings.Join(vals, ", ")) + tk.MustExec("insert into trange values " + strings.Join(vals, ", ")) + + // parallel apply + hash partition + IndexReader as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(thash.a) from thash use index(a) where thash.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─IndexReader 1.00 root partition:all index:StreamAgg`, // IndexReader is a inner child of Apply + ` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.thash.a)->Column#9`, + ` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.thash.a, test_parallel_apply.touter.b)`, + ` └─IndexFullScan 10000.00 cop[tikv] table:thash, index:a(a) keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(thash.a) from thash use index(a) where thash.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.a) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // parallel apply + hash partition + TableReader as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(thash.b) from thash ignore index(a) where thash.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─TableReader 1.00 root partition:all data:StreamAgg`, // TableReader is a inner child of Apply + ` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.thash.b)->Column#9`, + ` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.thash.a, test_parallel_apply.touter.b)`, + ` └─TableFullScan 10000.00 cop[tikv] table:thash keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(thash.b) from thash ignore index(a) where thash.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner ignore index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // parallel apply + hash partition + IndexLookUp as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─HashAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─IndexLookUp 1.00 root `, // IndexLookUp is a inner child of Apply + ` ├─Selection(Build) 8000.00 cop[tikv] gt(test_parallel_apply.tinner.a, test_parallel_apply.touter.b)`, + ` │ └─IndexFullScan 10000.00 cop[tikv] table:tinner, index:a(a) keep order:false, stats:pseudo`, + ` └─HashAgg(Probe) 1.00 cop[tikv] funcs:sum(test_parallel_apply.tinner.b)->Column#9`, + ` └─TableRowIDScan 8000.00 cop[tikv] table:tinner keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(thash.b) from thash use index(a) where thash.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // parallel apply + range partition + IndexReader as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(trange.a) from trange use index(a) where trange.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─IndexReader 1.00 root partition:all index:StreamAgg`, // IndexReader is a inner child of Apply + ` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.trange.a)->Column#9`, + ` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.trange.a, test_parallel_apply.touter.b)`, + ` └─IndexFullScan 10000.00 cop[tikv] table:trange, index:a(a) keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(trange.a) from trange use index(a) where trange.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.a) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // parallel apply + range partition + TableReader as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(trange.b) from trange ignore index(a) where trange.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─TableReader 1.00 root partition:all data:StreamAgg`, // TableReader is a inner child of Apply + ` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.trange.b)->Column#9`, + ` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.trange.a, test_parallel_apply.touter.b)`, + ` └─TableFullScan 10000.00 cop[tikv] table:trange keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(trange.b) from trange ignore index(a) where trange.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner ignore index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // parallel apply + range partition + IndexLookUp as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─HashAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─IndexLookUp 1.00 root `, // IndexLookUp is a inner child of Apply + ` ├─Selection(Build) 8000.00 cop[tikv] gt(test_parallel_apply.tinner.a, test_parallel_apply.touter.b)`, + ` │ └─IndexFullScan 10000.00 cop[tikv] table:tinner, index:a(a) keep order:false, stats:pseudo`, + ` └─HashAgg(Probe) 1.00 cop[tikv] funcs:sum(test_parallel_apply.tinner.b)->Column#9`, + ` └─TableRowIDScan 8000.00 cop[tikv] table:tinner keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(trange.b) from trange use index(a) where trange.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows()) +} + func (s *partitionTableSuite) TestDirectReadingWithAgg(c *C) { if israce.RaceEnabled { c.Skip("exhaustive types test, skip race test") From 8738669ef7dd4f04a8cb3008217fb891d215f3cf Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Mon, 17 May 2021 15:55:24 +0800 Subject: [PATCH 2/3] add random queries --- executor/partition_table_test.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 6b1eec68830b8..46c07113296b1 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -970,13 +970,13 @@ func (s *partitionTableSuite) TestParallelApply(c *C) { partition p3 values less than(40000))`) vouter := make([]string, 0, 100) - for i := 0; i < 100; i++{ + for i := 0; i < 100; i++ { vouter = append(vouter, fmt.Sprintf("(%v, %v)", rand.Intn(40000), rand.Intn(40000))) } tk.MustExec("insert into touter values " + strings.Join(vouter, ", ")) - vals := make([]string, 0, 4000) - for i := 0; i < 4000; i++ { + vals := make([]string, 0, 2000) + for i := 0; i < 100; i++ { vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(40000), rand.Intn(40000))) } tk.MustExec("insert into tinner values " + strings.Join(vals, ", ")) @@ -1068,6 +1068,26 @@ func (s *partitionTableSuite) TestParallelApply(c *C) { ` └─TableRowIDScan 8000.00 cop[tikv] table:tinner keep order:false, stats:pseudo`)) tk.MustQuery(`select * from touter where touter.a > (select sum(trange.b) from trange use index(a) where trange.a>touter.b)`).Sort().Check( tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows()) + + + // random queries + ops := []string{"!=", ">", "<", ">=", "<="} + aggFuncs := []string{"sum", "count", "max", "min"} + tbls := []string{"tinner", "thash", "trange"} + for i := 0; i < 50; i++ { + var r [][]interface{} + op := ops[rand.Intn(len(ops))] + agg := aggFuncs[rand.Intn(len(aggFuncs))] + x := rand.Intn(10000) + for _, tbl := range tbls { + q := fmt.Sprintf(`select * from touter where touter.a > (select %v(%v.b) from %v where %v.a%vtouter.b-%v)`, agg, tbl, tbl, tbl, op, x) + if r == nil { + r = tk.MustQuery(q).Sort().Rows() + } else { + tk.MustQuery(q).Sort().Check(r) + } + } + } } func (s *partitionTableSuite) TestDirectReadingWithAgg(c *C) { From 997b05961700df0b89e4e348ccd20d1272218fe6 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Mon, 17 May 2021 19:57:08 +0800 Subject: [PATCH 3/3] make linter happy --- executor/partition_table_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 46c07113296b1..038dbc6d6d47e 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -1069,7 +1069,6 @@ func (s *partitionTableSuite) TestParallelApply(c *C) { tk.MustQuery(`select * from touter where touter.a > (select sum(trange.b) from trange use index(a) where trange.a>touter.b)`).Sort().Check( tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows()) - // random queries ops := []string{"!=", ">", "<", ">=", "<="} aggFuncs := []string{"sum", "count", "max", "min"}