diff --git a/cmd/explaintest/r/index_merge.result b/cmd/explaintest/r/index_merge.result index 6b44c6122987e..a19368d7f4b87 100644 --- a/cmd/explaintest/r/index_merge.result +++ b/cmd/explaintest/r/index_merge.result @@ -391,14 +391,14 @@ Delete_11 N/A root N/A └─SelectLock_17 4056.68 root for update 0 └─HashJoin_33 4056.68 root inner join, equal:[eq(test.t1.c1, test.t1.c1)] ├─HashAgg_36(Build) 3245.34 root group by:test.t1.c1, funcs:firstrow(test.t1.c1)->test.t1.c1 - │ └─IndexMerge_41 2248.30 root type: union - │ ├─IndexRangeScan_37(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo - │ ├─IndexRangeScan_38(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo - │ └─Selection_40(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10))) - │ └─TableRowIDScan_39 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo - └─TableReader_44(Probe) 9990.00 root data:Selection_43 - └─Selection_43 9990.00 cop[tikv] not(isnull(test.t1.c1)) - └─TableFullScan_42 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo + │ └─IndexMerge_45 2248.30 root type: union + │ ├─IndexRangeScan_41(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo + │ ├─IndexRangeScan_42(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo + │ └─Selection_44(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10))) + │ └─TableRowIDScan_43 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo + └─TableReader_48(Probe) 9990.00 root data:Selection_47 + └─Selection_47 9990.00 cop[tikv] not(isnull(test.t1.c1)) + └─TableFullScan_46 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo delete from t1 where c1 in (select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 10 or c2 < 10 and c3 < 10) order by 1; select * from t1; c1 c2 c3 @@ -409,14 +409,14 @@ Update_10 N/A root N/A └─SelectLock_14 4056.68 root for update 0 └─HashJoin_30 4056.68 root inner join, equal:[eq(test.t1.c1, test.t1.c1)] ├─HashAgg_33(Build) 3245.34 root group by:test.t1.c1, funcs:firstrow(test.t1.c1)->test.t1.c1 - │ └─IndexMerge_38 2248.30 root type: union - │ ├─IndexRangeScan_34(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo - │ ├─IndexRangeScan_35(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo - │ └─Selection_37(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10))) - │ └─TableRowIDScan_36 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo - └─TableReader_41(Probe) 9990.00 root data:Selection_40 - └─Selection_40 9990.00 cop[tikv] not(isnull(test.t1.c1)) - └─TableFullScan_39 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo + │ └─IndexMerge_42 2248.30 root type: union + │ ├─IndexRangeScan_38(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo + │ ├─IndexRangeScan_39(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo + │ └─Selection_41(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10))) + │ └─TableRowIDScan_40 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo + └─TableReader_45(Probe) 9990.00 root data:Selection_44 + └─Selection_44 
9990.00 cop[tikv] not(isnull(test.t1.c1)) + └─TableFullScan_43 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo update t1 set c1 = 100, c2 = 100, c3 = 100 where c1 in (select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 10 or c2 < 10 and c3 < 10); select * from t1; c1 c2 c3 @@ -471,11 +471,11 @@ insert into t1 values(1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5); explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 order by 1 limit 1 offset 2; id estRows task access object operator info TopN_10 1.00 root test.t1.c1, offset:2, count:1 -└─IndexMerge_19 1841.86 root type: union - ├─IndexRangeScan_15(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo - ├─IndexRangeScan_16(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo - └─Selection_18(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) - └─TableRowIDScan_17 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo +└─IndexMerge_23 1841.86 root type: union + ├─IndexRangeScan_19(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo + ├─IndexRangeScan_20(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo + └─Selection_22(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) + └─TableRowIDScan_21 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 order by 1 limit 1 offset 2; c1 c2 c3 3 3 3 @@ -483,13 +483,13 @@ c1 c2 c3 explain select /*+ use_index_merge(t1) */ sum(c1) from t1 where (c1 < 10 or c2 < 10) and c3 < 10 group by c1 order by 1; id estRows task access object operator info Sort_6 1473.49 root Column#5 -└─HashAgg_11 1473.49 root group by:Column#10, funcs:sum(Column#9)->Column#5 - └─Projection_18 1841.86 root cast(test.t1.c1, decimal(10,0) BINARY)->Column#9, test.t1.c1 - └─IndexMerge_16 1841.86 root type: union - ├─IndexRangeScan_12(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo - ├─IndexRangeScan_13(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo - └─Selection_15(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) - └─TableRowIDScan_14 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo +└─HashAgg_11 1473.49 root group by:Column#13, funcs:sum(Column#12)->Column#5 + └─Projection_22 1841.86 root cast(test.t1.c1, decimal(10,0) BINARY)->Column#12, test.t1.c1 + └─IndexMerge_20 1841.86 root type: union + ├─IndexRangeScan_16(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo + ├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo + └─Selection_19(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) + └─TableRowIDScan_18 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo select /*+ use_index_merge(t1) */ sum(c1) from t1 where (c1 < 10 or c2 < 10) and c3 < 10 group by c1 order by 1; sum(c1) 1 @@ -536,8 +536,8 @@ Sort_16 1841.86 root test.t1.c1 │ └─Selection_25(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) │ └─TableRowIDScan_24 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo └─TopN_29(Probe) 1841.86 root test.t2.c1, offset:2, count:1 - └─HashAgg_36 4900166.23 root group by:Column#21, funcs:avg(Column#19)->Column#9, funcs:firstrow(Column#20)->test.t2.c1 - └─Projection_48 6125207.79 root cast(test.t2.c1, decimal(10,0) BINARY)->Column#19, 
test.t2.c1, test.t2.c1 + └─HashAgg_35 4900166.23 root group by:Column#24, funcs:avg(Column#22)->Column#9, funcs:firstrow(Column#23)->test.t2.c1 + └─Projection_53 6125207.79 root cast(test.t2.c1, decimal(10,0) BINARY)->Column#22, test.t2.c1, test.t2.c1 └─IndexMerge_41 6125207.79 root type: union ├─Selection_38(Build) 6121.12 cop[tikv] eq(test.t1.c1, test.t2.c1) │ └─IndexRangeScan_37 6121120.92 cop[tikv] table:t2, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo diff --git a/planner/core/casetest/physical_plan_test.go b/planner/core/casetest/physical_plan_test.go index f3c66721227e3..1023fb0a15510 100644 --- a/planner/core/casetest/physical_plan_test.go +++ b/planner/core/casetest/physical_plan_test.go @@ -2191,3 +2191,30 @@ func TestHashAggPushdownToTiFlashCompute(t *testing.T) { tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...)) } } + +func TestIndexMergeOrderPushDown(t *testing.T) { + var ( + input []string + output []struct { + SQL string + Plan []string + Warning []string + } + ) + planSuiteData := GetPlanSuiteData() + planSuiteData.LoadTestCases(t, &input, &output) + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("set tidb_cost_model_version=1") + tk.MustExec("create table t (a int, b int, c int, index idx(a, c), index idx2(b, c))") + + for i, ts := range input { + testdata.OnRecord(func() { + output[i].SQL = ts + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows()) + }) + tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...)) + } +} diff --git a/planner/core/casetest/testdata/integration_suite_out.json b/planner/core/casetest/testdata/integration_suite_out.json index 51f5275c778a0..bfbf756c08bfe 100644 --- a/planner/core/casetest/testdata/integration_suite_out.json +++ b/planner/core/casetest/testdata/integration_suite_out.json @@ -2099,7 +2099,7 @@ " └─TableRowIDScan_14(Probe) 0.00 186.61 cop[tikv] table:t keep order:false" ], "Warnings": [ - "Note 1105 [idx_b] remain after pruning paths for t given Prop{SortItems: [], TaskTp: copDoubleReadTask}" + "Note 1105 [idx_b] remain after pruning paths for t given Prop{SortItems: [], TaskTp: copMultiReadTask}" ] }, { @@ -2111,7 +2111,7 @@ "└─TableRowIDScan_11(Probe) 0.00 186.61 cop[tikv] table:t keep order:false" ], "Warnings": [ - "Note 1105 [idx_b] remain after pruning paths for t given Prop{SortItems: [], TaskTp: copDoubleReadTask}" + "Note 1105 [idx_b] remain after pruning paths for t given Prop{SortItems: [], TaskTp: copMultiReadTask}" ] } ] diff --git a/planner/core/casetest/testdata/plan_suite_in.json b/planner/core/casetest/testdata/plan_suite_in.json index fdabafeb9c211..ec93e0c3999da 100644 --- a/planner/core/casetest/testdata/plan_suite_in.json +++ b/planner/core/casetest/testdata/plan_suite_in.json @@ -1210,5 +1210,19 @@ "select f from t use index() where f not in (1,2,3) and f not in (3,4,5) -- intersection of two not in. Not done yet", "select f from t use index() where f not in (1,2,3) and f in (1,2,3) -- intersection of in and not in. 
Not done yet" ] + }, + { + "name": "TestIndexMergeOrderPushDown", + "cases": [ + "select * from t where a = 1 or b = 1 order by c limit 2", + "select * from t where a = 1 or b in (1, 2, 3) order by c limit 2", + "select * from t where a in (1, 2, 3) or b = 1 order by c limit 2", + "select * from t where a in (1, 2, 3) or b in (1, 2, 3) order by c limit 2", + "select * from t where (a = 1 and c = 2) or (b = 1) order by c limit 2", + "select * from t where (a = 1 and c = 2) or b in (1, 2, 3) order by c limit 2", + "select * from t where (a = 1 and c = 2) or (b in (1, 2, 3) and c = 3) order by c limit 2", + "select * from t where (a = 1 or b = 2) and c = 3 order by c limit 2", + "select * from t where (a = 1 or b = 2) and c in (1, 2, 3) order by c limit 2" + ] } ] diff --git a/planner/core/casetest/testdata/plan_suite_out.json b/planner/core/casetest/testdata/plan_suite_out.json index 2394a0e9e669c..86ee4c51259b2 100644 --- a/planner/core/casetest/testdata/plan_suite_out.json +++ b/planner/core/casetest/testdata/plan_suite_out.json @@ -382,11 +382,11 @@ "└─ExchangeSender 4439.11 mpp[tiflash] ExchangeType: PassThrough", " └─Projection 4439.11 mpp[tiflash] test.t.a, Column#5", " └─Projection 4439.11 mpp[tiflash] Column#5, test.t.a", - " └─HashAgg 4439.11 mpp[tiflash] group by:test.t.a, test.t.c, funcs:sum(Column#13)->Column#5, funcs:firstrow(test.t.a)->test.t.a", + " └─HashAgg 4439.11 mpp[tiflash] group by:test.t.a, test.t.c, funcs:sum(Column#16)->Column#5, funcs:firstrow(test.t.a)->test.t.a", " └─ExchangeReceiver 4439.11 mpp[tiflash] ", " └─ExchangeSender 4439.11 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.t.a, collate: binary], [name: test.t.c, collate: binary]", - " └─HashAgg 4439.11 mpp[tiflash] group by:Column#16, Column#17, funcs:sum(Column#15)->Column#13", - " └─Projection 5548.89 mpp[tiflash] cast(test.t.b, decimal(10,0) BINARY)->Column#15, test.t.a, test.t.c", + " └─HashAgg 4439.11 mpp[tiflash] group by:Column#19, Column#20, funcs:sum(Column#18)->Column#16", + " └─Projection 5548.89 mpp[tiflash] cast(test.t.b, decimal(10,0) BINARY)->Column#18, test.t.a, test.t.c", " └─Selection 5548.89 mpp[tiflash] or(lt(test.t.b, 2), gt(test.t.a, 2))", " └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" ], @@ -7649,5 +7649,119 @@ "Best": "TableReader(Table(t)->Sel([not(in(test.t.f, 1, 2, 3)) in(test.t.f, 1, 2, 3)]))" } ] + }, + { + "Name": "TestIndexMergeOrderPushDown", + "Cases": [ + { + "SQL": "select * from t where a = 1 or b = 1 order by c limit 2", + "Plan": [ + "TopN 2.00 root test.t.c, offset:0, count:2", + "└─IndexMerge 19.99 root type: union", + " ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2", + " │ └─IndexRangeScan 2.00 cop[tikv] table:t, index:idx(a, c) range:[1,1], keep order:true, stats:pseudo", + " ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2", + " │ └─IndexRangeScan 2.00 cop[tikv] table:t, index:idx2(b, c) range:[1,1], keep order:true, stats:pseudo", + " └─TableRowIDScan(Probe) 19.99 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select * from t where a = 1 or b in (1, 2, 3) order by c limit 2", + "Plan": [ + "TopN 2.00 root test.t.c, offset:0, count:2", + "└─IndexMerge 2.00 root type: union", + " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:idx(a, c) range:[1,1], keep order:false, stats:pseudo", + " ├─IndexRangeScan(Build) 30.00 cop[tikv] table:t, index:idx2(b, c) range:[1,1], [2,2], [3,3], keep order:false, stats:pseudo", + " └─TopN(Probe) 2.00 cop[tikv] 
test.t.c, offset:0, count:2", + " └─TableRowIDScan 39.97 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select * from t where a in (1, 2, 3) or b = 1 order by c limit 2", + "Plan": [ + "TopN 2.00 root test.t.c, offset:0, count:2", + "└─IndexMerge 2.00 root type: union", + " ├─IndexRangeScan(Build) 30.00 cop[tikv] table:t, index:idx(a, c) range:[1,1], [2,2], [3,3], keep order:false, stats:pseudo", + " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:idx2(b, c) range:[1,1], keep order:false, stats:pseudo", + " └─TopN(Probe) 2.00 cop[tikv] test.t.c, offset:0, count:2", + " └─TableRowIDScan 39.97 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select * from t where a in (1, 2, 3) or b in (1, 2, 3) order by c limit 2", + "Plan": [ + "TopN 2.00 root test.t.c, offset:0, count:2", + "└─IndexMerge 2.00 root type: union", + " ├─IndexRangeScan(Build) 30.00 cop[tikv] table:t, index:idx(a, c) range:[1,1], [2,2], [3,3], keep order:false, stats:pseudo", + " ├─IndexRangeScan(Build) 30.00 cop[tikv] table:t, index:idx2(b, c) range:[1,1], [2,2], [3,3], keep order:false, stats:pseudo", + " └─TopN(Probe) 2.00 cop[tikv] test.t.c, offset:0, count:2", + " └─TableRowIDScan 59.91 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select * from t where (a = 1 and c = 2) or (b = 1) order by c limit 2", + "Plan": [ + "TopN 2.00 root test.t.c, offset:0, count:2", + "└─IndexMerge 10.10 root type: union", + " ├─Limit(Build) 0.10 cop[tikv] offset:0, count:2", + " │ └─IndexRangeScan 0.10 cop[tikv] table:t, index:idx(a, c) range:[1 2,1 2], keep order:true, stats:pseudo", + " ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2", + " │ └─IndexRangeScan 2.00 cop[tikv] table:t, index:idx2(b, c) range:[1,1], keep order:true, stats:pseudo", + " └─TableRowIDScan(Probe) 10.10 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select * from t where (a = 1 and c = 2) or b in (1, 2, 3) order by c limit 2", + "Plan": [ + "TopN 2.00 root test.t.c, offset:0, count:2", + "└─IndexMerge 2.00 root type: union", + " ├─IndexRangeScan(Build) 0.10 cop[tikv] table:t, index:idx(a, c) range:[1 2,1 2], keep order:false, stats:pseudo", + " ├─IndexRangeScan(Build) 30.00 cop[tikv] table:t, index:idx2(b, c) range:[1,1], [2,2], [3,3], keep order:false, stats:pseudo", + " └─TopN(Probe) 2.00 cop[tikv] test.t.c, offset:0, count:2", + " └─TableRowIDScan 30.10 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select * from t where (a = 1 and c = 2) or (b in (1, 2, 3) and c = 3) order by c limit 2", + "Plan": [ + "TopN 0.40 root test.t.c, offset:0, count:2", + "└─IndexMerge 0.40 root type: union", + " ├─IndexRangeScan(Build) 0.10 cop[tikv] table:t, index:idx(a, c) range:[1 2,1 2], keep order:false, stats:pseudo", + " ├─IndexRangeScan(Build) 0.30 cop[tikv] table:t, index:idx2(b, c) range:[1 3,1 3], [2 3,2 3], [3 3,3 3], keep order:false, stats:pseudo", + " └─TableRowIDScan(Probe) 0.40 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select * from t where (a = 1 or b = 2) and c = 3 order by c limit 2", + "Plan": [ + "TopN 0.02 root test.t.c, offset:0, count:2", + "└─IndexMerge 0.02 root type: union", + " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:idx(a, c) range:[1,1], keep order:false, stats:pseudo", + " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:idx2(b, c) range:[2,2], keep 
order:false, stats:pseudo", + " └─Selection(Probe) 0.02 cop[tikv] eq(test.t.c, 3)", + " └─TableRowIDScan 19.99 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select * from t where (a = 1 or b = 2) and c in (1, 2, 3) order by c limit 2", + "Plan": [ + "TopN 0.06 root test.t.c, offset:0, count:2", + "└─IndexMerge 0.06 root type: union", + " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:idx(a, c) range:[1,1], keep order:false, stats:pseudo", + " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:idx2(b, c) range:[2,2], keep order:false, stats:pseudo", + " └─Selection(Probe) 0.06 cop[tikv] in(test.t.c, 1, 2, 3)", + " └─TableRowIDScan 19.99 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warning": null + } + ] } ] diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 2e78d7f059b35..f9dc616bb19f5 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2363,7 +2363,7 @@ func pushLimitOrTopNForcibly(p LogicalPlan) bool { } func (lt *LogicalTopN) getPhysTopN(_ *property.PhysicalProperty) []PhysicalPlan { - allTaskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType} + allTaskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopMultiReadTaskType} if !pushLimitOrTopNForcibly(lt) { allTaskTypes = append(allTaskTypes, property.RootTaskType) } @@ -2389,7 +2389,7 @@ func (lt *LogicalTopN) getPhysLimits(_ *property.PhysicalProperty) []PhysicalPla return nil } - allTaskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType} + allTaskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopMultiReadTaskType} if !pushLimitOrTopNForcibly(lt) { allTaskTypes = append(allTaskTypes, property.RootTaskType) } @@ -2714,7 +2714,7 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope if !prop.IsPrefix(childProp) { return enforcedAggs } - taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType} + taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopMultiReadTaskType} if la.HasDistinct() { // TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented. // If AllowDistinctAggPushDown is set to true, we should not consider RootTask. 
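A note on the rename before the remaining hunks: CopDoubleReadTaskType becomes CopMultiReadTaskType so the task type no longer implies exactly two reads (one index plus one table); it now also covers a union-type IndexMerge, which performs several index reads followed by one table read. The edits in this file are mechanical substitutions. A minimal sketch, assuming only the planner/property identifiers shown in this diff, of how operators such as TopN, Limit and the aggregations enumerate their candidate child task types after the rename:

// Sketch, not the actual TiDB code: the candidate task types an operator may
// request from its child. All identifiers come from planner/property as used
// in this diff; the function name is illustrative.
func candidateTaskTypes(mustPushDown bool) []property.TaskType {
	taskTypes := []property.TaskType{
		property.CopSingleReadTaskType, // IndexReader / TableReader
		property.CopMultiReadTaskType,  // IndexLookUp and, with this patch, union IndexMerge
	}
	if !mustPushDown {
		// Without a forcing hint, keeping the operator in the root task is still valid.
		taskTypes = append(taskTypes, property.RootTaskType)
	}
	return taskTypes
}

The behavioral change lives in find_best_task.go below: convertToIndexMergeScan now accepts a CopMultiReadTaskType request for a union index merge and leaves indexPlanFinished false, so a Limit or TopN attached later can still reach the partial index plans.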
@@ -2961,7 +2961,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy return nil } hashAggs := make([]PhysicalPlan, 0, len(prop.GetAllPossibleChildTaskTypes())) - taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType} + taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopMultiReadTaskType} canPushDownToTiFlash := la.canPushToCop(kv.TiFlash) canPushDownToMPP := canPushDownToTiFlash && la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() if la.HasDistinct() { @@ -3101,7 +3101,7 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([] return nil, true, nil } - allTaskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType} + allTaskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopMultiReadTaskType} if !pushLimitOrTopNForcibly(p) { allTaskTypes = append(allTaskTypes, property.RootTaskType) } diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 60bc111c4fdc5..47a2027e589ed 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -328,6 +328,19 @@ func getTaskPlanCost(t task, op *physicalOptimizeOp) (float64, bool, error) { default: return 0, false, errors.New("unknown task type") } + if t.plan() == nil { + // It's a very special case for index merge case. + cost := 0.0 + copTsk := t.(*copTask) + for _, partialScan := range copTsk.idxMergePartPlans { + partialCost, err := getPlanCost(partialScan, taskType, NewDefaultPlanCostOption().WithOptimizeTracer(op)) + if err != nil { + return 0, false, err + } + cost += partialCost + } + return cost, false, nil + } cost, err := getPlanCost(t.plan(), taskType, NewDefaultPlanCostOption().WithOptimizeTracer(op)) return cost, false, err } @@ -1103,13 +1116,16 @@ func (ds *DataSource) canConvertToPointGetForPlanCache(path *util.AccessPath) bo } func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) { - if prop.TaskTp != property.RootTaskType || !prop.IsSortItemEmpty() { + if prop.IsFlashProp() || prop.TaskTp == property.CopSingleReadTaskType || !prop.IsSortItemEmpty() { + return invalidTask, nil + } + if prop.TaskTp == property.CopMultiReadTaskType && candidate.path.IndexMergeIsIntersection { return invalidTask, nil } path := candidate.path scans := make([]PhysicalPlan, 0, len(path.PartialIndexPaths)) cop := &copTask{ - indexPlanFinished: true, + indexPlanFinished: false, tblColHists: ds.TblColHists, } cop.partitionInfo = PartitionInfo{ @@ -1133,7 +1149,7 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c } ts, remainingFilters, err := ds.buildIndexMergeTableScan(prop, path.TableFilters, totalRowCount) if err != nil { - return nil, err + return invalidTask, err } cop.tablePlan = ts cop.idxMergePartPlans = scans @@ -1142,8 +1158,17 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c if remainingFilters != nil { cop.rootTaskConds = remainingFilters } - task = cop.convertToRootTask(ds.ctx) - ds.addSelection4PlanCache(task.(*rootTask), ds.tableStats.ScaleByExpectCnt(totalRowCount), prop) + _, pureTableScan := ts.(*PhysicalTableScan) + if prop.TaskTp != property.RootTaskType && (len(remainingFilters) > 0 || !pureTableScan) { + return invalidTask, nil + } + if prop.TaskTp == property.RootTaskType { + cop.indexPlanFinished = true + task = 
cop.convertToRootTask(ds.ctx) + ds.addSelection4PlanCache(task.(*rootTask), ds.tableStats.ScaleByExpectCnt(totalRowCount), prop) + } else { + task = cop + } return task, nil } @@ -1432,7 +1457,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, if prop.TaskTp == property.CopSingleReadTaskType { return invalidTask, nil } - } else if prop.TaskTp == property.CopDoubleReadTaskType { + } else if prop.TaskTp == property.CopMultiReadTaskType { // If it's parent requires double read task, return max cost. return invalidTask, nil } @@ -1980,7 +2005,7 @@ func (ds *DataSource) isPointGetPath(path *util.AccessPath) bool { // convertToTableScan converts the DataSource to table scan. func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) { // It will be handled in convertToIndexScan. - if prop.TaskTp == property.CopDoubleReadTaskType { + if prop.TaskTp == property.CopMultiReadTaskType { return invalidTask, nil } if !prop.IsSortItemEmpty() && !candidate.isMatchProp { @@ -2099,7 +2124,7 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid func (ds *DataSource) convertToSampleTable(prop *property.PhysicalProperty, candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) { - if prop.TaskTp == property.CopDoubleReadTaskType { + if prop.TaskTp == property.CopMultiReadTaskType { return invalidTask, nil } if !prop.IsSortItemEmpty() && !candidate.isMatchProp { @@ -2126,7 +2151,7 @@ func (ds *DataSource) convertToPointGet(prop *property.PhysicalProperty, candida if !prop.IsSortItemEmpty() && !candidate.isMatchProp { return invalidTask } - if prop.TaskTp == property.CopDoubleReadTaskType && candidate.path.IsSingleScan || + if prop.TaskTp == property.CopMultiReadTaskType && candidate.path.IsSingleScan || prop.TaskTp == property.CopSingleReadTaskType && !candidate.path.IsSingleScan { return invalidTask } @@ -2204,7 +2229,7 @@ func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty, if !prop.IsSortItemEmpty() && !candidate.isMatchProp { return invalidTask } - if prop.TaskTp == property.CopDoubleReadTaskType && candidate.path.IsSingleScan || + if prop.TaskTp == property.CopMultiReadTaskType && candidate.path.IsSingleScan || prop.TaskTp == property.CopSingleReadTaskType && !candidate.path.IsSingleScan { return invalidTask } diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index d987d6d256929..40e242639ad0e 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3189,14 +3189,18 @@ func TestIssue29221(t *testing.T) { tk.MustQuery("explain format = 'brief' select * from t where a = 1 or b = 1;").Check(testkit.Rows( "Limit 3.00 root offset:0, count:3", "└─IndexMerge 3.00 root type: union", - " ├─IndexRangeScan(Build) 1.50 cop[tikv] table:t, index:idx_a(a) range:[1,1], keep order:false, stats:pseudo", - " ├─IndexRangeScan(Build) 1.50 cop[tikv] table:t, index:idx_b(b) range:[1,1], keep order:false, stats:pseudo", + " ├─Limit(Build) 1.50 cop[tikv] offset:0, count:3", + " │ └─IndexRangeScan 1.50 cop[tikv] table:t, index:idx_a(a) range:[1,1], keep order:false, stats:pseudo", + " ├─Limit(Build) 1.50 cop[tikv] offset:0, count:3", + " │ └─IndexRangeScan 1.50 cop[tikv] table:t, index:idx_b(b) range:[1,1], keep order:false, stats:pseudo", " └─TableRowIDScan(Probe) 3.00 cop[tikv] table:t keep order:false, stats:pseudo")) tk.MustQuery("explain format = 'brief' select /*+ 
use_index_merge(t) */ * from t where a = 1 or b = 1;").Check(testkit.Rows( "Limit 3.00 root offset:0, count:3", "└─IndexMerge 3.00 root type: union", - " ├─IndexRangeScan(Build) 1.50 cop[tikv] table:t, index:idx_a(a) range:[1,1], keep order:false, stats:pseudo", - " ├─IndexRangeScan(Build) 1.50 cop[tikv] table:t, index:idx_b(b) range:[1,1], keep order:false, stats:pseudo", + " ├─Limit(Build) 1.50 cop[tikv] offset:0, count:3", + " │ └─IndexRangeScan 1.50 cop[tikv] table:t, index:idx_a(a) range:[1,1], keep order:false, stats:pseudo", + " ├─Limit(Build) 1.50 cop[tikv] offset:0, count:3", + " │ └─IndexRangeScan 1.50 cop[tikv] table:t, index:idx_b(b) range:[1,1], keep order:false, stats:pseudo", " └─TableRowIDScan(Probe) 3.00 cop[tikv] table:t keep order:false, stats:pseudo")) tk.MustExec("set @@session.sql_select_limit=18446744073709551615;") tk.MustQuery("explain format = 'brief' select * from t where a = 1 or b = 1;").Check(testkit.Rows( @@ -3207,8 +3211,10 @@ func TestIssue29221(t *testing.T) { tk.MustQuery("explain format = 'brief' select * from t where a = 1 or b = 1 limit 3;").Check(testkit.Rows( "Limit 3.00 root offset:0, count:3", "└─IndexMerge 3.00 root type: union", - " ├─IndexRangeScan(Build) 1.50 cop[tikv] table:t, index:idx_a(a) range:[1,1], keep order:false, stats:pseudo", - " ├─IndexRangeScan(Build) 1.50 cop[tikv] table:t, index:idx_b(b) range:[1,1], keep order:false, stats:pseudo", + " ├─Limit(Build) 1.50 cop[tikv] offset:0, count:3", + " │ └─IndexRangeScan 1.50 cop[tikv] table:t, index:idx_a(a) range:[1,1], keep order:false, stats:pseudo", + " ├─Limit(Build) 1.50 cop[tikv] offset:0, count:3", + " │ └─IndexRangeScan 1.50 cop[tikv] table:t, index:idx_b(b) range:[1,1], keep order:false, stats:pseudo", " └─TableRowIDScan(Probe) 3.00 cop[tikv] table:t keep order:false, stats:pseudo")) tk.MustQuery("explain format = 'brief' select /*+ use_index_merge(t) */ * from t where a = 1 or b = 1;").Check(testkit.Rows( "IndexMerge 19.99 root type: union", @@ -3218,8 +3224,10 @@ func TestIssue29221(t *testing.T) { tk.MustQuery("explain format = 'brief' select /*+ use_index_merge(t) */ * from t where a = 1 or b = 1 limit 3;").Check(testkit.Rows( "Limit 3.00 root offset:0, count:3", "└─IndexMerge 3.00 root type: union", - " ├─IndexRangeScan(Build) 1.50 cop[tikv] table:t, index:idx_a(a) range:[1,1], keep order:false, stats:pseudo", - " ├─IndexRangeScan(Build) 1.50 cop[tikv] table:t, index:idx_b(b) range:[1,1], keep order:false, stats:pseudo", + " ├─Limit(Build) 1.50 cop[tikv] offset:0, count:3", + " │ └─IndexRangeScan 1.50 cop[tikv] table:t, index:idx_a(a) range:[1,1], keep order:false, stats:pseudo", + " ├─Limit(Build) 1.50 cop[tikv] offset:0, count:3", + " │ └─IndexRangeScan 1.50 cop[tikv] table:t, index:idx_b(b) range:[1,1], keep order:false, stats:pseudo", " └─TableRowIDScan(Probe) 3.00 cop[tikv] table:t keep order:false, stats:pseudo")) } diff --git a/planner/core/plan_cost_ver1.go b/planner/core/plan_cost_ver1.go index f7459c70fb01a..ecbdc0e654fb0 100644 --- a/planner/core/plan_cost_ver1.go +++ b/planner/core/plan_cost_ver1.go @@ -77,7 +77,7 @@ func (p *PhysicalSelection) getPlanCostVer1(taskType property.TaskType, option * switch taskType { case property.RootTaskType, property.MppTaskType: cpuFactor = p.ctx.GetSessionVars().GetCPUFactor() - case property.CopSingleReadTaskType, property.CopDoubleReadTaskType: + case property.CopSingleReadTaskType, property.CopMultiReadTaskType: cpuFactor = p.ctx.GetSessionVars().GetCopCPUFactor() default: return 0, errors.Errorf("unknown task type %v", 
taskType) @@ -181,7 +181,7 @@ func (p *PhysicalIndexLookUpReader) getPlanCostVer1(taskType property.TaskType, p.planCost = 0 // child's cost for _, child := range []PhysicalPlan{p.indexPlan, p.tablePlan} { - childCost, err := child.getPlanCostVer1(property.CopDoubleReadTaskType, option) + childCost, err := child.getPlanCostVer1(property.CopMultiReadTaskType, option) if err != nil { return 0, err } @@ -194,7 +194,7 @@ func (p *PhysicalIndexLookUpReader) getPlanCostVer1(taskType property.TaskType, for tmp = p.tablePlan; len(tmp.Children()) > 0; tmp = tmp.Children()[0] { } ts := tmp.(*PhysicalTableScan) - tblCost, err := ts.getPlanCostVer1(property.CopDoubleReadTaskType, option) + tblCost, err := ts.getPlanCostVer1(property.CopMultiReadTaskType, option) if err != nil { return 0, err } @@ -1027,7 +1027,7 @@ func (p *PhysicalHashAgg) getPlanCostVer1(taskType property.TaskType, option *Pl switch taskType { case property.RootTaskType: p.planCost += p.GetCost(statsCnt, true, false, costFlag) - case property.CopSingleReadTaskType, property.CopDoubleReadTaskType: + case property.CopSingleReadTaskType, property.CopMultiReadTaskType: p.planCost += p.GetCost(statsCnt, false, false, costFlag) case property.MppTaskType: p.planCost += p.GetCost(statsCnt, false, true, costFlag) diff --git a/planner/core/plan_cost_ver2.go b/planner/core/plan_cost_ver2.go index dfecbd0761fff..1f1569e0c14df 100644 --- a/planner/core/plan_cost_ver2.go +++ b/planner/core/plan_cost_ver2.go @@ -242,7 +242,7 @@ func (p *PhysicalIndexLookUpReader) getPlanCostVer2(taskType property.TaskType, // index-side indexNetCost := netCostVer2(option, indexRows, indexRowSize, netFactor) - indexChildCost, err := p.indexPlan.getPlanCostVer2(property.CopDoubleReadTaskType, option) + indexChildCost, err := p.indexPlan.getPlanCostVer2(property.CopMultiReadTaskType, option) if err != nil { return zeroCostVer2, err } @@ -250,7 +250,7 @@ func (p *PhysicalIndexLookUpReader) getPlanCostVer2(taskType property.TaskType, // table-side tableNetCost := netCostVer2(option, tableRows, tableRowSize, netFactor) - tableChildCost, err := p.tablePlan.getPlanCostVer2(property.CopDoubleReadTaskType, option) + tableChildCost, err := p.tablePlan.getPlanCostVer2(property.CopMultiReadTaskType, option) if err != nil { return zeroCostVer2, err } diff --git a/planner/core/task.go b/planner/core/task.go index e134ea13ce346..f111ea62e7c8a 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -153,7 +153,9 @@ func (t *copTask) finishIndexPlan() { return } t.indexPlanFinished = true - if t.tablePlan != nil { + // index merge case is specially handled for now. + // We need a elegant way to solve the stats of index merge in this case. + if t.tablePlan != nil && t.indexPlan != nil { ts := t.tablePlan.(*PhysicalTableScan) originStats := ts.stats ts.stats = t.indexPlan.statsInfo() @@ -819,22 +821,43 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task { t := tasks[0].copy() sunk := false if cop, ok := t.(*copTask); ok { - // For double read which requires order being kept, the limit cannot be pushed down to the table side, - // because handles would be reordered before being sent to table scan. - if (!cop.keepOrder || !cop.indexPlanFinished || cop.indexPlan == nil) && len(cop.rootTaskConds) == 0 { - // When limit is pushed down, we should remove its offset. 
- newCount := p.Offset + p.Count - childProfile := cop.plan().statsInfo() - // Strictly speaking, for the row count of stats, we should multiply newCount with "regionNum", - // but "regionNum" is unknown since the copTask can be a double read, so we ignore it now. - stats := deriveLimitStats(childProfile, float64(newCount)) - pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset) - cop = attachPlan2Task(pushedDownLimit, cop).(*copTask) - // Don't use clone() so that Limit and its children share the same schema. Otherwise the virtual generated column may not be resolved right. - pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema()) + if len(cop.idxMergePartPlans) == 0 { + // For double read which requires order being kept, the limit cannot be pushed down to the table side, + // because handles would be reordered before being sent to table scan. + if (!cop.keepOrder || !cop.indexPlanFinished || cop.indexPlan == nil) && len(cop.rootTaskConds) == 0 { + // When limit is pushed down, we should remove its offset. + newCount := p.Offset + p.Count + childProfile := cop.plan().statsInfo() + // Strictly speaking, for the row count of stats, we should multiply newCount with "regionNum", + // but "regionNum" is unknown since the copTask can be a double read, so we ignore it now. + stats := deriveLimitStats(childProfile, float64(newCount)) + pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset) + cop = attachPlan2Task(pushedDownLimit, cop).(*copTask) + // Don't use clone() so that Limit and its children share the same schema. Otherwise the virtual generated column may not be resolved right. + pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema()) + } + t = cop.convertToRootTask(p.ctx) + sunk = p.sinkIntoIndexLookUp(t) + } else if !cop.idxMergeIsIntersection { + // We only support push part of the order prop down to index merge case. + if !cop.keepOrder && !cop.indexPlanFinished && len(cop.rootTaskConds) == 0 { + newCount := p.Offset + p.Count + limitChildren := make([]PhysicalPlan, 0, len(cop.idxMergePartPlans)) + for _, partialScan := range cop.idxMergePartPlans { + childProfile := partialScan.statsInfo() + stats := deriveLimitStats(childProfile, float64(newCount)) + pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset) + pushedDownLimit.SetChildren(partialScan) + pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema()) + limitChildren = append(limitChildren, pushedDownLimit) + } + cop.idxMergePartPlans = limitChildren + } + t = cop.convertToRootTask(p.ctx) + } else { + // Whatever the remained case is, we directly convert to it to root task. + t = cop.convertToRootTask(p.ctx) } - t = cop.convertToRootTask(p.ctx) - sunk = p.sinkIntoIndexLookUp(t) } else if mpp, ok := t.(*mppTask); ok { newCount := p.Offset + p.Count childProfile := mpp.plan().statsInfo() @@ -933,6 +956,12 @@ func (p *PhysicalTopN) getPushedDownTopN(childPlan PhysicalPlan) *PhysicalTopN { // // there's no prefix index column. func (p *PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []*expression.Column) bool { + // If we call canPushToIndexPlan and there's no index plan, we should go into the index merge case. + // Index merge case is specially handled for now. So we directly return false here. + // So we directly return false. 
+ if indexPlan == nil { + return false + } schema := indexPlan.Schema() for _, col := range byItemCols { pos := schema.ColumnIndex(col) @@ -979,7 +1008,13 @@ func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool { if len(copTask.rootTaskConds) != 0 { return false } - if p.containVirtualColumn(copTask.plan().Schema().Columns) { + if !copTask.indexPlanFinished && len(copTask.idxMergePartPlans) > 0 { + for _, partialPlan := range copTask.idxMergePartPlans { + if p.containVirtualColumn(partialPlan.Schema().Columns) { + return false + } + } + } else if p.containVirtualColumn(copTask.plan().Schema().Columns) { return false } return true @@ -1003,8 +1038,8 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { cols = append(cols, expression.ExtractColumns(item.Expr)...) } needPushDown := len(cols) > 0 - if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) { - newTask, changed := p.pushTopNDownToDynamicPartition(copTask) + if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) && len(copTask.rootTaskConds) == 0 { + newTask, changed := p.pushPartialTopNDownToCop(copTask) if changed { return newTask } @@ -1015,6 +1050,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { pushedDownTopN = p.getPushedDownTopN(copTask.indexPlan) copTask.indexPlan = pushedDownTopN } else { + // It works for both normal index scan and index merge scan. copTask.finishIndexPlan() pushedDownTopN = p.getPushedDownTopN(copTask.tablePlan) copTask.tablePlan = pushedDownTopN @@ -1027,11 +1063,11 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { return attachPlan2Task(p, rootTask) } -// pushTopNDownToDynamicPartition is a temp solution for partition table. It actually does the same thing as DataSource's isMatchProp. +// pushPartialTopNDownToCop is a temp solution for partition table and index merge. It actually does the same thing as DataSource's isMatchProp. // We need to support a more enhanced read strategy in the execution phase. So that we can achieve Limit(TiDB)->Reader(TiDB)->Limit(TiKV/TiFlash)->Scan(TiKV/TiFlash). // Before that is done, we use this logic to provide a way to keep the order property when reading from TiKV, so that we can use the orderliness of index to speed up the query. // Here we can change the execution plan to TopN(TiDB)->Reader(TiDB)->Limit(TiKV)->Scan(TiKV).(TiFlash is not supported). -func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bool) { +func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) { if copTsk.getStoreType() != kv.TiKV { return nil, false } @@ -1047,40 +1083,21 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo if !allSameOrder { return nil, false } - checkIndexMatchProp := func(idxCols []*expression.Column, idxColLens []int, constColsByCond []bool, colsProp *property.PhysicalProperty) bool { - // If the number of the by-items is bigger than the index columns. We cannot push down since it must not keep order. 
- if len(idxCols) < len(colsProp.SortItems) { - return false - } - idxPos := 0 - for _, byItem := range colsProp.SortItems { - found := false - for ; idxPos < len(idxCols); idxPos++ { - if idxColLens[idxPos] == types.UnspecifiedLength && idxCols[idxPos].Equal(p.SCtx(), byItem.Col) { - found = true - idxPos++ - break - } - if len(constColsByCond) == 0 || idxPos > len(constColsByCond) || !constColsByCond[idxPos] { - found = false - break - } - } - if !found { - return false - } - } - return true + if len(copTsk.idxMergePartPlans) > 0 && copTsk.idxMergeIsIntersection { + return nil, false } var ( - selOnIdxScan *PhysicalSelection - selOnTblScan *PhysicalSelection - selSelectivity float64 - - idxScan *PhysicalIndexScan - tblScan *PhysicalTableScan - tblInfo *model.TableInfo - err error + selOnIdxScan *PhysicalSelection + selOnTblScan *PhysicalSelection + selSelectivity float64 + selSelectivityOnPartialScan []float64 + + idxScan *PhysicalIndexScan + tblScan *PhysicalTableScan + partialScans []PhysicalPlan + clonedPartialPlan []PhysicalPlan + tblInfo *model.TableInfo + err error ) if copTsk.indexPlan != nil { copTsk.indexPlan, err = copTsk.indexPlan.Clone() @@ -1108,6 +1125,29 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo tblScan = finalTblScanPlan.(*PhysicalTableScan) tblInfo = tblScan.Table } + if len(copTsk.idxMergePartPlans) > 0 { + // calculate selectivities for each partial plan in advance and clone partial plans since we may modify their stats later. + partialScans = make([]PhysicalPlan, 0, len(copTsk.idxMergePartPlans)) + selSelectivityOnPartialScan = make([]float64, len(copTsk.idxMergePartPlans)) + for i, scan := range copTsk.idxMergePartPlans { + selSelectivityOnPartialScan[i] = 1 + clonedScan, err := scan.Clone() + if err != nil { + return nil, false + } + clonedPartialPlan = append(clonedPartialPlan, clonedScan) + finalScan := clonedScan + var partialSel *PhysicalSelection + for len(finalScan.Children()) > 0 { + partialSel, _ = finalScan.(*PhysicalSelection) + finalScan = finalScan.Children()[0] + } + if partialSel != nil && finalScan.statsInfo().RowCount > 0 { + selSelectivityOnPartialScan[i] = partialSel.statsInfo().RowCount / finalScan.statsInfo().RowCount + } + partialScans = append(partialScans, finalScan) + } + } // Note that we only need to care about one Selection at most. if selOnIdxScan != nil && idxScan.statsInfo().RowCount > 0 { @@ -1118,36 +1158,54 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo } pi := tblInfo.GetPartitionInfo() - if pi == nil { + if pi == nil && len(copTsk.idxMergePartPlans) == 0 { return nil, false } if !copTsk.indexPlanFinished { // If indexPlan side isn't finished, there's no selection on the table side. - - propMatched := checkIndexMatchProp(idxScan.IdxCols, idxScan.IdxColLens, idxScan.constColsByCond, colsProp) - if !propMatched { - return nil, false - } - idxScan.Desc = isDesc - childProfile := copTsk.plan().statsInfo() - newCount := p.Offset + p.Count - stats := deriveLimitStats(childProfile, float64(newCount)) - pushedLimit := PhysicalLimit{ - Count: newCount, - }.Init(p.SCtx(), stats, p.SelectBlockOffset()) - pushedLimit.SetSchema(copTsk.indexPlan.Schema()) - copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask) - - // A similar but simplified logic compared the ExpectedCnt handling logic in getOriginalPhysicalIndexScan. - child := pushedLimit.Children()[0] - // The row count of the direct child of Limit should be adjusted to be no larger than the Limit.Count. 
- child.SetStats(child.statsInfo().ScaleByExpectCnt(float64(newCount))) - // The Limit->Selection->IndexScan case: - // adjust the row count of IndexScan according to the selectivity of the Selection. - if selSelectivity > 0 && selSelectivity < 1 { - scaledRowCount := child.Stats().RowCount / selSelectivity - idxScan.SetStats(idxScan.Stats().ScaleByExpectCnt(scaledRowCount)) + if len(copTsk.idxMergePartPlans) > 0 { + // Deal with index merge case. + propMatched := p.checkSubScans(colsProp, isDesc, partialScans...) + if !propMatched { + // If there's one used index cannot match the prop. + return nil, false + } + newCopSubPlans := p.addPartialLimitForSubScans(clonedPartialPlan, partialScans, selSelectivityOnPartialScan) + copTsk.idxMergePartPlans = newCopSubPlans + clonedTblScan, err := copTsk.tablePlan.Clone() + if err != nil { + return nil, false + } + clonedTblScan.statsInfo().ScaleByExpectCnt(float64(p.Count+p.Offset) * float64(len(copTsk.idxMergePartPlans))) + copTsk.tablePlan = clonedTblScan + copTsk.indexPlanFinished = true + } else { + // The normal index scan cases.(single read and double read) + propMatched := p.checkOrderPropForSubIndexScan(idxScan.IdxCols, idxScan.IdxColLens, idxScan.constColsByCond, colsProp) + if !propMatched { + return nil, false + } + idxScan.Desc = isDesc + childProfile := copTsk.plan().statsInfo() + newCount := p.Offset + p.Count + stats := deriveLimitStats(childProfile, float64(newCount)) + pushedLimit := PhysicalLimit{ + Count: newCount, + }.Init(p.SCtx(), stats, p.SelectBlockOffset()) + pushedLimit.SetSchema(copTsk.indexPlan.Schema()) + copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask) + + // A similar but simplified logic compared the ExpectedCnt handling logic in getOriginalPhysicalIndexScan. + child := pushedLimit.Children()[0] + // The row count of the direct child of Limit should be adjusted to be no larger than the Limit.Count. + child.SetStats(child.statsInfo().ScaleByExpectCnt(float64(newCount))) + // The Limit->Selection->IndexScan case: + // adjust the row count of IndexScan according to the selectivity of the Selection. + if selSelectivity > 0 && selSelectivity < 1 { + scaledRowCount := child.Stats().RowCount / selSelectivity + idxScan.SetStats(idxScan.Stats().ScaleByExpectCnt(scaledRowCount)) + } } } else if copTsk.indexPlan == nil { if tblScan.HandleCols == nil { @@ -1161,7 +1219,7 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo } } else { idxCols, idxColLens := expression.IndexInfo2PrefixCols(tblScan.Columns, tblScan.Schema().Columns, tables.FindPrimaryIndex(tblScan.Table)) - matched := checkIndexMatchProp(idxCols, idxColLens, nil, colsProp) + matched := p.checkOrderPropForSubIndexScan(idxCols, idxColLens, nil, colsProp) if !matched { return nil, false } @@ -1196,10 +1254,101 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo return attachPlan2Task(p, rootTask), true } +// checkOrderPropForSubIndexScan checks whether these index columns can meet the specified order property. +func (p *PhysicalTopN) checkOrderPropForSubIndexScan(idxCols []*expression.Column, idxColLens []int, constColsByCond []bool, colsProp *property.PhysicalProperty) bool { + // If the number of the by-items is bigger than the index columns. We cannot push down since it must not keep order. 
+ if len(idxCols) < len(colsProp.SortItems) { + return false + } + idxPos := 0 + for _, byItem := range colsProp.SortItems { + found := false + for ; idxPos < len(idxCols); idxPos++ { + if idxColLens[idxPos] == types.UnspecifiedLength && idxCols[idxPos].Equal(p.SCtx(), byItem.Col) { + found = true + idxPos++ + break + } + if len(constColsByCond) == 0 || idxPos > len(constColsByCond) || !constColsByCond[idxPos] { + found = false + break + } + } + if !found { + return false + } + } + return true +} + +// checkSubScans checks whether all these Scans can meet the specified order property. +func (p *PhysicalTopN) checkSubScans(colsProp *property.PhysicalProperty, isDesc bool, scans ...PhysicalPlan) bool { + for _, scan := range scans { + switch x := scan.(type) { + case *PhysicalIndexScan: + propMatched := p.checkOrderPropForSubIndexScan(x.IdxCols, x.IdxColLens, x.constColsByCond, colsProp) + if !propMatched { + return false + } + x.KeepOrder = true + x.Desc = isDesc + case *PhysicalTableScan: + if x.HandleCols == nil { + return false + } + + if x.HandleCols.IsInt() { + pk := x.HandleCols.GetCol(0) + if len(colsProp.SortItems) != 1 || !colsProp.SortItems[0].Col.Equal(p.SCtx(), pk) { + return false + } + } else { + idxCols, idxColLens := expression.IndexInfo2PrefixCols(x.Columns, x.Schema().Columns, tables.FindPrimaryIndex(x.Table)) + matched := p.checkOrderPropForSubIndexScan(idxCols, idxColLens, nil, colsProp) + if !matched { + return false + } + } + x.KeepOrder = true + x.Desc = isDesc + default: + return false + } + } + // Return true when all sub index plan matched. + return true +} + +func (p *PhysicalTopN) addPartialLimitForSubScans(copSubPlans []PhysicalPlan, finalPartialScans []PhysicalPlan, selSelectivities []float64) []PhysicalPlan { + limitAddedPlan := make([]PhysicalPlan, 0, len(copSubPlans)) + for i, copSubPlan := range copSubPlans { + childProfile := copSubPlan.statsInfo() + newCount := p.Offset + p.Count + stats := deriveLimitStats(childProfile, float64(newCount)) + pushedLimit := PhysicalLimit{ + Count: newCount, + }.Init(p.SCtx(), stats, p.SelectBlockOffset()) + pushedLimit.SetSchema(copSubPlan.Schema()) + pushedLimit.SetChildren(copSubPlan) + // A similar but simplified logic compared the ExpectedCnt handling logic in getOriginalPhysicalIndexScan. + child := pushedLimit.Children()[0] + // The row count of the direct child of Limit should be adjusted to be no larger than the Limit.Count. + child.SetStats(child.statsInfo().ScaleByExpectCnt(float64(newCount))) + // The Limit->Selection->IndexScan case: + // adjust the row count of IndexScan according to the selectivity of the Selection. 
+ if selSelectivities[i] > 0 && selSelectivities[i] < 1 { + scaledRowCount := child.Stats().RowCount / selSelectivities[i] + finalPartialScans[i].SetStats(finalPartialScans[i].Stats().ScaleByExpectCnt(scaledRowCount)) + } + limitAddedPlan = append(limitAddedPlan, pushedLimit) + } + return limitAddedPlan +} + func (p *PhysicalProjection) attach2Task(tasks ...task) task { t := tasks[0].copy() if cop, ok := t.(*copTask); ok { - if len(cop.rootTaskConds) == 0 && expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.Exprs, p.ctx.GetClient(), cop.getStoreType()) { + if (len(cop.rootTaskConds) == 0 && len(cop.idxMergePartPlans) == 0) && expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, p.Exprs, p.ctx.GetClient(), cop.getStoreType()) { copTask := attachPlan2Task(p, cop) return copTask } @@ -1824,10 +1973,13 @@ func computePartialCursorOffset(name string) int { func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task { t := tasks[0].copy() if cop, ok := t.(*copTask); ok { - // We should not push agg down across double read, since the data of second read is ordered by handle instead of index. - // We use (cop.indexPlan != nil && cop.tablePlan != nil && cop.keepOrder) to decided whether the following plan is double - // read with order reserved. - if (cop.indexPlan != nil && cop.tablePlan != nil && cop.keepOrder) || len(cop.rootTaskConds) > 0 { + // We should not push agg down across + // 1. double read, since the data of second read is ordered by handle instead of index. The `extraHandleCol` is added + // if the double read needs to keep order. So we just use it to decided + // whether the following plan is double read with order reserved. + // 2. the case that there's filters should be calculated on TiDB side. + // 3. the case of index merge + if (cop.indexPlan != nil && cop.tablePlan != nil && cop.keepOrder) || len(cop.rootTaskConds) > 0 || len(cop.idxMergePartPlans) > 0 { t = cop.convertToRootTask(p.ctx) attachPlan2Task(p, t) } else { @@ -2074,7 +2226,7 @@ func (p *PhysicalHashAgg) attach2Task(tasks ...task) task { t := tasks[0].copy() final := p if cop, ok := t.(*copTask); ok { - if len(cop.rootTaskConds) == 0 { + if len(cop.rootTaskConds) == 0 && len(cop.idxMergePartPlans) == 0 { copTaskType := cop.getStoreType() partialAgg, finalAgg := p.newPartialAggregate(copTaskType, false) if finalAgg != nil { diff --git a/planner/property/physical_property.go b/planner/property/physical_property.go index 60994d05b57ef..e92b74adbbe4a 100644 --- a/planner/property/physical_property.go +++ b/planner/property/physical_property.go @@ -30,7 +30,7 @@ import ( // wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get // these tasks one by one. -var wholeTaskTypes = []TaskType{CopSingleReadTaskType, CopDoubleReadTaskType, RootTaskType} +var wholeTaskTypes = []TaskType{CopSingleReadTaskType, CopMultiReadTaskType, RootTaskType} // SortItem wraps the column and its order. type SortItem struct { diff --git a/planner/property/task_type.go b/planner/property/task_type.go index a4c16d4a51d2e..8a424b26284b7 100644 --- a/planner/property/task_type.go +++ b/planner/property/task_type.go @@ -25,9 +25,9 @@ const ( // executed in the coprocessor layer. CopSingleReadTaskType - // CopDoubleReadTaskType stands for the a IndexLookup tasks executed in the + // CopMultiReadTaskType stands for the a IndexLookup tasks executed in the // coprocessor layer. 
- CopDoubleReadTaskType + CopMultiReadTaskType // MppTaskType stands for task that would run on Mpp nodes, currently meaning the tiflash node. MppTaskType @@ -40,8 +40,8 @@ func (t TaskType) String() string { return "rootTask" case CopSingleReadTaskType: return "copSingleReadTask" - case CopDoubleReadTaskType: - return "copDoubleReadTask" + case CopMultiReadTaskType: + return "copMultiReadTask" case MppTaskType: return "mppTask" }
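For reference, a condensed sketch of the partial-limit push-down that pushPartialTopNDownToCop performs for a union-type IndexMerge. The helper name and body below are illustrative, not the exact implementation of addPartialLimitForSubScans, and they rely on planner-internal types from this diff:

// Illustrative only: when every partial index scan of a union IndexMerge keeps
// the requested order, wrap each one with a Limit of Offset+Count so each cop
// stream returns at most that many ordered handles; the root TopN then only
// merges a bounded set. PhysicalLimit, deriveLimitStats and ScaleByExpectCnt
// are the planner helpers that appear in this diff.
func (p *PhysicalTopN) limitPartialScans(partialScans []PhysicalPlan) []PhysicalPlan {
	newCount := p.Offset + p.Count
	limited := make([]PhysicalPlan, 0, len(partialScans))
	for _, scan := range partialScans {
		// Derive Limit stats from the partial scan and attach the Limit above it.
		stats := deriveLimitStats(scan.statsInfo(), float64(newCount))
		pushedLimit := PhysicalLimit{Count: newCount}.Init(p.SCtx(), stats, p.SelectBlockOffset())
		pushedLimit.SetChildren(scan)
		pushedLimit.SetSchema(scan.Schema())
		// Cap the scan's estimated row count at the pushed-down limit, as the patch does.
		scan.SetStats(scan.statsInfo().ScaleByExpectCnt(float64(newCount)))
		limited = append(limited, pushedLimit)
	}
	return limited
}

This is the plan shape the new TestIndexMergeOrderPushDown expectations encode: a root TopN over an IndexMerge whose keep-order partial ranges each carry a Limit(Build) with count 2, while cases whose partial scans cannot keep order fall back to a TopN(Probe) above the table-side scan.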