Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: generate PointGet plans for PlanCache when all conditions are EQ #29859

Merged
merged 5 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -514,9 +513,14 @@ func (s *testPrepareSerialSuite) TestPointGetUserVarPlanCache(c *C) {
tkProcess := tk.Se.ShowProcess()
ps := []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
rows := tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[5][0]), "IndexRangeScan"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[5][3]), "table:t2"), IsTrue)
tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Check(testkit.Rows( // can use idx_a
`Projection_9 1.00 root test.t1.a, test.t1.b, test.t2.a, test.t2.b`,
`└─IndexJoin_17 1.00 root inner join, inner:TableReader_13, outer key:test.t2.a, inner key:test.t1.a, equal cond:eq(test.t2.a, test.t1.a)`,
` ├─Selection_44(Build) 0.80 root not(isnull(test.t2.a))`,
` │ └─Point_Get_43 1.00 root table:t2, index:idx_a(a) `,
` └─TableReader_13(Probe) 0.00 root data:Selection_12`,
` └─Selection_12 0.00 cop[tikv] eq(test.t1.a, 1)`,
` └─TableRangeScan_11 1.00 cop[tikv] table:t1 range: decided by [test.t2.a], keep order:false, stats:pseudo`))

tk.MustExec("set @a=2")
tk.MustQuery("execute stmt using @a").Check(testkit.Rows(
Expand All @@ -525,9 +529,14 @@ func (s *testPrepareSerialSuite) TestPointGetUserVarPlanCache(c *C) {
tkProcess = tk.Se.ShowProcess()
ps = []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
rows = tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[5][0]), "IndexRangeScan"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[5][3]), "table:t2"), IsTrue)
tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Check(testkit.Rows( // can use idx_a
`Projection_9 1.00 root test.t1.a, test.t1.b, test.t2.a, test.t2.b`,
`└─IndexJoin_17 1.00 root inner join, inner:TableReader_13, outer key:test.t2.a, inner key:test.t1.a, equal cond:eq(test.t2.a, test.t1.a)`,
` ├─Selection_44(Build) 0.80 root not(isnull(test.t2.a))`,
` │ └─Point_Get_43 1.00 root table:t2, index:idx_a(a) `,
` └─TableReader_13(Probe) 0.00 root data:Selection_12`,
` └─Selection_12 0.00 cop[tikv] eq(test.t1.a, 2)`,
` └─TableRangeScan_11 1.00 cop[tikv] table:t1 range: decided by [test.t2.a], keep order:false, stats:pseudo`))
tk.MustQuery("execute stmt using @a").Check(testkit.Rows(
"2 4 2 2",
))
Expand Down
82 changes: 78 additions & 4 deletions executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,83 @@ func (s *testSerialSuite) TestIssue28782(c *C) {
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1"))
}

func (s *testSerialSuite) TestIssue29850(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
defer func() {
dom.Close()
store.Close()
}()
orgEnable := plannercore.PreparedPlanCacheEnabled()
defer func() {
plannercore.SetPreparedPlanCache(orgEnable)
}()
plannercore.SetPreparedPlanCache(true)

tk.MustExec(`set tidb_enable_clustered_index=on`)
tk.MustExec("set @@tidb_enable_collect_execution_info=0")
tk.MustExec(`use test`)
tk.MustExec(`CREATE TABLE customer (
c_id int(11) NOT NULL,
c_d_id int(11) NOT NULL,
c_first varchar(16) DEFAULT NULL,
c_w_id int(11) NOT NULL,
c_last varchar(16) DEFAULT NULL,
c_credit char(2) DEFAULT NULL,
c_discount decimal(4,4) DEFAULT NULL,
PRIMARY KEY (c_w_id,c_d_id,c_id),
KEY idx_customer (c_w_id,c_d_id,c_last,c_first))`)
tk.MustExec(`CREATE TABLE warehouse (
w_id int(11) NOT NULL,
w_tax decimal(4,4) DEFAULT NULL,
PRIMARY KEY (w_id))`)
tk.MustExec(`prepare stmt from 'SELECT c_discount, c_last, c_credit, w_tax
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
FROM customer, warehouse
WHERE w_id = ? AND c_w_id = w_id AND c_d_id = ? AND c_id = ?'`)
tk.MustExec(`set @w_id=1262`)
tk.MustExec(`set @c_d_id=7`)
tk.MustExec(`set @c_id=1549`)
tk.MustQuery(`execute stmt using @w_id, @c_d_id, @c_id`).Check(testkit.Rows())
tkProcess := tk.Se.ShowProcess()
ps := []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Check(testkit.Rows( // can use PointGet
`Projection_7 0.00 root test.customer.c_discount, test.customer.c_last, test.customer.c_credit, test.warehouse.w_tax`,
`└─MergeJoin_8 0.00 root inner join, left key:test.customer.c_w_id, right key:test.warehouse.w_id`,
` ├─Point_Get_34(Build) 1.00 root table:warehouse handle:1262`,
` └─Point_Get_33(Probe) 1.00 root table:customer, clustered index:PRIMARY(c_w_id, c_d_id, c_id) `))
tk.MustQuery(`execute stmt using @w_id, @c_d_id, @c_id`).Check(testkit.Rows())
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1")) // can use the cached plan

tk.MustExec(`create table t (a int primary key)`)
tk.MustExec(`insert into t values (1), (2)`)
tk.MustExec(`prepare stmt from 'select * from t where a>=? and a<=?'`)
tk.MustExec(`set @a1=1, @a2=2`)
tk.MustQuery(`execute stmt using @a1, @a1`).Check(testkit.Rows("1"))
tkProcess = tk.Se.ShowProcess()
ps = []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Check(testkit.Rows( // cannot use PointGet since it contains a range condition
`Selection_7 1.00 root ge(test.t.a, 1), le(test.t.a, 1)`,
`└─TableReader_6 1.00 root data:TableRangeScan_5`,
` └─TableRangeScan_5 1.00 cop[tikv] table:t range:[1,1], keep order:false, stats:pseudo`))
tk.MustQuery(`execute stmt using @a1, @a2`).Check(testkit.Rows("1", "2"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))

tk.MustExec(`prepare stmt from 'select * from t where a=? or a=?'`)
tk.MustQuery(`execute stmt using @a1, @a1`).Check(testkit.Rows("1"))
tkProcess = tk.Se.ShowProcess()
ps = []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Check(testkit.Rows( // cannot use PointGet since it contains a or condition
`Selection_7 1.00 root or(eq(test.t.a, 1), eq(test.t.a, 1))`,
`└─TableReader_6 1.00 root data:TableRangeScan_5`,
` └─TableRangeScan_5 1.00 cop[tikv] table:t range:[1,1], keep order:false, stats:pseudo`))
tk.MustQuery(`execute stmt using @a1, @a2`).Check(testkit.Rows("1", "2"))

}

func (s *testSerialSuite) TestIssue29101(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -821,10 +898,7 @@ func (s *testSerialSuite) TestIssue29101(c *C) {
tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Check(testkit.Rows( // can use IndexJoin
`Projection_6 1.00 root test.customer.c_discount, test.customer.c_last, test.customer.c_credit, test.warehouse.w_tax`,
`└─IndexJoin_14 1.00 root inner join, inner:TableReader_10, outer key:test.customer.c_w_id, inner key:test.warehouse.w_id, equal cond:eq(test.customer.c_w_id, test.warehouse.w_id)`,
` ├─Selection_36(Build) 1.00 root eq(test.customer.c_d_id, 7), eq(test.customer.c_id, 158), eq(test.customer.c_w_id, 936)`,
` │ └─IndexLookUp_35 1.00 root `,
` │ ├─IndexRangeScan_33(Build) 1.00 cop[tikv] table:customer, index:PRIMARY(c_w_id, c_d_id, c_id) range:[936 7 158,936 7 158], keep order:false, stats:pseudo`,
` │ └─TableRowIDScan_34(Probe) 1.00 cop[tikv] table:customer keep order:false, stats:pseudo`,
` ├─Point_Get_33(Build) 1.00 root table:customer, index:PRIMARY(c_w_id, c_d_id, c_id) `,
` └─TableReader_10(Probe) 0.00 root data:Selection_9`,
` └─Selection_9 0.00 cop[tikv] eq(test.warehouse.w_id, 936)`,
` └─TableRangeScan_8 1.00 cop[tikv] table:warehouse range: decided by [test.customer.c_w_id], keep order:false, stats:pseudo`))
Expand Down
31 changes: 27 additions & 4 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,10 +812,12 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
p: dual,
}, cntPlan, nil
}
canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV && ds.isPointGetConvertableSchema() &&
// to avoid the over-optimized risk, do not generate PointGet for plan cache, for example,
// `pk>=$a and pk<=$b` can be optimized to a PointGet when `$a==$b`, but it can cause wrong results when `$a!=$b`.
!ds.ctx.GetSessionVars().StmtCtx.UseCache
canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV && ds.isPointGetConvertableSchema()

if canConvertPointGet && expression.MaybeOverOptimized4PlanCache(ds.ctx, path.AccessConds) {
canConvertPointGet = ds.canConvertToPointGetForPlanCache(path)
}

if canConvertPointGet && !path.IsIntHandlePath {
// We simply do not build [batch] point get for prefix indexes. This can be optimized.
canConvertPointGet = path.Index.Unique && !path.Index.HasPrefixIndex()
Expand Down Expand Up @@ -934,6 +936,27 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
return
}

func (ds *DataSource) canConvertToPointGetForPlanCache(path *util.AccessPath) bool {
// PointGet might contain some over-optimized assumptions, like `a>=1 and a<=1` --> `a=1`, but
// these assumptions may be broken after parameters change.
// So for safety, we narrow down the scope and just generate PointGet in some particular and simple scenarios.

// scenario 1: each column corresponds to a single EQ, `a=1 and b=2 and c=3` --> `[1, 2, 3]`
if len(path.Ranges) > 0 && path.Ranges[0].Width() == len(path.AccessConds) {
for _, accessCond := range path.AccessConds {
f, ok := accessCond.(*expression.ScalarFunction)
if !ok {
return false
}
if f.FuncName.L != ast.EQ {
return false
}
}
return true
}
return false
}

func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, candidate *candidatePath) (task task, err error) {
if prop.TaskTp != property.RootTaskType || !prop.IsEmpty() {
return invalidTask, nil
Expand Down