Skip to content

Commit

Permalink
cherry pick pingcap#15697 to release-3.1
Browse files Browse the repository at this point in the history
Signed-off-by: sre-bot <[email protected]>
  • Loading branch information
Lingyu Song authored and sre-bot committed Mar 26, 2020
1 parent 979f756 commit 63695ac
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 0 deletions.
118 changes: 118 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,59 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context,
if err != nil {
return nil, err
}
<<<<<<< HEAD
_, isTableDual := p.(*PhysicalTableDual)
if !isTableDual && prepared.UseCache {
sctx.PreparedPlanCache().Put(cacheKey, NewPSTMTPlanCacheValue(p))
=======
e.names = names
e.Plan = p
isRange := e.isRangePartition(p)
_, isTableDual := p.(*PhysicalTableDual)
if !isTableDual && prepared.UseCache && !isRange {
cached := NewPSTMTPlanCacheValue(p, names)
preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest)
sctx.PreparedPlanCache().Put(cacheKey, cached)
}
return err
}

// tryCachePointPlan will try to cache point execution plan, there may be some
// short paths for these executions, currently "point select" and "point update"
func (e *Execute) tryCachePointPlan(ctx context.Context, sctx sessionctx.Context,
preparedStmt *CachedPrepareStmt, is infoschema.InfoSchema, p Plan) error {
var (
prepared = preparedStmt.PreparedAst
ok bool
err error
names types.NameSlice
)
switch p.(type) {
case *PointGetPlan:
ok, err = IsPointGetWithPKOrUniqueKeyByAutoCommit(sctx, p)
names = p.OutputNames()
if err != nil {
return err
}
case *Update:
ok, err = IsPointUpdateByAutoCommit(sctx, p)
if err != nil {
return err
}
if ok {
// make constant expression store paramMarker
sctx.GetSessionVars().StmtCtx.PointExec = true
p, names, err = OptimizeAstNode(ctx, sctx, prepared.Stmt, is)
}
}
if ok {
// just cache point plan now
prepared.CachedPlan = p
prepared.CachedNames = names
preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p)
sctx.GetSessionVars().StmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest)
>>>>>>> ec156a6... plan: do not cache plan for query on range partition table (#15697)
}
return p, err
}
Expand All @@ -278,6 +328,18 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
<<<<<<< HEAD
=======
if ts.Table.Partition != nil && ts.Table.Partition.Type == model.PartitionTypeHash {
pID, err := rebuildNewTableIDFromTable(e.ctx, ts, sc, pkCol)
if err != nil {
return err
}
if pID != -1 {
ts.physicalTableID = pID
}
}
>>>>>>> ec156a6... plan: do not cache plan for query on range partition table (#15697)
} else {
ts.Ranges = ranger.FullIntRange(false)
}
Expand All @@ -287,12 +349,38 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
<<<<<<< HEAD
=======
if is.Table.Partition != nil && is.Table.Partition.Type == model.PartitionTypeHash {
pID, err := rebuildNewTableIDFromIndex(e.ctx, is, sc)
if err != nil {
return err
}
if pID != -1 {
is.physicalTableID = pID
}
}
>>>>>>> ec156a6... plan: do not cache plan for query on range partition table (#15697)
case *PhysicalIndexLookUpReader:
is := x.IndexPlans[0].(*PhysicalIndexScan)
is.Ranges, err = e.buildRangeForIndexScan(sctx, is)
if err != nil {
return err
}
<<<<<<< HEAD
=======
if is.Table.Partition != nil && is.Table.Partition.Type == model.PartitionTypeHash {
pID, err := rebuildNewTableIDFromIndex(e.ctx, is, sc)
if err != nil {
return err
}
if pID != -1 {
is.physicalTableID = pID
tblScan := x.TablePlans[0].(*PhysicalTableScan)
tblScan.physicalTableID = pID
}
}
>>>>>>> ec156a6... plan: do not cache plan for query on range partition table (#15697)
case *PointGetPlan:
if x.HandleParam != nil {
x.Handle, err = x.HandleParam.Datum.ToInt64(sc)
Expand Down Expand Up @@ -330,6 +418,36 @@ func (e *Execute) rebuildRange(p Plan) error {
return nil
}

func checkRangePartitionInfo(pi *model.PartitionInfo) bool {
if pi != nil && pi.Type == model.PartitionTypeRange {
return true
}
return false
}

// Prepare plan cache is not support query plan on range partition table.
func (e *Execute) isRangePartition(p Plan) bool {
isRange := false
switch x := p.(type) {
case *PhysicalTableReader:
ts := x.TablePlans[0].(*PhysicalTableScan)
return checkRangePartitionInfo(ts.Table.Partition)
case *PhysicalIndexLookUpReader:
is := x.IndexPlans[0].(*PhysicalIndexScan)
return checkRangePartitionInfo(is.Table.Partition)
case *PhysicalIndexReader:
is := x.IndexPlans[0].(*PhysicalIndexScan)
return checkRangePartitionInfo(is.Table.Partition)
case PhysicalPlan:
for _, child := range x.Children() {
if e.isRangePartition(child) {
isRange = true
}
}
}
return isRange
}

func (e *Execute) buildRangeForIndexScan(sctx sessionctx.Context, is *PhysicalIndexScan) ([]*ranger.Range, error) {
idxCols, colLengths := expression.IndexInfo2Cols(is.schema.Columns, is.Index)
if len(idxCols) == 0 {
Expand Down
193 changes: 193 additions & 0 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,196 @@ func (s *testPrepareSuite) TestPrepareForGroupByItems(c *C) {
tk.MustExec("set @a=2.0;")
tk.MustQuery("execute s1 using @a;").Check(testkit.Rows("3"))
}
<<<<<<< HEAD
=======

func (s *testPrepareSuite) TestPrepareCacheForPartition(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
store.Close()
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)

tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
// Test for PointGet and IndexRead.
tk.MustExec("drop table if exists t_index_read")
tk.MustExec("create table t_index_read (id int, k int, c varchar(10), primary key (id, k)) partition by hash(id+k) partitions 10")
tk.MustExec("insert into t_index_read values (1, 2, 'abc'), (3, 4, 'def'), (5, 6, 'xyz')")
tk.MustExec("prepare stmt1 from 'select c from t_index_read where id = ? and k = ?;'")
tk.MustExec("set @id=1, @k=2")
// When executing one statement at the first time, we don't use cache, so we need to execute it at least twice to test the cache.
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("xyz"))
tk.MustExec("prepare stmt2 from 'select c from t_index_read where id = ? and k = ? and 1 = 1;'")
tk.MustExec("set @id=1, @k=2")
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("xyz"))
// Test for TableScan.
tk.MustExec("drop table if exists t_table_read")
tk.MustExec("create table t_table_read (id int, k int, c varchar(10), primary key(id)) partition by hash(id) partitions 10")
tk.MustExec("insert into t_table_read values (1, 2, 'abc'), (3, 4, 'def'), (5, 6, 'xyz')")
tk.MustExec("prepare stmt3 from 'select c from t_index_read where id = ?;'")
tk.MustExec("set @id=1")
// When executing one statement at the first time, we don't use cache, so we need to execute it at least twice to test the cache.
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("prepare stmt4 from 'select c from t_index_read where id = ? and k = ?'")
tk.MustExec("set @id=1, @k=2")
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("xyz"))
// Query on range partition tables should not raise error.
tk.MustExec("create table t_range_index (id int, k int, c varchar(10), primary key(id)) partition by range(id) ( PARTITION p0 VALUES LESS THAN (4), PARTITION p1 VALUES LESS THAN (14),PARTITION p2 VALUES LESS THAN (20) )")
tk.MustExec("insert into t_range_index values (1, 2, 'abc'), (5, 4, 'def'), (13, 6, 'xyz'), (17, 6, 'hij')")
tk.MustExec("prepare stmt5 from 'select c from t_range_index where id = ?'")
tk.MustExec("set @id=1")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("def"))
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("def"))
tk.MustExec("set @id=13")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("set @id=17")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("hij"))

tk.MustExec("create table t_range_table (id int, k int, c varchar(10)) partition by range(id) ( PARTITION p0 VALUES LESS THAN (4), PARTITION p1 VALUES LESS THAN (14),PARTITION p2 VALUES LESS THAN (20) )")
tk.MustExec("insert into t_range_table values (1, 2, 'abc'), (5, 4, 'def'), (13, 6, 'xyz'), (17, 6, 'hij')")
tk.MustExec("prepare stmt6 from 'select c from t_range_table where id = ?'")
tk.MustExec("set @id=1")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("def"))
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("def"))
tk.MustExec("set @id=13")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("set @id=17")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("hij"))
}

func newSession(c *C, store kv.Storage, dbName string) session.Session {
se, err := session.CreateSession4Test(store)
c.Assert(err, IsNil)
mustExec(c, se, "create database if not exists "+dbName)
mustExec(c, se, "use "+dbName)
return se
}

func mustExec(c *C, se session.Session, sql string) {
_, err := se.Execute(context.Background(), sql)
c.Assert(err, IsNil)
}

func (s *testPrepareSerialSuite) TestConstPropAndPPDWithCache(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
store.Close()
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)

tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a varchar(8) not null, b varchar(8) not null)")
tk.MustExec("insert into t values('1','1')")

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and t2.b = ? and t2.b = ?"`)
tk.MustExec("set @p0 = '1', @p1 = '2';")
tk.MustQuery("execute stmt using @p0, @p1").Check(testkit.Rows(
"0",
))
tk.MustExec("set @p0 = '1', @p1 = '1'")
tk.MustQuery("execute stmt using @p0, @p1").Check(testkit.Rows(
"1",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and ?"`)
tk.MustExec("set @p0 = 0")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))
tk.MustExec("set @p0 = 1")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"1",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where ?"`)
tk.MustExec("set @p0 = 0")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))
tk.MustExec("set @p0 = 1")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"1",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and t2.b = '1' and t2.b = ?"`)
tk.MustExec("set @p0 = '1'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"1",
))
tk.MustExec("set @p0 = '2'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and t1.a > ?"`)
tk.MustExec("set @p0 = '1'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))
tk.MustExec("set @p0 = '0'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"1",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and t1.b > ? and t1.b > ?"`)
tk.MustExec("set @p0 = '0', @p1 = '0'")
tk.MustQuery("execute stmt using @p0,@p1").Check(testkit.Rows(
"1",
))
tk.MustExec("set @p0 = '0', @p1 = '1'")
tk.MustQuery("execute stmt using @p0,@p1").Check(testkit.Rows(
"0",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and t1.b > ? and t1.b > '1'"`)
tk.MustExec("set @p0 = '1'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))
tk.MustExec("set @p0 = '0'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))
}
>>>>>>> ec156a6... plan: do not cache plan for query on range partition table (#15697)

0 comments on commit 63695ac

Please sign in to comment.