Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add some test cases about dynamic-mode and apply operator #24683

Merged
merged 12 commits into from
May 19, 2021
144 changes: 142 additions & 2 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ func (s *partitionTableSuite) TestOrderByandLimit(c *C) {

// range partition table
tk.MustExec(`create table trange(a int, b int, index idx_a(a)) partition by range(a) (
partition p0 values less than(300),
partition p1 values less than (500),
partition p0 values less than(300),
partition p1 values less than (500),
partition p2 values less than(1100));`)

// hash partition table
Expand Down Expand Up @@ -1298,6 +1298,146 @@ func (s *partitionTableSuite) TestSplitRegion(c *C) {
tk.MustPartition(`select * from thash where a in (1, 10001, 20001)`, "p1").Sort().Check(result)
}

func (s *partitionTableSuite) TestParallelApply(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
}

tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create database test_parallel_apply")
tk.MustExec("use test_parallel_apply")
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
tk.MustExec("set tidb_enable_parallel_apply=true")

tk.MustExec(`create table touter (a int, b int)`)
tk.MustExec(`create table tinner (a int, b int, key(a))`)
tk.MustExec(`create table thash (a int, b int, key(a)) partition by hash(a) partitions 4`)
tk.MustExec(`create table trange (a int, b int, key(a)) partition by range(a) (
partition p0 values less than(10000),
partition p1 values less than(20000),
partition p2 values less than(30000),
partition p3 values less than(40000))`)

vouter := make([]string, 0, 100)
for i := 0; i < 100; i++ {
vouter = append(vouter, fmt.Sprintf("(%v, %v)", rand.Intn(40000), rand.Intn(40000)))
}
tk.MustExec("insert into touter values " + strings.Join(vouter, ", "))

vals := make([]string, 0, 2000)
for i := 0; i < 100; i++ {
vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(40000), rand.Intn(40000)))
}
tk.MustExec("insert into tinner values " + strings.Join(vals, ", "))
tk.MustExec("insert into thash values " + strings.Join(vals, ", "))
tk.MustExec("insert into trange values " + strings.Join(vals, ", "))

// parallel apply + hash partition + IndexReader as its inner child
tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(thash.a) from thash use index(a) where thash.a>touter.b)`).Check(testkit.Rows(
`Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`,
`└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`,
` ├─TableReader(Build) 10000.00 root data:TableFullScan`,
` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`,
` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`,
` └─IndexReader 1.00 root partition:all index:StreamAgg`, // IndexReader is a inner child of Apply
` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.thash.a)->Column#9`,
` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.thash.a, test_parallel_apply.touter.b)`,
` └─IndexFullScan 10000.00 cop[tikv] table:thash, index:a(a) keep order:false, stats:pseudo`))
tk.MustQuery(`select * from touter where touter.a > (select sum(thash.a) from thash use index(a) where thash.a>touter.b)`).Sort().Check(
tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.a) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows())

// parallel apply + hash partition + TableReader as its inner child
tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(thash.b) from thash ignore index(a) where thash.a>touter.b)`).Check(testkit.Rows(
`Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`,
`└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`,
` ├─TableReader(Build) 10000.00 root data:TableFullScan`,
` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`,
` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`,
` └─TableReader 1.00 root partition:all data:StreamAgg`, // TableReader is a inner child of Apply
` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.thash.b)->Column#9`,
` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.thash.a, test_parallel_apply.touter.b)`,
` └─TableFullScan 10000.00 cop[tikv] table:thash keep order:false, stats:pseudo`))
tk.MustQuery(`select * from touter where touter.a > (select sum(thash.b) from thash ignore index(a) where thash.a>touter.b)`).Sort().Check(
tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner ignore index(a) where tinner.a>touter.b)`).Sort().Rows())

// parallel apply + hash partition + IndexLookUp as its inner child
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tinner is a non-partition table

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's expected. It's used to generate the correct result so that we can compare this result with results generated by partition tables.

tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Check(testkit.Rows(
`Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`,
`└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`,
` ├─TableReader(Build) 10000.00 root data:TableFullScan`,
` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`,
` └─HashAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`,
` └─IndexLookUp 1.00 root `, // IndexLookUp is a inner child of Apply
` ├─Selection(Build) 8000.00 cop[tikv] gt(test_parallel_apply.tinner.a, test_parallel_apply.touter.b)`,
` │ └─IndexFullScan 10000.00 cop[tikv] table:tinner, index:a(a) keep order:false, stats:pseudo`,
` └─HashAgg(Probe) 1.00 cop[tikv] funcs:sum(test_parallel_apply.tinner.b)->Column#9`,
` └─TableRowIDScan 8000.00 cop[tikv] table:tinner keep order:false, stats:pseudo`))
tk.MustQuery(`select * from touter where touter.a > (select sum(thash.b) from thash use index(a) where thash.a>touter.b)`).Sort().Check(
tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows())

// parallel apply + range partition + IndexReader as its inner child
tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(trange.a) from trange use index(a) where trange.a>touter.b)`).Check(testkit.Rows(
`Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`,
`└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`,
` ├─TableReader(Build) 10000.00 root data:TableFullScan`,
` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`,
` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`,
` └─IndexReader 1.00 root partition:all index:StreamAgg`, // IndexReader is a inner child of Apply
` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.trange.a)->Column#9`,
` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.trange.a, test_parallel_apply.touter.b)`,
` └─IndexFullScan 10000.00 cop[tikv] table:trange, index:a(a) keep order:false, stats:pseudo`))
tk.MustQuery(`select * from touter where touter.a > (select sum(trange.a) from trange use index(a) where trange.a>touter.b)`).Sort().Check(
tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.a) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows())

// parallel apply + range partition + TableReader as its inner child
tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(trange.b) from trange ignore index(a) where trange.a>touter.b)`).Check(testkit.Rows(
`Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`,
`└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`,
` ├─TableReader(Build) 10000.00 root data:TableFullScan`,
` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`,
` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`,
` └─TableReader 1.00 root partition:all data:StreamAgg`, // TableReader is a inner child of Apply
` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.trange.b)->Column#9`,
` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.trange.a, test_parallel_apply.touter.b)`,
` └─TableFullScan 10000.00 cop[tikv] table:trange keep order:false, stats:pseudo`))
tk.MustQuery(`select * from touter where touter.a > (select sum(trange.b) from trange ignore index(a) where trange.a>touter.b)`).Sort().Check(
tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner ignore index(a) where tinner.a>touter.b)`).Sort().Rows())

// parallel apply + range partition + IndexLookUp as its inner child
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Check(testkit.Rows(
`Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`,
`└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`,
` ├─TableReader(Build) 10000.00 root data:TableFullScan`,
` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`,
` └─HashAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`,
` └─IndexLookUp 1.00 root `, // IndexLookUp is a inner child of Apply
` ├─Selection(Build) 8000.00 cop[tikv] gt(test_parallel_apply.tinner.a, test_parallel_apply.touter.b)`,
` │ └─IndexFullScan 10000.00 cop[tikv] table:tinner, index:a(a) keep order:false, stats:pseudo`,
` └─HashAgg(Probe) 1.00 cop[tikv] funcs:sum(test_parallel_apply.tinner.b)->Column#9`,
` └─TableRowIDScan 8000.00 cop[tikv] table:tinner keep order:false, stats:pseudo`))
tk.MustQuery(`select * from touter where touter.a > (select sum(trange.b) from trange use index(a) where trange.a>touter.b)`).Sort().Check(
tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows())

// random queries
ops := []string{"!=", ">", "<", ">=", "<="}
aggFuncs := []string{"sum", "count", "max", "min"}
tbls := []string{"tinner", "thash", "trange"}
for i := 0; i < 50; i++ {
var r [][]interface{}
op := ops[rand.Intn(len(ops))]
agg := aggFuncs[rand.Intn(len(aggFuncs))]
x := rand.Intn(10000)
for _, tbl := range tbls {
q := fmt.Sprintf(`select * from touter where touter.a > (select %v(%v.b) from %v where %v.a%vtouter.b-%v)`, agg, tbl, tbl, tbl, op, x)
if r == nil {
r = tk.MustQuery(q).Sort().Rows()
} else {
tk.MustQuery(q).Sort().Check(r)
}
}
}
}

func (s *partitionTableSuite) TestDirectReadingWithAgg(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
Expand Down