Skip to content

Commit

Permalink
planner, executor: support broadcast join for tiflash engine. (#17232)
Browse files Browse the repository at this point in the history
* support batch cop for tiflash

* support batch cop

* support join push down to tiflash

* refine

* change pb

* push join

* fix

* add hint

* refine hint

* add ranges

* fix

* fix

* fix push down

* fix index

* enable distsql for join

* add a session var to disable/enable broadcast join

* fix bug

* fix bug

* tiny fix

* enable cast decimal pushdown to tiflash

* fix

* fix bc join bug

* make broadcast plan stable

* refine code

* fix bug

* basic support for multi table broadcast join

* fix bug

* basic cbo for broadcast join

* improve

* fix bug

* remote useless code

* add tests

* pass unit tests

* refine code

* support execute summary info for broadcast join

* fix bug in explain for broadcast join

* format code

* remove un-needed code

* fix make dev

* address comments

* Hanfei/join merge (#7)

* enable exec details for batch cop

* format code

* fix test

* change tidb_opt_broadcast_join to global vars

* Ban cartesian join to be pushed down to TiFlash (#8)

* merge master (#10)

* merge master

* fix bug

* fix bug

* fix ut

* check session var conflict

* Add perfer local hint for broadcast join (#12)

* update

* remove useless code

* remove useless code

* update parser

* add test for prefer local join

* use bcj_local

* update go.mod

* refine planner

* refine comments

* fix make dev

* fix make dev

* update parser

* address comments

* fix make dev

* disable broadcast join when new collation is enabled

* Update planner/core/exhaust_physical_plans.go

Co-authored-by: Zhuomin(Charming) Liu <[email protected]>

* address comments

* fix

* address comments

* fix tests

* address comments

Co-authored-by: xufei <[email protected]>
Co-authored-by: xufei <[email protected]>
Co-authored-by: 虎 <[email protected]>
Co-authored-by: Zhuomin(Charming) Liu <[email protected]>
  • Loading branch information
5 people authored Jul 27, 2020
1 parent 76cdffd commit 29178df
Show file tree
Hide file tree
Showing 36 changed files with 845 additions and 98 deletions.
9 changes: 7 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,14 @@ func (r *selectResult) updateCopRuntimeStats(detail *execdetails.ExecDetails, re
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
planID := ""
if detail.GetExecutorId() != "" {
planID = detail.GetExecutorId()
} else {
planID = r.copPlanIDs[i].String()
}
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID.String(), callee, detail)
RecordOneCopTask(planID, callee, detail)
}
}
}
Expand Down
36 changes: 26 additions & 10 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2055,7 +2055,7 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan
streaming := true
executors := make([]*tipb.Executor, 0, len(plans))
for _, p := range plans {
execPB, err := p.ToPB(sctx)
execPB, err := p.ToPB(sctx, kv.TiKV)
if err != nil {
return nil, false, err
}
Expand All @@ -2077,7 +2077,13 @@ func markChildrenUsedCols(outputSchema *expression.Schema, childSchema ...*expre
return
}

func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
func constructDistExecForTiFlash(sctx sessionctx.Context, p plannercore.PhysicalPlan) ([]*tipb.Executor, bool, error) {
execPB, err := p.ToPB(sctx, kv.TiFlash)
return []*tipb.Executor{execPB}, false, err

}

func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan, storeType kv.StoreType) (dagReq *tipb.DAGRequest, streaming bool, err error) {
dagReq = &tipb.DAGRequest{}
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(b.ctx.GetSessionVars().Location())
sc := b.ctx.GetSessionVars().StmtCtx
Expand All @@ -2086,7 +2092,13 @@ func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dag
dagReq.CollectExecutionSummaries = &collExec
}
dagReq.Flags = sc.PushDownFlags()
dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans)
if storeType == kv.TiFlash {
var executors []*tipb.Executor
executors, streaming, err = constructDistExecForTiFlash(b.ctx, plans[0])
dagReq.RootExecutor = executors[0]
} else {
dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans)
}

distsql.SetEncodeType(b.ctx, dagReq)
return dagReq, streaming, err
Expand Down Expand Up @@ -2316,7 +2328,7 @@ func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) {
case 1:
for _, p := range v.TablePlans {
switch p.(type) {
case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN:
case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN, *plannercore.PhysicalBroadCastJoin:
e.batchCop = true
}
}
Expand All @@ -2327,11 +2339,15 @@ func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) {
}

func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) (*TableReaderExecutor, error) {
dagReq, streaming, err := b.constructDAGReq(v.TablePlans)
tablePlans := v.TablePlans
if v.StoreType == kv.TiFlash {
tablePlans = []plannercore.PhysicalPlan{v.GetTablePlan()}
}
dagReq, streaming, err := b.constructDAGReq(tablePlans, v.StoreType)
if err != nil {
return nil, err
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
ts := v.GetTableScan()
tbl, _ := b.is.TableByID(ts.Table.ID)
isPartition, physicalTableID := ts.IsPartition()
if isPartition {
Expand Down Expand Up @@ -2398,15 +2414,15 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) *
return nil
}

ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
ts := v.GetTableScan()
ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
return ret
}

func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexReader) (*IndexReaderExecutor, error) {
dagReq, streaming, err := b.constructDAGReq(v.IndexPlans)
dagReq, streaming, err := b.constructDAGReq(v.IndexPlans, kv.TiKV)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2480,7 +2496,7 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) *
}

func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, val table.Table, err error) {
tableReq, tableStreaming, err := b.constructDAGReq(plans)
tableReq, tableStreaming, err := b.constructDAGReq(plans, kv.TiKV)
if err != nil {
return nil, false, nil, err
}
Expand All @@ -2498,7 +2514,7 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic
}

func buildIndexReq(b *executorBuilder, schemaLen, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
indexReq, indexStreaming, err := b.constructDAGReq(plans)
indexReq, indexStreaming, err := b.constructDAGReq(plans, kv.TiKV)
if err != nil {
return nil, false, err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/table_readers_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func buildMockDAGRequest(sctx sessionctx.Context) *tipb.DAGRequest {
Columns: []*model.ColumnInfo{},
Table: &model.TableInfo{ID: 12345, PKIsHandle: false},
Desc: false,
}})
}}, kv.TiKV)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200714122454-1a64f969cb3c
github.com/pingcap/sysutil v0.0.0-20200715082929-4c47bcac246a
github.com/pingcap/tidb-tools v4.0.1+incompatible
github.com/pingcap/tipb v0.0.0-20200615034523-dcfcea0b5965
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,8 @@ github.com/pingcap/tidb-tools v4.0.1+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnw
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200604070248-508f03b0b342/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200615034523-dcfcea0b5965 h1:a0kZ+iaj/sbzJa5mt5310t1XJSpY+wmmIauAkrr7gU4=
github.com/pingcap/tipb v0.0.0-20200615034523-dcfcea0b5965/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 h1:ESL3eIt1kUt8IMvR1011ejZlAyDcOzw89ARvVHvpD5k=
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
2 changes: 2 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,8 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, driverSide, indent st
buildSide = plan.InnerChildIdx ^ 1
case *PhysicalIndexHashJoin:
buildSide = plan.InnerChildIdx ^ 1
case *PhysicalBroadCastJoin:
buildSide = plan.InnerChildIdx
}

if buildSide != -1 {
Expand Down
Loading

0 comments on commit 29178df

Please sign in to comment.