Skip to content

Commit

Permalink
*: add metadata lock when using the plan cache (#51897)
Browse files Browse the repository at this point in the history
close #51407
  • Loading branch information
wjhuang2016 authored Apr 28, 2024
1 parent 66ba419 commit 70a8253
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 21 deletions.
3 changes: 2 additions & 1 deletion pkg/ddl/tests/metadatalock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ go_test(
"mdl_test.go",
],
flaky = True,
shard_count = 34,
shard_count = 36,
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/ingest/testutil",
"//pkg/errno",
"//pkg/server",
"//pkg/testkit",
Expand Down
98 changes: 98 additions & 0 deletions pkg/ddl/tests/metadatalock/mdl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/failpoint"
ingesttestutil "github.com/pingcap/tidb/pkg/ddl/ingest/testutil"
mysql "github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -895,6 +896,103 @@ func TestMDLPreparePlanCacheInvalid(t *testing.T) {
tk.MustQuery(`execute stmt_test_1 using @a;`).Check(testkit.Rows("1 <nil>", "2 <nil>", "3 <nil>", "4 <nil>"))
}

func TestMDLPreparePlanCacheExecute(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()

sv := server.CreateMockServer(t, store)

sv.SetDomain(dom)
dom.InfoSyncer().SetSessionManager(sv)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
conn2 := server.CreateMockConn(t, sv)
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int);")
tk.MustExec("create table t2(a int);")
tk.MustExec("insert into t values(1), (2), (3), (4);")

tk.MustExec(`prepare stmt_test_1 from 'update t set a = ? where a = ?';`)
tk.MustExec(`set @a = 1, @b = 3;`)
tk.MustExec(`execute stmt_test_1 using @a, @b;`)

tk.MustExec("begin")

ch := make(chan struct{})

var wg sync.WaitGroup
wg.Add(1)
go func() {
<-ch
tkDDL.MustExec("alter table test.t add index idx(a);")
wg.Done()
}()

tk.MustQuery("select * from t2")
tk.MustExec(`set @a = 2, @b=4;`)
tk.MustExec(`execute stmt_test_1 using @a, @b;`)
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1"))
// The plan is from cache, the metadata lock should be added to block the DDL.
ch <- struct{}{}

time.Sleep(5 * time.Second)

tk.MustExec("commit")

wg.Wait()

tk.MustExec("admin check table t")
}

func TestMDLPreparePlanCacheExecute2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()

sv := server.CreateMockServer(t, store)

sv.SetDomain(dom)
dom.InfoSyncer().SetSessionManager(sv)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
conn2 := server.CreateMockConn(t, sv)
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int);")
tk.MustExec("create table t2(a int);")
tk.MustExec("insert into t values(1), (2), (3), (4);")

tk.MustExec(`prepare stmt_test_1 from 'select * from t where a = ?';`)
tk.MustExec(`set @a = 1;`)
tk.MustExec(`execute stmt_test_1 using @a;`)

tk.MustExec("begin")
tk.MustQuery("select * from t2")

var wg sync.WaitGroup
wg.Add(1)
go func() {
tkDDL.MustExec("alter table test.t add index idx(a);")
wg.Done()
}()

wg.Wait()

tk.MustExec(`set @a = 2;`)
tk.MustExec(`execute stmt_test_1 using @a;`)
// The plan should not be from cache because the schema has changed.
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
tk.MustExec("commit")

tk.MustExec("admin check table t")
}

func TestMDLDisable2Enable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
sv := server.CreateMockServer(t, store)
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ func dumpPlanReplayerExplain(ctx sessionctx.Context, zw *zip.Writer, task *PlanR
return err
}

// extractTableNames extracts table names from the given stmts.
func extractTableNames(ctx context.Context, sctx sessionctx.Context,
execStmts []ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
tableExtractor := &tableNameExtractor{
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2020,6 +2020,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars.SQLKiller.Reset()
vars.SQLKiller.ConnID = vars.ConnectionID
vars.StmtCtx.TableStats = make(map[int64]any)
sc.MDLRelatedTableIDs = make(map[int64]struct{})

isAnalyze := false
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
Expand Down Expand Up @@ -120,7 +121,7 @@ func (e *PrepareExec) Next(ctx context.Context, _ *chunk.Chunk) error {
return err
}
}
stmt, p, paramCnt, err := plannercore.GeneratePlanCacheStmtWithAST(ctx, e.Ctx(), true, stmt0.Text(), stmt0, nil)
stmt, p, paramCnt, err := plannercore.GeneratePlanCacheStmtWithAST(ctx, e.Ctx(), true, stmt0.Text(), stmt0, sessiontxn.GetTxnManager(e.Ctx()).GetTxnInfoSchema())
if err != nil {
return err
}
Expand Down Expand Up @@ -209,7 +210,7 @@ func (e *DeallocateExec) Next(context.Context, *chunk.Chunk) error {
if e.Ctx().GetSessionVars().EnablePreparedPlanCache {
bindSQL, _ := bindinfo.MatchSQLBindingForPlanCache(e.Ctx(), preparedObj.PreparedAst.Stmt, &preparedObj.BindingInfo)
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, preparedObj.SchemaVersion,
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load(), preparedObj.RelateVersion)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,8 @@ func (m *Meta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
return errors.Trace(err)
}

tableInfo.Revision++

data, err := json.Marshal(tableInfo)
if err != nil {
return errors.Trace(err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,9 @@ type TableInfo struct {

TTLInfo *TTLInfo `json:"ttl_info"`

// Revision is per table schema's version, it will be increased when the schema changed.
Revision uint64 `json:"revision"`

DBID int64 `json:"-"`
}

Expand Down
36 changes: 31 additions & 5 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,33 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
return errors.Trace(err)
}

// step 3: check schema version
if stmt.SchemaVersion != is.SchemaMetaVersion() {
// step 3: add metadata lock and check each table's schema version
schemaNotMatch := false
for i := 0; i < len(stmt.dbName); i++ {
_, ok := is.TableByID(stmt.tbls[i].Meta().ID)
if !ok {
tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name)
if err != nil {
return plannererrors.ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
}
delete(stmt.RelateVersion, stmt.tbls[i].Meta().ID)
stmt.tbls[i] = tblByName
stmt.RelateVersion[tblByName.Meta().ID] = tblByName.Meta().Revision
}
newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(sctx.GetPlanCtx(), stmt.dbName[i], stmt.tbls[i], is)
if err != nil {
schemaNotMatch = true
continue
}
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
schemaNotMatch = true
}
stmt.tbls[i] = newTbl
stmt.RelateVersion[newTbl.Meta().ID] = newTbl.Meta().Revision
}

// step 4: check schema version
if schemaNotMatch || stmt.SchemaVersion != is.SchemaMetaVersion() {
// In order to avoid some correctness issues, we have to clear the
// cached plan once the schema version is changed.
// Cached plan in prepared struct does NOT have a "cache key" with
Expand All @@ -129,7 +154,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
stmt.SchemaVersion = is.SchemaMetaVersion()
}

// step 4: handle expiration
// step 5: handle expiration
// If the lastUpdateTime less than expiredTimeStamp4PC,
// it means other sessions have executed 'admin flush instance plan_cache'.
// So we need to clear the current session's plan cache.
Expand All @@ -140,6 +165,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
stmt.PointGet.Plan = nil
vars.LastUpdateTime4PC = expiredTimeStamp4PC
}

return nil
}

Expand Down Expand Up @@ -193,7 +219,7 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
latestSchemaVersion = domain.GetDomain(sctx).InfoSchema().SchemaMetaVersion()
}
if cacheKey, err = NewPlanCacheKey(sctx.GetSessionVars(), stmt.StmtText,
stmt.StmtDB, stmt.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
stmt.StmtDB, stmt.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load(), stmt.RelateVersion); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -338,7 +364,7 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmtAst.Stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
if cacheKey, err = NewPlanCacheKey(sessVars, stmt.StmtText, stmt.StmtDB,
stmt.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
stmt.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load(), stmt.RelateVersion); err != nil {
return nil, nil, err
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
Expand Down
Loading

0 comments on commit 70a8253

Please sign in to comment.