planner: add cascades wrapper for integrating cascades into current logical optimization phase. (#58751)

ref #51664
AilinKid authored Jan 13, 2025
1 parent 38226ec commit 6d2a05e
Showing 13 changed files with 599 additions and 391 deletions.
24 changes: 24 additions & 0 deletions pkg/planner/cascades/BUILD.bazel
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "cascades",
srcs = ["cascades.go"],
importpath = "github.com/pingcap/tidb/pkg/planner/cascades",
visibility = ["//visibility:public"],
deps = [
"//pkg/planner/cascades/base",
"//pkg/planner/cascades/base/cascadesctx",
"//pkg/planner/cascades/memo",
"//pkg/planner/cascades/task",
"//pkg/planner/core/base",
"//pkg/util/intest",
],
)

go_test(
name = "cascades_test",
timeout = "short",
srcs = ["cascades_test.go"],
flaky = True,
deps = ["//pkg/testkit"],
)
4 changes: 2 additions & 2 deletions pkg/planner/cascades/base/cascadesctx/cascades_ctx.go
@@ -19,12 +19,12 @@ import (
"github.com/pingcap/tidb/pkg/planner/cascades/memo"
)

// CascadesContext define the cascades context as interface, since it will be defined
// Context define the cascades context as interface, since it will be defined
// in the cascades pkg, which no doubt refs the task pkg,
// while in the task pkg, the concrete tasks need to receive the cascades context as a
// constructing arg, which would lead to an import cycle.
// That's why we separate it out of the base pkg.
type CascadesContext interface {
type Context interface {
Destroy()
GetScheduler() base.Scheduler
PushTask(task base.Task)
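
Aside (not part of the diff): the rename above keeps cascadesctx.Context as the interface that the task package depends on, while the concrete implementation lives in the cascades package (see cascades.go below). A minimal compile-time check of that relationship, written in a hypothetical external package, would look like this:

package example

import (
    "github.com/pingcap/tidb/pkg/planner/cascades"
    "github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx"
)

// The task pkg only imports the cascadesctx interface; the cascades pkg
// provides the concrete Context, so no import cycle is formed.
var _ cascadesctx.Context = (*cascades.Context)(nil)
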
105 changes: 105 additions & 0 deletions pkg/planner/cascades/cascades.go
@@ -0,0 +1,105 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cascades

import (
"github.com/pingcap/tidb/pkg/planner/cascades/base"
"github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx"
"github.com/pingcap/tidb/pkg/planner/cascades/memo"
"github.com/pingcap/tidb/pkg/planner/cascades/task"
corebase "github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/util/intest"
)

// Optimizer is a basic cascades search framework portal, driven by Context.
type Optimizer struct {
logic corebase.LogicalPlan
ctx cascadesctx.Context
}

// NewCascades returns a new cascades optimizer for logical alternative searching.
func NewCascades(lp corebase.LogicalPlan) (*Optimizer, error) {
cas := &Optimizer{
logic: lp,
ctx: NewContext(lp.SCtx()),
}
ge, err := cas.ctx.GetMemo().Init(lp)
intest.Assert(err == nil)
intest.Assert(ge != nil)
if err != nil {
return nil, err
}
cas.ctx.GetScheduler().PushTask(task.NewOptGroupTask(cas.ctx, ge.GetGroup()))
return cas, err
}

// Execute runs the yams search flow inside, returning an error if one occurs.
func (c *Optimizer) Execute() error {
return c.ctx.GetScheduler().ExecuteTasks()
}

// Destroy cleans up and resets the basic elements inside.
func (c *Optimizer) Destroy() {
c.ctx.Destroy()
}

// GetMemo returns the memo structure inside cascades.
func (c *Optimizer) GetMemo() *memo.Memo {
return c.ctx.GetMemo()
}

// Context includes all the contextual stuff needed when going through memo optimizing.
type Context struct {
// pctx variable awareness.
pctx corebase.PlanContext
// memo management.
mm *memo.Memo
// task pool management.
scheduler base.Scheduler
}

// NewContext returns a new memo context responsible for managing all the stuff in cascades opt.
func NewContext(pctx corebase.PlanContext) *Context {
return &Context{
pctx: pctx,
// memo init with capacity.
mm: memo.NewMemo(pctx.GetSessionVars().StmtCtx.OperatorNum),
// task pool management.
scheduler: task.NewSimpleTaskScheduler(),
}
}

// Destroy the memo context, which will clean up the resources allocated during this phase.
func (c *Context) Destroy() {
// when a memo optimizing phase is done for a session,
// we should put the stack back and clean the memo.
c.mm.Destroy()
c.scheduler.Destroy()
}

// GetScheduler returns the task scheduler inside this memo context.
func (c *Context) GetScheduler() base.Scheduler {
return c.scheduler
}

// PushTask puts a task into the stack structure inside.
func (c *Context) PushTask(task base.Task) {
c.scheduler.PushTask(task)
}

// GetMemo returns the basic memo structure.
func (c *Context) GetMemo() *memo.Memo {
return c.mm
}
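
Aside (not part of the diff): a minimal sketch of how the new Optimizer wrapper is driven, mirroring the usage added to optimizer.go further down. searchAlternatives and lp are hypothetical names, and error handling is simplified.

package example

import (
    "github.com/pingcap/tidb/pkg/planner/cascades"
    corebase "github.com/pingcap/tidb/pkg/planner/core/base"
)

// searchAlternatives runs the cascades search over a logical plan produced by
// the logical optimization phase and visits every alternative in the memo.
func searchAlternatives(lp corebase.LogicalPlan) error {
    cas, err := cascades.NewCascades(lp) // init the memo and push the root group task
    if err != nil {
        return err
    }
    defer cas.Destroy() // release memo and scheduler resources
    if err := cas.Execute(); err != nil { // drain the task scheduler
        return err
    }
    // Iterate every deduplicated logical alternative recorded in the memo.
    cas.GetMemo().NewIterator().Each(func(alt corebase.LogicalPlan) bool {
        _ = alt // e.g. feed each alternative into physical optimization
        return true
    })
    return nil
}
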
38 changes: 38 additions & 0 deletions pkg/planner/cascades/cascades_test.go
@@ -0,0 +1,38 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cascades_test

import (
"testing"

"github.com/pingcap/tidb/pkg/testkit"
)

func TestCascadesDrive(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.Session().GetSessionVars().SetEnableCascadesPlanner(true)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int not null, b int not null, key(a,b))")
tk.MustExec("insert into t1 values(1,1),(1,2),(2,1),(2,2),(1,1)")

// Simple select for quick debugging of the memo; the normal test cases are in tests/planner/cascades/integration.test.
tk.MustQuery("select 1").Check(testkit.Rows("1"))
tk.MustQuery("explain select 1").Check(testkit.Rows(""+
"Projection_3 1.00 root 1->Column#1",
"└─TableDual_4 1.00 root rows:1"))
}
30 changes: 30 additions & 0 deletions pkg/planner/cascades/memo/memo.go
@@ -70,6 +70,17 @@ func NewMemo(caps ...uint64) *Memo {
}
}

// Destroy cleans up the memo when it is no longer needed, e.g. at the end of the optimizing phase.
func (mm *Memo) Destroy() {
// when the memo itself is no longer needed, we can clean it up proactively.
mm.groupIDGen.id = 0
mm.rootGroup = nil
mm.groups.Init()
clear(mm.groupID2Group)
mm.hash2GlobalGroupExpr.Clear()
mm.hasher.Reset()
}

// GetHasher gets a hasher from the memo that is ready to use.
func (mm *Memo) GetHasher() base2.Hasher {
mm.hasher.Reset()
@@ -329,6 +340,25 @@ type IteratorLP struct {
traceID int
}

// NewIterator creates a logical plan iterator over the current memo, based on its root group.
func (mm *Memo) NewIterator() *IteratorLP {
return &IteratorLP{
root: mm.rootGroup,
stackInfo: make([]*list.Element, 0, mm.groups.Len()),
traceID: -1,
}
}

// Each iterates over all logical plans derivable from the current memo's root group.
func (it *IteratorLP) Each(f func(base.LogicalPlan) bool) {
cur := it.Next()
for ; cur != nil; cur = it.Next() {
if !f(cur) {
break
}
}
}

// Next returns a valid logical plan implied by the memo, without duplication.
func (it *IteratorLP) Next() (logic base.LogicalPlan) {
for {
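
Aside (not part of the diff): a small sketch of the Memo lifecycle implied by these hunks, based on the NewMemo, Init, NewIterator and Destroy signatures visible here and in cascades.go; walkMemo and its parameters are hypothetical.

package example

import (
    "github.com/pingcap/tidb/pkg/planner/cascades/memo"
    corebase "github.com/pingcap/tidb/pkg/planner/core/base"
)

// walkMemo mirrors what the Optimizer wrapper does internally: seed the memo
// from a logical plan, iterate the alternatives, then destroy the memo.
func walkMemo(lp corebase.LogicalPlan, operatorNum uint64) error {
    mm := memo.NewMemo(operatorNum) // pre-size internal structures with the operator count
    if _, err := mm.Init(lp); err != nil {
        return err
    }
    // ... the task scheduler would normally expand the memo here ...
    mm.NewIterator().Each(func(alt corebase.LogicalPlan) bool {
        _ = alt // visit each deduplicated logical alternative
        return true
    })
    mm.Destroy() // reset ids, groups and hashers, as added in this change
    return nil
}
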
2 changes: 1 addition & 1 deletion pkg/planner/cascades/task/base.go
@@ -21,7 +21,7 @@ import (

// BaseTask is the base task wrapper structure encapsulating the basic shared fields.
type BaseTask struct {
ctx cascadesctx.CascadesContext
ctx cascadesctx.Context
}

// Push pushes a new task into inside stack.
2 changes: 1 addition & 1 deletion pkg/planner/cascades/task/task_apply_rule.go
@@ -84,7 +84,7 @@ type ApplyRuleTask struct {
}

// NewApplyRuleTask returns a new apply-rule task.
func NewApplyRuleTask(ctx cascadesctx.CascadesContext, gE *memo.GroupExpression, r rule.Rule) *ApplyRuleTask {
func NewApplyRuleTask(ctx cascadesctx.Context, gE *memo.GroupExpression, r rule.Rule) *ApplyRuleTask {
return &ApplyRuleTask{
BaseTask: BaseTask{
ctx: ctx,
2 changes: 1 addition & 1 deletion pkg/planner/cascades/task/task_opt_group.go
@@ -31,7 +31,7 @@ type OptGroupTask struct {
}

// NewOptGroupTask returns a new optimizing group task.
func NewOptGroupTask(ctx cascadesctx.CascadesContext, g *memo.Group) base.Task {
func NewOptGroupTask(ctx cascadesctx.Context, g *memo.Group) base.Task {
return &OptGroupTask{BaseTask: BaseTask{
ctx: ctx,
}, group: g}
2 changes: 1 addition & 1 deletion pkg/planner/cascades/task/task_opt_group_expression.go
@@ -33,7 +33,7 @@ type OptGroupExpressionTask struct {
}

// NewOptGroupExpressionTask returns a new task for optimizing the target group expression.
func NewOptGroupExpressionTask(ctx cascadesctx.CascadesContext, ge *memo.GroupExpression) *OptGroupExpressionTask {
func NewOptGroupExpressionTask(ctx cascadesctx.Context, ge *memo.GroupExpression) *OptGroupExpressionTask {
return &OptGroupExpressionTask{
BaseTask: BaseTask{ctx: ctx},
groupExpression: ge,
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
@@ -119,6 +119,7 @@ go_library(
"//pkg/parser/terror",
"//pkg/parser/types",
"//pkg/planner/cardinality",
"//pkg/planner/cascades",
"//pkg/planner/cascades/base",
"//pkg/planner/core/base",
"//pkg/planner/core/cost",
38 changes: 31 additions & 7 deletions pkg/planner/core/optimizer.go
@@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/cascades"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
@@ -274,18 +275,41 @@ func CascadesOptimize(ctx context.Context, sctx base.PlanContext, flag uint64, l
if !AllowCartesianProduct.Load() && existsCartesianProduct(logic) {
return nil, nil, 0, errors.Trace(plannererrors.ErrCartesianProductUnsupported)
}
planCounter := base.PlanCounterTp(sessVars.StmtCtx.StmtHints.ForceNthPlan)
if planCounter == 0 {
planCounter = -1
}
// todo: add cascadesOptimize(logic)

physical, cost, err := physicalOptimize(logic, &planCounter)
var cas *cascades.Optimizer
if cas, err = cascades.NewCascades(logic); err == nil {
defer cas.Destroy()
err = cas.Execute()
}
if err != nil {
return nil, nil, 0, err
}
var (
physical base.PhysicalPlan
cost = math.MaxFloat64
)
// At the current phase, cascades just iterates every logical plan out for feeding physicalOptimize.
// TODO: In the near future, physicalOptimize will be refactored to receive a *Group as its param directly.
cas.GetMemo().NewIterator().Each(func(oneLogic base.LogicalPlan) bool {
planCounter := base.PlanCounterTp(sessVars.StmtCtx.StmtHints.ForceNthPlan)
if planCounter == 0 {
planCounter = -1
}
tmpPhysical, tmpCost, tmpErr := physicalOptimize(oneLogic, &planCounter)
if tmpErr != nil {
err = tmpErr
return false
}
if tmpCost < cost {
physical = tmpPhysical
cost = tmpCost // track the cheapest alternative found so far
}
return true
})
if err != nil {
return nil, nil, 0, err
}
finalPlan := postOptimize(ctx, sctx, physical)

finalPlan := postOptimize(ctx, sctx, physical)
if sessVars.StmtCtx.EnableOptimizerCETrace {
refineCETrace(sctx)
}