Skip to content

Commit

Permalink
expression, planner: allow pushdown count distinct when enumerate phy…
Browse files Browse the repository at this point in the history
…sical plans (#22867)
  • Loading branch information
tisonkun authored Mar 11, 2021
1 parent c4f3989 commit 362883c
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 93 deletions.
2 changes: 1 addition & 1 deletion expression/aggregation/agg_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func AggFuncToPBExpr(sc *stmtctx.StatementContext, client kv.Client, aggFunc *Ag
}
children = append(children, pbArg)
}
return &tipb.Expr{Tp: tp, Children: children, FieldType: expression.ToPBFieldType(aggFunc.RetTp)}
return &tipb.Expr{Tp: tp, Children: children, FieldType: expression.ToPBFieldType(aggFunc.RetTp), HasDistinct: aggFunc.HasDistinct}
}

// PBExprToAggFuncDesc converts pb to aggregate function.
Expand Down
19 changes: 12 additions & 7 deletions expression/aggregation/agg_to_pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package aggregation

import (
"encoding/json"
"fmt"
"testing"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -76,13 +77,13 @@ func (s *testEvaluatorSuite) TestAggFunc2Pb(c *C) {
}

jsons := []string{
`{"tp":3002,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}`,
`{"tp":3001,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}],"sig":0,"field_type":{"tp":8,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}`,
`{"tp":3003,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}`,
`{"tp":3002,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
`{"tp":3001,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":8,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
`{"tp":3003,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
"null",
`{"tp":3005,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}`,
`{"tp":3004,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}`,
`{"tp":3006,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""}}`,
`{"tp":3005,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
`{"tp":3004,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
`{"tp":3006,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":""},"has_distinct":%v}`,
}
for i, funcName := range funcNames {
for _, hasDistinct := range []bool{true, false} {
Expand All @@ -93,7 +94,11 @@ func (s *testEvaluatorSuite) TestAggFunc2Pb(c *C) {
pbExpr := AggFuncToPBExpr(sc, client, aggFunc)
js, err := json.Marshal(pbExpr)
c.Assert(err, IsNil)
c.Assert(string(js), Equals, jsons[i])
if funcName != ast.AggFuncGroupConcat {
c.Assert(string(js), Equals, fmt.Sprintf(jsons[i], hasDistinct))
} else {
c.Assert(string(js), Equals, "null")
}
}
}
}
128 changes: 64 additions & 64 deletions expression/expr_to_pb_test.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/pingcap/parser v0.0.0-20210303061548-f6776f61e268
github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20210220073817-777cefd7ea62
github.com/pingcap/tipb v0.0.0-20210308034246-066a76fd4e1b
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
Expand All @@ -73,7 +73,7 @@ require (
golang.org/x/mod v0.4.1 // indirect
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43
golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b
golang.org/x/text v0.3.5
golang.org/x/tools v0.1.0
google.golang.org/grpc v1.27.1
Expand Down
9 changes: 5 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99 h1:/ogXgm4guJzow4U
github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20210220073817-777cefd7ea62 h1:196Wdwpe8anV0rHENaTKm0a2eDgptkhVgw0okg07a00=
github.com/pingcap/tipb v0.0.0-20210220073817-777cefd7ea62/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo=
github.com/pingcap/tipb v0.0.0-20210308034246-066a76fd4e1b h1:AvGm1DqSEwbGgiiu3KVuTtwLl3MqhbwwnJpx82l6/7M=
github.com/pingcap/tipb v0.0.0-20210308034246-066a76fd4e1b/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -466,6 +466,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.20.12+incompatible h1:6VEGkOXP/eP4o2Ilk8cSsX0PhOEfX6leqAnD+urrp9M=
Expand Down Expand Up @@ -711,8 +712,8 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43 h1:SgQ6LNaYJU0JIuEHv9+s6EbhSCwYeAf5Yvj6lpYlqAE=
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b h1:ggRgirZABFolTmi3sn6Ivd9SipZwLedQ5wR0aAKnFxU=
golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand Down
14 changes: 8 additions & 6 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2246,9 +2246,11 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
// TODO: support more operators and distinct later
func (la *LogicalAggregation) checkCanPushDownToMPP() bool {
for _, agg := range la.AggFuncs {
// MPP does not support distinct now
// MPP does not support distinct except count distinct now
if agg.HasDistinct {
return false
if agg.Name != ast.AggFuncCount {
return false
}
}
// MPP does not support AggFuncApproxCountDistinct now
if agg.Name == ast.AggFuncApproxCountDistinct {
Expand Down Expand Up @@ -2332,13 +2334,13 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
if la.ctx.GetSessionVars().AllowBCJ {
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
if la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() {
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP()
if canPushDownToMPP {
taskTypes = append(taskTypes, property.MppTaskType)
}
if la.HasDistinct() {
// TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented.
// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
// TODO: remove after the cost estimation of distinct pushdown is implemented.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown && !canPushDownToMPP {
taskTypes = []property.TaskType{property.RootTaskType}
}
} else if !la.aggHints.preferAggToCop {
Expand Down
1 change: 1 addition & 0 deletions planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
"desc format = 'brief' select sum(b) from (select t.id, t1.id as b from t join t t1 on t.id=t1.id)A group by id",
"desc format = 'brief' select * from (select id from t group by id) C join (select sum(b),id from (select t.id, t1.id as b from t join (select id, count(*) as c from t group by id) t1 on t.id=t1.id)A group by id)B on C.id=b.id",
"desc format = 'brief' select count(distinct value),id from t group by id",
"desc format = 'brief' select count(distinct value),sum(distinct value),id from t group by id",
"desc format = 'brief' select * from t join ( select count(distinct value), id from t group by id) as A on A.id = t.id",
"desc format = 'brief' select * from t join ( select count(1/value), id from t group by id) as A on A.id = t.id"
]
Expand Down
37 changes: 28 additions & 9 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1660,22 +1660,41 @@
{
"SQL": "desc format = 'brief' select count(distinct value),id from t group by id",
"Plan": [
"HashAgg 8000.00 root group by:test.t.id, funcs:count(distinct test.t.value)->Column#4, funcs:firstrow(test.t.id)->test.t.id",
"TableReader 8000.00 root data:ExchangeSender",
"└─ExchangeSender 8000.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─Projection 8000.00 batchCop[tiflash] Column#4, test.t.id",
" └─HashAgg 8000.00 batchCop[tiflash] group by:test.t.id, funcs:count(distinct test.t.value)->Column#4, funcs:firstrow(test.t.id)->test.t.id",
" └─ExchangeReceiver 8000.00 batchCop[tiflash] ",
" └─ExchangeSender 8000.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.id",
" └─HashAgg 8000.00 batchCop[tiflash] group by:test.t.id, test.t.value, ",
" └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
]
},
{
"SQL": "desc format = 'brief' select count(distinct value),sum(distinct value),id from t group by id",
"Plan": [
"HashAgg 8000.00 root group by:test.t.id, funcs:count(distinct test.t.value)->Column#4, funcs:sum(distinct test.t.value)->Column#5, funcs:firstrow(test.t.id)->test.t.id",
"└─TableReader 10000.00 root data:TableFullScan",
" └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"
]
},
{
"SQL": "desc format = 'brief' select * from t join ( select count(distinct value), id from t group by id) as A on A.id = t.id",
"Plan": [
"HashJoin 9990.00 root inner join, equal:[eq(test.t.id, test.t.id)]",
"├─HashAgg(Build) 7992.00 root group by:test.t.id, funcs:count(distinct test.t.value)->Column#7, funcs:firstrow(test.t.id)->test.t.id",
"│ └─TableReader 9990.00 root data:Selection",
"│ └─Selection 9990.00 cop[tiflash] not(isnull(test.t.id))",
"│ └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo",
"└─TableReader(Probe) 9990.00 root data:Selection",
" └─Selection 9990.00 cop[tiflash] not(isnull(test.t.id))",
" └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"
"TableReader 9990.00 root data:ExchangeSender",
"└─ExchangeSender 9990.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 9990.00 batchCop[tiflash] inner join, equal:[eq(test.t.id, test.t.id)]",
" ├─Projection(Build) 7992.00 batchCop[tiflash] Column#7, test.t.id",
" │ └─HashAgg 7992.00 batchCop[tiflash] group by:test.t.id, funcs:count(distinct test.t.value)->Column#7, funcs:firstrow(test.t.id)->test.t.id",
" │ └─ExchangeReceiver 7992.00 batchCop[tiflash] ",
" │ └─ExchangeSender 7992.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.id",
" │ └─HashAgg 7992.00 batchCop[tiflash] group by:test.t.id, test.t.value, ",
" │ └─Selection 9990.00 batchCop[tiflash] not(isnull(test.t.id))",
" │ └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo",
" └─ExchangeReceiver(Probe) 9990.00 batchCop[tiflash] ",
" └─ExchangeSender 9990.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.id",
" └─Selection 9990.00 batchCop[tiflash] not(isnull(test.t.id))",
" └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
]
},
{
Expand Down

0 comments on commit 362883c

Please sign in to comment.