From 3f1470d5a8ca91ad20bb2890f5b59ffa3d86a16a Mon Sep 17 00:00:00 2001 From: yisaer Date: Fri, 30 Dec 2022 14:57:15 +0800 Subject: [PATCH] fix --- executor/adapter.go | 92 +++++++++++++++++++++++++++++++++++ executor/builder.go | 5 -- executor/executor.go | 1 - session/session.go | 90 ---------------------------------- sessionctx/stmtctx/stmtctx.go | 3 -- 5 files changed, 92 insertions(+), 99 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index c087a50e5f5f0..1eb5cd6dd7625 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain" @@ -58,6 +59,7 @@ import ( "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/plancodec" + "github.com/pingcap/tidb/util/replayer" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/stmtsummary" "github.com/pingcap/tidb/util/stringutil" @@ -1365,6 +1367,18 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm // 4. update the `PrevStmt` in session variable. // 5. reset `DurationParse` in session variable. func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) { + se := a.Ctx + if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() { + stmtNode := a.GetStmtNode() + if se.GetSessionVars().EnablePlanReplayedContinuesCapture { + if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) { + checkPlanReplayerContinuesCapture(se, stmtNode, txnTS) + } + } else { + checkPlanReplayerCaptureTask(se, stmtNode, txnTS) + } + } + sessVars := a.Ctx.GetSessionVars() execDetail := sessVars.StmtCtx.GetExecDetails() // Attach commit/lockKeys runtime stats to executor runtime stats. @@ -1953,3 +1967,81 @@ func convertStatusIntoString(sctx sessionctx.Context, statsLoadStatus map[model. } return r } + +// only allow select/delete/update/insert/execute stmt captured by continues capture +func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool { + switch stmtNode.(type) { + case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt: + return true + default: + return false + } +} + +func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { + dom := domain.GetDomain(sctx) + if dom == nil { + return + } + handle := dom.GetPlanReplayerHandle() + if handle == nil { + return + } + tasks := handle.GetTasks() + _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() + key := replayer.PlanReplayerTaskKey{ + SQLDigest: sqlDigest.String(), + PlanDigest: planDigest.String(), + } + for _, task := range tasks { + if task.SQLDigest == sqlDigest.String() { + if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() { + sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false) + return + } + } + } +} + +func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { + dom := domain.GetDomain(sctx) + if dom == nil { + return + } + handle := dom.GetPlanReplayerHandle() + if handle == nil { + return + } + _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() + key := replayer.PlanReplayerTaskKey{ + SQLDigest: sqlDigest.String(), + PlanDigest: planDigest.String(), + } + existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key) + if existed { + return + } + sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true) + sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key) +} + +func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode, + startTS uint64, isContinuesCapture bool) { + stmtCtx := sctx.GetSessionVars().StmtCtx + handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) + dumpTask := &domain.PlanReplayerDumpTask{ + PlanReplayerTaskKey: key, + StartTS: startTS, + EncodePlan: GetEncodedPlan, + TblStats: stmtCtx.TableStats, + SessionBindings: handle.GetAllBindRecord(), + SessionVars: sctx.GetSessionVars(), + ExecStmts: []ast.StmtNode{stmtNode}, + Analyze: false, + IsCapture: true, + IsContinuesCapture: isContinuesCapture, + } + domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) +} diff --git a/executor/builder.go b/executor/builder.go index 1543d5fe4ab49..c702d7b4944c7 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1725,11 +1725,6 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu // `getSnapshotTS` returns for-update-ts if in insert/update/delete/lock statement otherwise the isolation read ts // Please notice that in RC isolation, the above two ts are the same func (b *executorBuilder) getSnapshotTS() (ts uint64, err error) { - defer func() { - if err == nil { - b.ctx.GetSessionVars().StmtCtx.StmtSnapshotTS = ts - } - }() if b.forDataReaderBuilder { return b.dataReaderTS, nil } diff --git a/executor/executor.go b/executor/executor.go index afefff44fef17..9f95e63aaed20 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1958,7 +1958,6 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { } else { sc.UseDynamicPruneMode = false } - sc.StmtSnapshotTS = 0 sc.StatsLoad.Timeout = 0 sc.StatsLoad.NeededItems = nil diff --git a/session/session.go b/session/session.go index d0617dee85436..9f707e19a1fda 100644 --- a/session/session.go +++ b/session/session.go @@ -96,7 +96,6 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/logutil/consistency" "github.com/pingcap/tidb/util/memory" - "github.com/pingcap/tidb/util/replayer" "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" @@ -2373,17 +2372,6 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. se: se, }, err } - if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() { - stmtNode := s.GetStmtNode() - startTS := se.GetSessionVars().StmtCtx.StmtSnapshotTS - if se.GetSessionVars().EnablePlanReplayedContinuesCapture { - if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) { - checkPlanReplayerContinuesCapture(se, stmtNode, startTS) - } - } else { - checkPlanReplayerCaptureTask(se, stmtNode, startTS) - } - } err = finishStmt(ctx, se, err, s) if se.hasQuerySpecial() { @@ -2398,84 +2386,6 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. return nil, err } -// only allow select/delete/update/insert/execute stmt captured by continues capture -func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool { - switch stmtNode.(type) { - case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt: - return true - default: - return false - } -} - -func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { - dom := domain.GetDomain(sctx) - if dom == nil { - return - } - handle := dom.GetPlanReplayerHandle() - if handle == nil { - return - } - tasks := handle.GetTasks() - _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() - _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() - key := replayer.PlanReplayerTaskKey{ - SQLDigest: sqlDigest.String(), - PlanDigest: planDigest.String(), - } - for _, task := range tasks { - if task.SQLDigest == sqlDigest.String() { - if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() { - sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false) - return - } - } - } -} - -func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { - dom := domain.GetDomain(sctx) - if dom == nil { - return - } - handle := dom.GetPlanReplayerHandle() - if handle == nil { - return - } - _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() - _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() - key := replayer.PlanReplayerTaskKey{ - SQLDigest: sqlDigest.String(), - PlanDigest: planDigest.String(), - } - existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key) - if existed { - return - } - sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true) - sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key) -} - -func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode, - startTS uint64, isContinuesCapture bool) { - stmtCtx := sctx.GetSessionVars().StmtCtx - handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) - dumpTask := &domain.PlanReplayerDumpTask{ - PlanReplayerTaskKey: key, - StartTS: startTS, - EncodePlan: executor.GetEncodedPlan, - TblStats: stmtCtx.TableStats, - SessionBindings: handle.GetAllBindRecord(), - SessionVars: sctx.GetSessionVars(), - ExecStmts: []ast.StmtNode{stmtNode}, - Analyze: false, - IsCapture: true, - IsContinuesCapture: isContinuesCapture, - } - domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) -} - // ExecStmtVarKeyType is a dummy type to avoid naming collision in context. type ExecStmtVarKeyType int diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 4b7efa4ad5435..01ead10e580fc 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -390,9 +390,6 @@ type StatementContext struct { TableStats map[int64]interface{} // useChunkAlloc indicates whether statement use chunk alloc useChunkAlloc bool - - // StmtSnapshotTS indicates the snapshot ts for the stmt - StmtSnapshotTS uint64 } // StmtHints are SessionVars related sql hints.