Skip to content

Commit

Permalink
*: refactoring planner code #480
Browse files Browse the repository at this point in the history
[summary]
`planner` code has poor readability currently, we need refactoring the code.
[test case]
src/planner/builder/aggregate_plan_test.go
src/planner/builder/builder_test.go
src/planner/builder/expr_test.go
src/planner/builder/from_test.go
src/planner/builder/limit_plan_test.go
src/planner/builder/orderby_plan_test.go
src/planner/builder/sqlparser_test.go
src/planner/others_plan_test.go
src/planner/planner_test.go
src/planner/select_plan_test.go
src/planner/union_plan_test.go
[patch codecov]
src/planner/builder 97.7%
src/planner 95.8%
  • Loading branch information
zhyass authored and BohuTANG committed Nov 2, 2019
1 parent ec0d2cd commit 62f85da
Show file tree
Hide file tree
Showing 49 changed files with 2,431 additions and 2,124 deletions.
16 changes: 4 additions & 12 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ test:
@$(MAKE) testoptimizer
@$(MAKE) testplanner
@$(MAKE) testexecutor
@$(MAKE) testengine
@$(MAKE) testoperator
@$(MAKE) testbackend
@$(MAKE) testproxy
@$(MAKE) testaudit
Expand All @@ -51,13 +49,9 @@ testrouter:
testoptimizer:
go test -v optimizer
testplanner:
go test -v planner
go test -v planner/...
testexecutor:
go test -v executor
testengine:
go test -v executor/engine
testoperator:
go test -v executor/engine/operator
go test -v executor/...
testbackend:
go test -v -race backend
testproxy:
Expand Down Expand Up @@ -91,10 +85,8 @@ allpkgs = xbase\
config\
router\
optimizer\
planner\
executor\
executor/engine\
executor/engine/operator\
planner/...\
executor/...\
backend\
proxy\
audit\
Expand Down
14 changes: 7 additions & 7 deletions src/executor/engine/join_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"backend"
"executor/engine/operator"
"planner"
"planner/builder"
"xcontext"

"github.com/pkg/errors"
Expand All @@ -29,13 +29,13 @@ var (
// JoinEngine represents join executor.
type JoinEngine struct {
log *xlog.Log
node *planner.JoinNode
node *builder.JoinNode
left, right PlanEngine
txn backend.Transaction
}

// NewJoinEngine creates the new join executor.
func NewJoinEngine(log *xlog.Log, node *planner.JoinNode, txn backend.Transaction) *JoinEngine {
func NewJoinEngine(log *xlog.Log, node *builder.JoinNode, txn backend.Transaction) *JoinEngine {
return &JoinEngine{
log: log,
node: node,
Expand All @@ -58,7 +58,7 @@ func (j *JoinEngine) Execute(ctx *xcontext.ResultContext) error {
}

maxrow := j.txn.MaxJoinRows()
if j.node.Strategy == planner.NestLoop {
if j.node.Strategy == builder.NestLoop {
joinVars := make(map[string]*querypb.BindVariable)
if err := j.execBindVars(ctx, joinVars, true); err != nil {
return err
Expand Down Expand Up @@ -86,9 +86,9 @@ func (j *JoinEngine) Execute(ctx *xcontext.ResultContext) error {
err = concatLeftAndNil(lctx.Results.Rows, j.node, ctx.Results, maxrow)
} else {
switch j.node.Strategy {
case planner.SortMerge:
case builder.SortMerge:
err = sortMergeJoin(lctx.Results, rctx.Results, ctx.Results, j.node, maxrow)
case planner.Cartesian:
case builder.Cartesian:
err = cartesianProduct(lctx.Results, rctx.Results, ctx.Results, j.node, maxrow)
}
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func combineVars(bv1, bv2 map[string]*querypb.BindVariable) map[string]*querypb.
}

// cartesianProduct used to produce cartesian product.
func cartesianProduct(lres, rres, res *sqltypes.Result, node *planner.JoinNode, maxrow int) error {
func cartesianProduct(lres, rres, res *sqltypes.Result, node *builder.JoinNode, maxrow int) error {
res.Rows = make([][]sqltypes.Value, 0, len(lres.Rows)*len(rres.Rows))
for _, lrow := range lres.Rows {
for _, rrow := range rres.Rows {
Expand Down
6 changes: 3 additions & 3 deletions src/executor/engine/merge_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package engine
import (
"backend"
"executor/engine/operator"
"planner"
"planner/builder"
"xcontext"

"github.com/xelabs/go-mysqlstack/sqlparser"
Expand All @@ -26,12 +26,12 @@ var (
// MergeEngine represents merge executor.
type MergeEngine struct {
log *xlog.Log
node *planner.MergeNode
node *builder.MergeNode
txn backend.Transaction
}

// NewMergeEngine creates the new merge executor.
func NewMergeEngine(log *xlog.Log, node *planner.MergeNode, txn backend.Transaction) *MergeEngine {
func NewMergeEngine(log *xlog.Log, node *builder.MergeNode, txn backend.Transaction) *MergeEngine {
return &MergeEngine{
log: log,
node: node,
Expand Down
16 changes: 8 additions & 8 deletions src/executor/engine/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"sort"
"sync"

"planner"
"planner/builder"

"github.com/pkg/errors"
"github.com/xelabs/go-mysqlstack/sqlparser"
Expand All @@ -25,9 +25,9 @@ const (
)

// sortMergeJoin used to join `lres` and `rres` to `res`.
func sortMergeJoin(lres, rres, res *sqltypes.Result, node *planner.JoinNode, maxrow int) error {
func sortMergeJoin(lres, rres, res *sqltypes.Result, node *builder.JoinNode, maxrow int) error {
var wg sync.WaitGroup
sort := func(keys []planner.JoinKey, res *sqltypes.Result) {
sort := func(keys []builder.JoinKey, res *sqltypes.Result) {
defer wg.Done()
sort.Slice(res.Rows, func(i, j int) bool {
for _, key := range keys {
Expand All @@ -50,7 +50,7 @@ func sortMergeJoin(lres, rres, res *sqltypes.Result, node *planner.JoinNode, max
}

// mergeJoin used to join the sorted results.
func mergeJoin(lres, rres, res *sqltypes.Result, node *planner.JoinNode, maxrow int) error {
func mergeJoin(lres, rres, res *sqltypes.Result, node *builder.JoinNode, maxrow int) error {
var err error
lrows, lidx := fetchSameKeyRows(lres.Rows, node.LeftKeys, 0)
rrows, ridx := fetchSameKeyRows(rres.Rows, node.RightKeys, 0)
Expand Down Expand Up @@ -97,7 +97,7 @@ func mergeJoin(lres, rres, res *sqltypes.Result, node *planner.JoinNode, maxrow
}

// fetchSameKeyRows used to fetch the same joinkey values' rows.
func fetchSameKeyRows(rows [][]sqltypes.Value, joins []planner.JoinKey, index int) ([][]sqltypes.Value, int) {
func fetchSameKeyRows(rows [][]sqltypes.Value, joins []builder.JoinKey, index int) ([][]sqltypes.Value, int) {
var chunk [][]sqltypes.Value
if index >= len(rows) {
return nil, index
Expand All @@ -122,7 +122,7 @@ func fetchSameKeyRows(rows [][]sqltypes.Value, joins []planner.JoinKey, index in
return chunk, index
}

func keysEqual(row1, row2 []sqltypes.Value, joins []planner.JoinKey) bool {
func keysEqual(row1, row2 []sqltypes.Value, joins []builder.JoinKey) bool {
for _, join := range joins {
cmp := sqltypes.NullsafeCompare(row1[join.Index], row2[join.Index])
if cmp != 0 {
Expand All @@ -133,7 +133,7 @@ func keysEqual(row1, row2 []sqltypes.Value, joins []planner.JoinKey) bool {
}

// concatLeftAndRight used to concat thle left and right results, handle otherJoinOn|rightNull|OtherFilter.
func concatLeftAndRight(lrows, rrows [][]sqltypes.Value, node *planner.JoinNode, res *sqltypes.Result, maxrow int) error {
func concatLeftAndRight(lrows, rrows [][]sqltypes.Value, node *builder.JoinNode, res *sqltypes.Result, maxrow int) error {
var err error
var mu sync.Mutex
p := newCalcPool(joinWorkers)
Expand Down Expand Up @@ -247,7 +247,7 @@ func concatLeftAndRight(lrows, rrows [][]sqltypes.Value, node *planner.JoinNode,
return err
}

func concatLeftAndNil(lrows [][]sqltypes.Value, node *planner.JoinNode, res *sqltypes.Result, maxrow int) error {
func concatLeftAndNil(lrows [][]sqltypes.Value, node *builder.JoinNode, res *sqltypes.Result, maxrow int) error {
if node.IsLeftJoin && !node.HasRightFilter {
for _, row := range lrows {
res.Rows = append(res.Rows, joinRows(row, nil, node.Cols))
Expand Down
10 changes: 5 additions & 5 deletions src/executor/engine/operator/aggregate_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package operator
import (
"sort"

"planner"
"planner/builder"
"xcontext"

"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
Expand All @@ -26,11 +26,11 @@ var (
// Including: COUNT/MAX/MIN/SUM/AVG/GROUPBY.
type AggregateOperator struct {
log *xlog.Log
plan planner.Plan
plan builder.ChildPlan
}

// NewAggregateOperator creates new AggregateOperator.
func NewAggregateOperator(log *xlog.Log, plan planner.Plan) *AggregateOperator {
func NewAggregateOperator(log *xlog.Log, plan builder.ChildPlan) *AggregateOperator {
return &AggregateOperator{
log: log,
plan: plan,
Expand All @@ -52,7 +52,7 @@ func (operator *AggregateOperator) Execute(ctx *xcontext.ResultContext) error {
// select b from tb group by b. √
func (operator *AggregateOperator) aggregate(result *sqltypes.Result) {
var deIdxs []int
plan := operator.plan.(*planner.AggregatePlan)
plan := operator.plan.(*builder.AggregatePlan)
if plan.Empty() {
return
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func (operator *AggregateOperator) aggregate(result *sqltypes.Result) {
result.RemoveColumns(deIdxs...)
}

func keysEqual(row1, row2 []sqltypes.Value, groups []planner.Aggregator) bool {
func keysEqual(row1, row2 []sqltypes.Value, groups []builder.Aggregator) bool {
for _, v := range groups {
cmp := sqltypes.NullsafeCompare(row1[v.Index], row2[v.Index])
if cmp != 0 {
Expand Down
8 changes: 4 additions & 4 deletions src/executor/engine/operator/limit_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package operator

import (
"planner"
"planner/builder"
"xcontext"

"github.com/xelabs/go-mysqlstack/xlog"
Expand All @@ -22,11 +22,11 @@ var (
// LimitOperator represents limit operator.
type LimitOperator struct {
log *xlog.Log
plan planner.Plan
plan builder.ChildPlan
}

// NewLimitOperator creates the new limit operator.
func NewLimitOperator(log *xlog.Log, plan planner.Plan) *LimitOperator {
func NewLimitOperator(log *xlog.Log, plan builder.ChildPlan) *LimitOperator {
return &LimitOperator{
log: log,
plan: plan,
Expand All @@ -36,7 +36,7 @@ func NewLimitOperator(log *xlog.Log, plan planner.Plan) *LimitOperator {
// Execute used to execute the operator.
func (operator *LimitOperator) Execute(ctx *xcontext.ResultContext) error {
rs := ctx.Results
plan := operator.plan.(*planner.LimitPlan)
plan := operator.plan.(*builder.LimitPlan)
rs.Limit(plan.Offset, plan.Limit)
return nil
}
12 changes: 6 additions & 6 deletions src/executor/engine/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package operator

import (
"planner"
"planner/builder"
"xcontext"

"github.com/xelabs/go-mysqlstack/xlog"
Expand All @@ -21,22 +21,22 @@ type Operator interface {
}

// ExecSubPlan used to execute all the children plan.
func ExecSubPlan(log *xlog.Log, node planner.PlanNode, ctx *xcontext.ResultContext) error {
func ExecSubPlan(log *xlog.Log, node builder.PlanNode, ctx *xcontext.ResultContext) error {
subPlanTree := node.Children()
if subPlanTree != nil {
for _, subPlan := range subPlanTree.Plans() {
for _, subPlan := range subPlanTree {
switch subPlan.Type() {
case planner.PlanTypeAggregate:
case builder.ChildTypeAggregate:
aggrOperator := NewAggregateOperator(log, subPlan)
if err := aggrOperator.Execute(ctx); err != nil {
return err
}
case planner.PlanTypeOrderby:
case builder.ChildTypeOrderby:
orderByOperator := NewOrderByOperator(log, subPlan)
if err := orderByOperator.Execute(ctx); err != nil {
return err
}
case planner.PlanTypeLimit:
case builder.ChildTypeLimit:
limitOperator := NewLimitOperator(log, subPlan)
if err := limitOperator.Execute(ctx); err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions src/executor/engine/operator/orderby_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package operator
import (
"sort"

"planner"
"planner/builder"
"xcontext"

"github.com/pkg/errors"
Expand All @@ -26,11 +26,11 @@ var (
// OrderByOperator represents order by operator.
type OrderByOperator struct {
log *xlog.Log
plan planner.Plan
plan builder.ChildPlan
}

// NewOrderByOperator creates new orderby operator.
func NewOrderByOperator(log *xlog.Log, plan planner.Plan) *OrderByOperator {
func NewOrderByOperator(log *xlog.Log, plan builder.ChildPlan) *OrderByOperator {
return &OrderByOperator{
log: log,
plan: plan,
Expand All @@ -41,7 +41,7 @@ func NewOrderByOperator(log *xlog.Log, plan planner.Plan) *OrderByOperator {
func (operator *OrderByOperator) Execute(ctx *xcontext.ResultContext) error {
var err error
rs := ctx.Results
plan := operator.plan.(*planner.OrderByPlan)
plan := operator.plan.(*builder.OrderByPlan)

sort.Slice(rs.Rows, func(i, j int) bool {
// If there are any errors below, the function sets
Expand Down Expand Up @@ -70,7 +70,7 @@ func (operator *OrderByOperator) Execute(ctx *xcontext.ResultContext) error {
if cmp == 0 {
continue
}
if orderby.Direction == planner.DESC {
if orderby.Direction == builder.DESC {
cmp = -cmp
}
return cmp < 0
Expand Down
10 changes: 5 additions & 5 deletions src/executor/engine/plan_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package engine

import (
"backend"
"planner"
"planner/builder"
"xcontext"

querypb "github.com/xelabs/go-mysqlstack/sqlparser/depends/query"
Expand All @@ -25,17 +25,17 @@ type PlanEngine interface {
}

// BuildEngine used to build the executor tree.
func BuildEngine(log *xlog.Log, plan planner.PlanNode, txn backend.Transaction) PlanEngine {
func BuildEngine(log *xlog.Log, plan builder.PlanNode, txn backend.Transaction) PlanEngine {
var engine PlanEngine
switch node := plan.(type) {
case *planner.MergeNode:
case *builder.MergeNode:
engine = NewMergeEngine(log, node, txn)
case *planner.JoinNode:
case *builder.JoinNode:
joinEngine := NewJoinEngine(log, node, txn)
joinEngine.left = BuildEngine(log, node.Left, txn)
joinEngine.right = BuildEngine(log, node.Right, txn)
engine = joinEngine
case *planner.UnionNode:
case *builder.UnionNode:
unionEngine := NewUnionEngine(log, node, txn)
unionEngine.left = BuildEngine(log, node.Left, txn)
unionEngine.right = BuildEngine(log, node.Right, txn)
Expand Down
6 changes: 3 additions & 3 deletions src/executor/engine/union_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"backend"
"executor/engine/operator"
"planner"
"planner/builder"
"xcontext"

"github.com/xelabs/go-mysqlstack/sqlparser/depends/common"
Expand All @@ -30,13 +30,13 @@ var (
// UnionEngine represents merge executor.
type UnionEngine struct {
log *xlog.Log
node *planner.UnionNode
node *builder.UnionNode
left, right PlanEngine
txn backend.Transaction
}

// NewUnionEngine creates the new union executor.
func NewUnionEngine(log *xlog.Log, node *planner.UnionNode, txn backend.Transaction) *UnionEngine {
func NewUnionEngine(log *xlog.Log, node *builder.UnionNode, txn backend.Transaction) *UnionEngine {
return &UnionEngine{
log: log,
node: node,
Expand Down
Loading

0 comments on commit 62f85da

Please sign in to comment.