Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plan: do not cache plan for query on range partition table (#15697) #15749

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,59 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context,
if err != nil {
return nil, err
}
<<<<<<< HEAD
_, isTableDual := p.(*PhysicalTableDual)
if !isTableDual && prepared.UseCache {
sctx.PreparedPlanCache().Put(cacheKey, NewPSTMTPlanCacheValue(p))
=======
e.names = names
e.Plan = p
isRange := e.isRangePartition(p)
_, isTableDual := p.(*PhysicalTableDual)
if !isTableDual && prepared.UseCache && !isRange {
cached := NewPSTMTPlanCacheValue(p, names)
preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest)
sctx.PreparedPlanCache().Put(cacheKey, cached)
}
return err
}

// tryCachePointPlan will try to cache point execution plan, there may be some
// short paths for these executions, currently "point select" and "point update"
func (e *Execute) tryCachePointPlan(ctx context.Context, sctx sessionctx.Context,
preparedStmt *CachedPrepareStmt, is infoschema.InfoSchema, p Plan) error {
var (
prepared = preparedStmt.PreparedAst
ok bool
err error
names types.NameSlice
)
switch p.(type) {
case *PointGetPlan:
ok, err = IsPointGetWithPKOrUniqueKeyByAutoCommit(sctx, p)
names = p.OutputNames()
if err != nil {
return err
}
case *Update:
ok, err = IsPointUpdateByAutoCommit(sctx, p)
if err != nil {
return err
}
if ok {
// make constant expression store paramMarker
sctx.GetSessionVars().StmtCtx.PointExec = true
p, names, err = OptimizeAstNode(ctx, sctx, prepared.Stmt, is)
}
}
if ok {
// just cache point plan now
prepared.CachedPlan = p
prepared.CachedNames = names
preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p)
sctx.GetSessionVars().StmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest)
>>>>>>> ec156a6... plan: do not cache plan for query on range partition table (#15697)
}
return p, err
}
Expand All @@ -278,6 +328,18 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
<<<<<<< HEAD
=======
if ts.Table.Partition != nil && ts.Table.Partition.Type == model.PartitionTypeHash {
pID, err := rebuildNewTableIDFromTable(e.ctx, ts, sc, pkCol)
if err != nil {
return err
}
if pID != -1 {
ts.physicalTableID = pID
}
}
>>>>>>> ec156a6... plan: do not cache plan for query on range partition table (#15697)
} else {
ts.Ranges = ranger.FullIntRange(false)
}
Expand All @@ -287,12 +349,38 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
<<<<<<< HEAD
=======
if is.Table.Partition != nil && is.Table.Partition.Type == model.PartitionTypeHash {
pID, err := rebuildNewTableIDFromIndex(e.ctx, is, sc)
if err != nil {
return err
}
if pID != -1 {
is.physicalTableID = pID
}
}
>>>>>>> ec156a6... plan: do not cache plan for query on range partition table (#15697)
case *PhysicalIndexLookUpReader:
is := x.IndexPlans[0].(*PhysicalIndexScan)
is.Ranges, err = e.buildRangeForIndexScan(sctx, is)
if err != nil {
return err
}
<<<<<<< HEAD
=======
if is.Table.Partition != nil && is.Table.Partition.Type == model.PartitionTypeHash {
pID, err := rebuildNewTableIDFromIndex(e.ctx, is, sc)
if err != nil {
return err
}
if pID != -1 {
is.physicalTableID = pID
tblScan := x.TablePlans[0].(*PhysicalTableScan)
tblScan.physicalTableID = pID
}
}
>>>>>>> ec156a6... plan: do not cache plan for query on range partition table (#15697)
case *PointGetPlan:
if x.HandleParam != nil {
x.Handle, err = x.HandleParam.Datum.ToInt64(sc)
Expand Down Expand Up @@ -330,6 +418,36 @@ func (e *Execute) rebuildRange(p Plan) error {
return nil
}

func checkRangePartitionInfo(pi *model.PartitionInfo) bool {
if pi != nil && pi.Type == model.PartitionTypeRange {
return true
}
return false
}

// Prepare plan cache is not support query plan on range partition table.
func (e *Execute) isRangePartition(p Plan) bool {
isRange := false
switch x := p.(type) {
case *PhysicalTableReader:
ts := x.TablePlans[0].(*PhysicalTableScan)
return checkRangePartitionInfo(ts.Table.Partition)
case *PhysicalIndexLookUpReader:
is := x.IndexPlans[0].(*PhysicalIndexScan)
return checkRangePartitionInfo(is.Table.Partition)
case *PhysicalIndexReader:
is := x.IndexPlans[0].(*PhysicalIndexScan)
return checkRangePartitionInfo(is.Table.Partition)
case PhysicalPlan:
for _, child := range x.Children() {
if e.isRangePartition(child) {
isRange = true
}
}
}
return isRange
}

func (e *Execute) buildRangeForIndexScan(sctx sessionctx.Context, is *PhysicalIndexScan) ([]*ranger.Range, error) {
idxCols, colLengths := expression.IndexInfo2Cols(is.schema.Columns, is.Index)
if len(idxCols) == 0 {
Expand Down
193 changes: 193 additions & 0 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,196 @@ func (s *testPrepareSuite) TestPrepareForGroupByItems(c *C) {
tk.MustExec("set @a=2.0;")
tk.MustQuery("execute s1 using @a;").Check(testkit.Rows("3"))
}
<<<<<<< HEAD
=======

func (s *testPrepareSuite) TestPrepareCacheForPartition(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
store.Close()
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)

tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
// Test for PointGet and IndexRead.
tk.MustExec("drop table if exists t_index_read")
tk.MustExec("create table t_index_read (id int, k int, c varchar(10), primary key (id, k)) partition by hash(id+k) partitions 10")
tk.MustExec("insert into t_index_read values (1, 2, 'abc'), (3, 4, 'def'), (5, 6, 'xyz')")
tk.MustExec("prepare stmt1 from 'select c from t_index_read where id = ? and k = ?;'")
tk.MustExec("set @id=1, @k=2")
// When executing one statement at the first time, we don't use cache, so we need to execute it at least twice to test the cache.
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("xyz"))
tk.MustExec("prepare stmt2 from 'select c from t_index_read where id = ? and k = ? and 1 = 1;'")
tk.MustExec("set @id=1, @k=2")
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("xyz"))
// Test for TableScan.
tk.MustExec("drop table if exists t_table_read")
tk.MustExec("create table t_table_read (id int, k int, c varchar(10), primary key(id)) partition by hash(id) partitions 10")
tk.MustExec("insert into t_table_read values (1, 2, 'abc'), (3, 4, 'def'), (5, 6, 'xyz')")
tk.MustExec("prepare stmt3 from 'select c from t_index_read where id = ?;'")
tk.MustExec("set @id=1")
// When executing one statement at the first time, we don't use cache, so we need to execute it at least twice to test the cache.
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("prepare stmt4 from 'select c from t_index_read where id = ? and k = ?'")
tk.MustExec("set @id=1, @k=2")
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("xyz"))
// Query on range partition tables should not raise error.
tk.MustExec("create table t_range_index (id int, k int, c varchar(10), primary key(id)) partition by range(id) ( PARTITION p0 VALUES LESS THAN (4), PARTITION p1 VALUES LESS THAN (14),PARTITION p2 VALUES LESS THAN (20) )")
tk.MustExec("insert into t_range_index values (1, 2, 'abc'), (5, 4, 'def'), (13, 6, 'xyz'), (17, 6, 'hij')")
tk.MustExec("prepare stmt5 from 'select c from t_range_index where id = ?'")
tk.MustExec("set @id=1")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("def"))
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("def"))
tk.MustExec("set @id=13")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("set @id=17")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("hij"))

tk.MustExec("create table t_range_table (id int, k int, c varchar(10)) partition by range(id) ( PARTITION p0 VALUES LESS THAN (4), PARTITION p1 VALUES LESS THAN (14),PARTITION p2 VALUES LESS THAN (20) )")
tk.MustExec("insert into t_range_table values (1, 2, 'abc'), (5, 4, 'def'), (13, 6, 'xyz'), (17, 6, 'hij')")
tk.MustExec("prepare stmt6 from 'select c from t_range_table where id = ?'")
tk.MustExec("set @id=1")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("def"))
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("def"))
tk.MustExec("set @id=13")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("set @id=17")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("hij"))
}

func newSession(c *C, store kv.Storage, dbName string) session.Session {
se, err := session.CreateSession4Test(store)
c.Assert(err, IsNil)
mustExec(c, se, "create database if not exists "+dbName)
mustExec(c, se, "use "+dbName)
return se
}

func mustExec(c *C, se session.Session, sql string) {
_, err := se.Execute(context.Background(), sql)
c.Assert(err, IsNil)
}

func (s *testPrepareSerialSuite) TestConstPropAndPPDWithCache(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
store.Close()
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)

tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a varchar(8) not null, b varchar(8) not null)")
tk.MustExec("insert into t values('1','1')")

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and t2.b = ? and t2.b = ?"`)
tk.MustExec("set @p0 = '1', @p1 = '2';")
tk.MustQuery("execute stmt using @p0, @p1").Check(testkit.Rows(
"0",
))
tk.MustExec("set @p0 = '1', @p1 = '1'")
tk.MustQuery("execute stmt using @p0, @p1").Check(testkit.Rows(
"1",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and ?"`)
tk.MustExec("set @p0 = 0")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))
tk.MustExec("set @p0 = 1")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"1",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where ?"`)
tk.MustExec("set @p0 = 0")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))
tk.MustExec("set @p0 = 1")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"1",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and t2.b = '1' and t2.b = ?"`)
tk.MustExec("set @p0 = '1'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"1",
))
tk.MustExec("set @p0 = '2'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and t1.a > ?"`)
tk.MustExec("set @p0 = '1'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))
tk.MustExec("set @p0 = '0'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"1",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and t1.b > ? and t1.b > ?"`)
tk.MustExec("set @p0 = '0', @p1 = '0'")
tk.MustQuery("execute stmt using @p0,@p1").Check(testkit.Rows(
"1",
))
tk.MustExec("set @p0 = '0', @p1 = '1'")
tk.MustQuery("execute stmt using @p0,@p1").Check(testkit.Rows(
"0",
))

tk.MustExec(`prepare stmt from "select count(1) from t t1, t t2 where t1.a = t2.a and t1.b > ? and t1.b > '1'"`)
tk.MustExec("set @p0 = '1'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))
tk.MustExec("set @p0 = '0'")
tk.MustQuery("execute stmt using @p0").Check(testkit.Rows(
"0",
))
}
>>>>>>> ec156a6... plan: do not cache plan for query on range partition table (#15697)