Skip to content

Commit

Permalink
planner: fix the issue that some PointGet plans generated in physical…
Browse files Browse the repository at this point in the history
…-stage cannot be cached (#28478)
  • Loading branch information
qw4990 authored Oct 9, 2021
1 parent 1a0770b commit 501e87e
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 39 deletions.
10 changes: 4 additions & 6 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,9 @@ func (s *testPrepareSerialSuite) TestPointGetUserVarPlanCache(c *C) {
tkProcess := tk.Se.ShowProcess()
ps := []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
// t2 should use PointGet.
rows := tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[3][0]), "Point_Get"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[3][3]), "table:t2"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[4][0]), "IndexRangeScan"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[4][3]), "table:t2"), IsTrue)

tk.MustExec("set @a=2")
tk.MustQuery("execute stmt using @a").Check(testkit.Rows(
Expand All @@ -521,10 +520,9 @@ func (s *testPrepareSerialSuite) TestPointGetUserVarPlanCache(c *C) {
tkProcess = tk.Se.ShowProcess()
ps = []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
// t2 should use PointGet, range is changed to [2,2].
rows = tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[3][0]), "Point_Get"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[3][3]), "table:t2"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[4][0]), "IndexRangeScan"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[4][3]), "table:t2"), IsTrue)
tk.MustQuery("execute stmt using @a").Check(testkit.Rows(
"2 4 2 2",
))
Expand Down
2 changes: 1 addition & 1 deletion executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (s *testSerialSuite) TestPlanCacheClusterIndex(c *C) {
ps = []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
rows = tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
c.Assert(strings.Index(rows[1][0].(string), `Point_Get`), Equals, 6)
c.Assert(strings.Contains(rows[3][0].(string), `TableRangeScan`), IsTrue)

// case 3:
tk.MustExec(`drop table if exists ta, tb`)
Expand Down
4 changes: 2 additions & 2 deletions executor/seqtest/prepared_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,14 +523,14 @@ func TestPreparedInsert(t *testing.T) {
err = counter.Write(pb)
require.NoError(t, err)
hit := pb.GetCounter().GetValue()
require.Equal(t, float64(2), hit)
require.Equal(t, float64(3), hit)
}
tk.MustExec(`set @a=3; execute stmt_insert_select using @a;`)
if flag {
err = counter.Write(pb)
require.NoError(t, err)
hit := pb.GetCounter().GetValue()
require.Equal(t, float64(2), hit)
require.Equal(t, float64(4), hit)
}

result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 101)
Expand Down
2 changes: 1 addition & 1 deletion expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ func SplitDNFItems(onExpr Expression) []Expression {
// If the Expression is a non-constant value, it means the result is unknown.
func EvaluateExprWithNull(ctx sessionctx.Context, schema *Schema, expr Expression) Expression {
if MaybeOverOptimized4PlanCache(ctx, []Expression{expr}) {
ctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
return expr
}
return evaluateExprWithNull(ctx, schema, expr)
}
Expand Down
26 changes: 26 additions & 0 deletions expression/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,32 @@ func (s *testEvaluatorSuite) TestEvaluateExprWithNull(c *C) {
c.Assert(res.Equal(s.ctx, NewOne()), IsTrue)
}

func (s *testEvaluatorSerialSuites) TestEvaluateExprWithNullAndParameters(c *C) {
tblInfo := newTestTableBuilder("").add("col0", mysql.TypeLonglong, 0).build()
schema := tableInfoToSchemaForTest(tblInfo)
col0 := schema.Columns[0]

defer func(original bool) {
s.ctx.GetSessionVars().StmtCtx.UseCache = original
}(s.ctx.GetSessionVars().StmtCtx.UseCache)
s.ctx.GetSessionVars().StmtCtx.UseCache = true

// cases for parameters
ltWithoutParam, err := newFunctionForTest(s.ctx, ast.LT, col0, NewOne())
c.Assert(err, IsNil)
res := EvaluateExprWithNull(s.ctx, schema, ltWithoutParam)
c.Assert(res.Equal(s.ctx, NewNull()), IsTrue) // the expression is evaluated to null

param := NewOne()
param.ParamMarker = &ParamMarker{ctx: s.ctx, order: 0}
s.ctx.GetSessionVars().PreparedParams = append(s.ctx.GetSessionVars().PreparedParams, types.NewIntDatum(10))
ltWithParam, err := newFunctionForTest(s.ctx, ast.LT, col0, param)
c.Assert(err, IsNil)
res = EvaluateExprWithNull(s.ctx, schema, ltWithParam)
_, isScalarFunc := res.(*ScalarFunction)
c.Assert(isScalarFunc, IsTrue) // the expression with parameters is not evaluated
}

func (s *testEvaluatorSuite) TestConstant(c *C) {
sc := &stmtctx.StatementContext{TimeZone: time.Local}
c.Assert(NewZero().IsCorrelated(), IsFalse)
Expand Down
5 changes: 4 additions & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,10 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
p: dual,
}, cntPlan, nil
}
canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV && ds.isPointGetConvertableSchema()
canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV && ds.isPointGetConvertableSchema() &&
// to avoid the over-optimized risk, do not generate PointGet for plan cache, for example,
// `pk>=$a and pk<=$b` can be optimized to a PointGet when `$a==$b`, but it can cause wrong results when `$a!=$b`.
!ds.ctx.GetSessionVars().StmtCtx.UseCache
if canConvertPointGet && !path.IsIntHandlePath {
// We simply do not build [batch] point get for prefix indexes. This can be optimized.
canConvertPointGet = path.Index.Unique && !path.Index.HasPrefixIndex()
Expand Down
106 changes: 85 additions & 21 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,48 @@ func (s *testPrepareSerialSuite) TestPrepareTableAsNameOnGroupByWithCache(c *C)
tk.MustQuery("execute stmt").Sort().Check(testkit.Rows("partner1", "partner2", "partner3", "partner4"))
}

func (s *testPrepareSerialSuite) TestPrepareCachePointGetInsert(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
err = store.Close()
c.Assert(err, IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)
tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t1 (a int, b int, primary key(a))")
tk.MustExec("insert into t1 values (1, 1), (2, 2), (3, 3)")

tk.MustExec("create table t2 (a int, b int, primary key(a))")
tk.MustExec(`prepare stmt1 from "insert into t2 select * from t1 where a=?"`)

tk.MustExec("set @a=1")
tk.MustExec("execute stmt1 using @a")
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0"))
tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 1"))

tk.MustExec("set @a=2")
tk.MustExec("execute stmt1 using @a")
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 1", "2 2"))

tk.MustExec("set @a=3")
tk.MustExec("execute stmt1 using @a")
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 1", "2 2", "3 3"))
}

// nolint:unused
func readGaugeInt(g prometheus.Gauge) int {
ch := make(chan prometheus.Metric, 1)
Expand Down Expand Up @@ -1102,76 +1144,98 @@ func (s *testPlanSerialSuite) TestPlanCachePointGetAndTableDual(c *C) {
tk.MustExec("insert into t0 values('0000','7777',1)")
tk.MustExec("prepare s0 from 'select * from t0 where c1=? and c2>=? and c2<=?'")
tk.MustExec("set @a0='0000', @b0='9999'")
// TableDual plan would be built, we should not cache it.
// TableDual is forbidden for plan-cache, a TableReader be built and cached.
tk.MustQuery("execute s0 using @a0, @b0, @a0").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous TableDual plan.
tk.MustQuery("execute s0 using @a0, @a0, @b0").Check(testkit.Rows("0000 7777 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("create table t1(c1 varchar(20), c2 varchar(20), c3 bigint(20), primary key(c1, c2))")
tk.MustExec("insert into t1 values('0000','7777',1)")
tk.MustExec("prepare s1 from 'select * from t1 where c1=? and c2>=? and c2<=?'")
tk.MustExec("set @a1='0000', @b1='9999'")
// PointGet plan would be built, we should not cache it.
// IndexLookup plan would be built, we should cache it.
tk.MustQuery("execute s1 using @a1, @b1, @b1").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous PointGet plan.
tk.MustQuery("execute s1 using @a1, @a1, @b1").Check(testkit.Rows("0000 7777 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("create table t2(c1 bigint(20) primary key, c2 varchar(20))")
tk.MustExec("insert into t2 values(1,'7777')")
tk.MustExec("prepare s2 from 'select * from t2 where c1>=? and c1<=?'")
tk.MustExec("set @a2=0, @b2=9")
// PointGet plan would be built, we should not cache it.
// TableReader plan would be built, we should cache it.
tk.MustQuery("execute s2 using @a2, @a2").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous PointGet plan.
tk.MustQuery("execute s2 using @a2, @b2").Check(testkit.Rows("1 7777"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("create table t3(c1 int, c2 int, c3 int, unique key(c1), key(c2))")
tk.MustExec("insert into t3 values(2,1,1)")
tk.MustExec("prepare s3 from 'select /*+ use_index_merge(t3) */ * from t3 where (c1 >= ? and c1 <= ?) or c2 > 1'")
tk.MustExec("set @a3=1,@b3=3")
// PointGet partial plan would be built, we should not cache it.
// TableReader plan would be built, we should cache it.
tk.MustQuery("execute s3 using @a3,@a3").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous IndexMerge with partial PointGet plan.
tk.MustQuery("execute s3 using @a3,@b3").Check(testkit.Rows("2 1 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("prepare s3 from 'select /*+ use_index_merge(t3) */ * from t3 where (c1 >= ? and c1 <= ?) or c2 > 1'")
tk.MustExec("set @a3=1,@b3=3")
// TableDual partial plan would be built, we should not cache it.
// TableReader plan would be built, we should cache it.
tk.MustQuery("execute s3 using @b3,@a3").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous IndexMerge with partial TableDual plan.
tk.MustQuery("execute s3 using @a3,@b3").Check(testkit.Rows("2 1 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("create table t4(c1 int primary key, c2 int, c3 int, key(c2))")
tk.MustExec("insert into t4 values(2,1,1)")
tk.MustExec("prepare s4 from 'select /*+ use_index_merge(t4) */ * from t4 where (c1 >= ? and c1 <= ?) or c2 > 1'")
tk.MustExec("set @a4=1,@b4=3")
// PointGet partial plan would be built, we should not cache it.
// IndexMerge plan would be built, we should not cache it.
tk.MustQuery("execute s4 using @a4,@a4").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous IndexMerge with partial PointGet plan.
tk.MustQuery("execute s4 using @a4,@b4").Check(testkit.Rows("2 1 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk.MustExec("prepare s4 from 'select /*+ use_index_merge(t4) */ * from t4 where (c1 >= ? and c1 <= ?) or c2 > 1'")
tk.MustExec("set @a4=1,@b4=3")
// TableDual partial plan would be built, we should not cache it.
// IndexMerge plan would be built, we should not cache it.
tk.MustQuery("execute s4 using @b4,@a4").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous IndexMerge with partial TableDual plan.
tk.MustQuery("execute s4 using @a4,@b4").Check(testkit.Rows("2 1 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
}

func (s *testPrepareSuite) TestIssue26873(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
c.Assert(store.Close(), IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)

tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")

tk.MustExec("create table t(a int primary key, b int, c int)")
tk.MustExec("prepare stmt from 'select * from t where a = 2 or a = ?'")
tk.MustExec("set @p = 3")
tk.MustQuery("execute stmt using @p").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("execute stmt using @p").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
}

func (s *testPlanSerialSuite) TestIssue23671(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -1199,7 +1263,7 @@ func (s *testPlanSerialSuite) TestIssue23671(c *C) {
tk.MustQuery("execute s1 using @a, @b, @c").Check(testkit.Rows("1 1"))
tk.MustExec("set @a=1, @b=1, @c=10")
tk.MustQuery("execute s1 using @a, @b, @c").Check(testkit.Rows("1 1", "2 2"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
}

func (s *testPrepareSerialSuite) TestIssue28246(c *C) {
Expand Down Expand Up @@ -1228,7 +1292,7 @@ func (s *testPrepareSerialSuite) TestIssue28246(c *C) {
tk.MustExec(`prepare stmt from 'select min(col1) from PK_AUTO_RANDOM9111 where col1 > ?;';`)
tk.MustQuery("execute stmt using @a").Check(testkit.Rows("<nil>"))
tk.MustQuery("execute stmt using @b").Check(testkit.Rows("9223372036854775807"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustQuery("execute stmt using @a").Check(testkit.Rows("<nil>"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
}
Expand Down
2 changes: 0 additions & 2 deletions planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,6 @@ func (ds *DataSource) derivePathStatsAndTryHeuristics() error {
if selected != nil {
ds.possibleAccessPaths[0] = selected
ds.possibleAccessPaths = ds.possibleAccessPaths[:1]
// TODO: Can we make a more careful check on whether the optimization depends on mutable constants?
ds.ctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
if ds.ctx.GetSessionVars().StmtCtx.InVerboseExplain {
var tableName string
if ds.TableAsName.O == "" {
Expand Down
21 changes: 16 additions & 5 deletions util/ranger/detacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,12 @@ func ExtractEqAndInCondition(sctx sessionctx.Context, conditions []expression.Ex
points[offset] = rb.build(accesses[offset])
}
points[offset] = rb.intersection(points[offset], rb.build(cond))
// Early termination if false expression found
if len(points[offset]) == 0 {
if len(points[offset]) == 0 { // Early termination if false expression found
if expression.MaybeOverOptimized4PlanCache(sctx, conditions) {
// cannot return an empty-range for plan-cache since the range may become non-empty as parameters change
// for safety, return the whole conditions in this case
return nil, conditions, nil, nil, false
}
return nil, nil, nil, nil, true
}
}
Expand All @@ -554,8 +558,12 @@ func ExtractEqAndInCondition(sctx sessionctx.Context, conditions []expression.Ex
if points[i] == nil {
// There exists an interval whose length is larger than 0
accesses[i] = nil
} else if len(points[i]) == 0 {
// Early termination if false expression found
} else if len(points[i]) == 0 { // Early termination if false expression found
if expression.MaybeOverOptimized4PlanCache(sctx, conditions) {
// cannot return an empty-range for plan-cache since the range may become non-empty as parameters change
// for safety, return the whole conditions in this case
return nil, conditions, nil, nil, false
}
return nil, nil, nil, nil, true
} else {
// All Intervals are single points
Expand All @@ -566,7 +574,10 @@ func ExtractEqAndInCondition(sctx sessionctx.Context, conditions []expression.Ex
// Maybe we can improve it later.
columnValues[i] = &valueInfo{mutable: true}
}
sctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
if expression.MaybeOverOptimized4PlanCache(sctx, conditions) {
// TODO: optimize it more elaborately, e.g. return [2 3, 2 3] as accesses for 'where a = 2 and b = 3 and c >= ? and c <= ?'
return nil, conditions, nil, nil, false
}
}
}
for i, offset := range offsets {
Expand Down

0 comments on commit 501e87e

Please sign in to comment.