From 4c47df3a13a4786f008dee37b63029cf7b4faa04 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 26 Dec 2022 14:42:41 +0800 Subject: [PATCH 1/9] support prepared stmt --- executor/builder.go | 7 ++- executor/compiler.go | 85 ----------------------------------- executor/executor_test.go | 16 ++++++- executor/plan_replayer.go | 5 +++ session/session.go | 83 ++++++++++++++++++++++++++++++++++ sessionctx/stmtctx/stmtctx.go | 3 ++ 6 files changed, 112 insertions(+), 87 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index d4270397eecd0..46584599e94ee 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1724,7 +1724,12 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu // `getSnapshotTS` returns for-update-ts if in insert/update/delete/lock statement otherwise the isolation read ts // Please notice that in RC isolation, the above two ts are the same -func (b *executorBuilder) getSnapshotTS() (uint64, error) { +func (b *executorBuilder) getSnapshotTS() (ts uint64, err error) { + defer func() { + if err == nil { + b.ctx.GetSessionVars().StmtCtx.StmtSnapshotTS = ts + } + }() if b.forDataReaderBuilder { return b.dataReaderTS, nil } diff --git a/executor/compiler.go b/executor/compiler.go index e2c2a29794d1d..241b15874e1e2 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -21,9 +21,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" @@ -34,7 +32,6 @@ import ( "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" - "github.com/pingcap/tidb/util/replayer" "go.uber.org/zap" ) @@ -157,91 +154,9 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } } } - if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL { - if _, ok := stmtNode.(*ast.SelectStmt); ok { - startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS() - if err != nil { - return nil, err - } - if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture { - checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS) - } else { - checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS) - } - } - } - return stmt, nil } -func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { - dom := domain.GetDomain(sctx) - if dom == nil { - return - } - handle := dom.GetPlanReplayerHandle() - if handle == nil { - return - } - tasks := handle.GetTasks() - _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() - _, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx) - key := replayer.PlanReplayerTaskKey{ - SQLDigest: sqlDigest.String(), - PlanDigest: planDigest.String(), - } - for _, task := range tasks { - if task.SQLDigest == sqlDigest.String() { - if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() { - sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false) - return - } - } - } -} - -func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { - dom := domain.GetDomain(sctx) - if dom == nil { - return - } - handle := dom.GetPlanReplayerHandle() - if handle == nil { - return - } - _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() - _, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx) - key := replayer.PlanReplayerTaskKey{ - SQLDigest: sqlDigest.String(), - PlanDigest: planDigest.String(), - } - existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key) - if existed { - return - } - sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true) - sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key) -} - -func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode, - startTS uint64, isContinuesCapture bool) { - stmtCtx := sctx.GetSessionVars().StmtCtx - handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) - dumpTask := &domain.PlanReplayerDumpTask{ - PlanReplayerTaskKey: key, - StartTS: startTS, - EncodePlan: GetEncodedPlan, - TblStats: stmtCtx.TableStats, - SessionBindings: handle.GetAllBindRecord(), - SessionVars: sctx.GetSessionVars(), - ExecStmts: []ast.StmtNode{stmtNode}, - Analyze: false, - IsCapture: true, - IsContinuesCapture: isContinuesCapture, - } - domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) -} - // needLowerPriority checks whether it's needed to lower the execution priority // of a query. // If the estimated output row count of any operator in the physical plan tree diff --git a/executor/executor_test.go b/executor/executor_test.go index 122fbdbe7dd2f..858a4cc9372f9 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -178,12 +178,26 @@ func TestPlanReplayer(t *testing.T) { } func TestPlanReplayerCapture(t *testing.T) { - store := testkit.CreateMockStore(t) + store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("plan replayer capture '123' '123';") tk.MustQuery("select sql_digest, plan_digest from mysql.plan_replayer_task;").Check(testkit.Rows("123 123")) tk.MustGetErrMsg("plan replayer capture '123' '123';", "plan replayer capture task already exists") + tk.MustExec("delete from mysql.plan_replayer_task") + tk.MustExec("create table t(id int)") + tk.MustExec("prepare stmt from 'update t set id = ? where id = ? + 1';") + tk.MustExec("SET @number = 5;") + tk.MustExec("execute stmt using @number,@number") + _, sqlDigest := tk.Session().GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := tk.Session().GetSessionVars().StmtCtx.GetPlanDigest() + tk.MustExec("SET @@tidb_enable_plan_replayer_capture = ON;") + tk.MustExec(fmt.Sprintf("plan replayer capture '%v' '%v'", sqlDigest.String(), planDigest.String())) + err := dom.GetPlanReplayerHandle().CollectPlanReplayerTask() + require.NoError(t, err) + tk.MustExec("execute stmt using @number,@number") + task := dom.GetPlanReplayerHandle().DrainTask() + require.NotNil(t, task) } func TestPlanReplayerContinuesCapture(t *testing.T) { diff --git a/executor/plan_replayer.go b/executor/plan_replayer.go index fae6273b3bd5e..ff102e20820b2 100644 --- a/executor/plan_replayer.go +++ b/executor/plan_replayer.go @@ -119,6 +119,11 @@ func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error { zap.Error(err)) return err } + err = domain.GetDomain(e.ctx).GetPlanReplayerHandle().CollectPlanReplayerTask() + if err != nil { + logutil.BgLogger().Warn("collect task failed", zap.Error(err)) + } + logutil.BgLogger().Info("collect plan replayer task success") e.endFlag = true return nil } diff --git a/session/session.go b/session/session.go index d358d761560e2..3616d28096002 100644 --- a/session/session.go +++ b/session/session.go @@ -95,6 +95,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/logutil/consistency" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/replayer" "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" @@ -2375,6 +2376,20 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. se: se, }, err } + if !se.GetSessionVars().InRestrictedSQL { + sql, sqlDigest := se.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := se.GetSessionVars().StmtCtx.GetPlanDigest() + fmt.Println(fmt.Sprintf("sql=%v,sqldigest=%v,plandigest=%v", sql, sqlDigest.String(), planDigest.String())) + } + if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() { + stmtNode := s.GetStmtNode() + startTS := se.GetSessionVars().StmtCtx.StmtSnapshotTS + if se.GetSessionVars().EnablePlanReplayedContinuesCapture { + checkPlanReplayerContinuesCapture(se, stmtNode, startTS) + } else { + checkPlanReplayerCaptureTask(se, stmtNode, startTS) + } + } err = finishStmt(ctx, se, err, s) if se.hasQuerySpecial() { @@ -2389,6 +2404,74 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. return nil, err } +func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { + dom := domain.GetDomain(sctx) + if dom == nil { + return + } + handle := dom.GetPlanReplayerHandle() + if handle == nil { + return + } + tasks := handle.GetTasks() + _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() + key := replayer.PlanReplayerTaskKey{ + SQLDigest: sqlDigest.String(), + PlanDigest: planDigest.String(), + } + for _, task := range tasks { + if task.SQLDigest == sqlDigest.String() { + if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() { + sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false) + return + } + } + } +} + +func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { + dom := domain.GetDomain(sctx) + if dom == nil { + return + } + handle := dom.GetPlanReplayerHandle() + if handle == nil { + return + } + _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() + key := replayer.PlanReplayerTaskKey{ + SQLDigest: sqlDigest.String(), + PlanDigest: planDigest.String(), + } + existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key) + if existed { + return + } + sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true) + sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key) +} + +func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode, + startTS uint64, isContinuesCapture bool) { + stmtCtx := sctx.GetSessionVars().StmtCtx + handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) + dumpTask := &domain.PlanReplayerDumpTask{ + PlanReplayerTaskKey: key, + StartTS: startTS, + EncodePlan: executor.GetEncodedPlan, + TblStats: stmtCtx.TableStats, + SessionBindings: handle.GetAllBindRecord(), + SessionVars: sctx.GetSessionVars(), + ExecStmts: []ast.StmtNode{stmtNode}, + Analyze: false, + IsCapture: true, + IsContinuesCapture: isContinuesCapture, + } + domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) +} + // ExecStmtVarKeyType is a dummy type to avoid naming collision in context. type ExecStmtVarKeyType int diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 9a49c24851c08..0743e22593549 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -384,6 +384,9 @@ type StatementContext struct { TableStats map[int64]interface{} // useChunkAlloc indicates whether statement use chunk alloc useChunkAlloc bool + + // StmtSnapshotTS indicates the snapshot ts for the stmt + StmtSnapshotTS uint64 } // StmtHints are SessionVars related sql hints. From d3b43eb5ded520260fc38c4d94d37033ef8de016 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 26 Dec 2022 14:44:04 +0800 Subject: [PATCH 2/9] support prepared stmt --- session/session.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/session/session.go b/session/session.go index 3616d28096002..7bc1e4a5eaa6a 100644 --- a/session/session.go +++ b/session/session.go @@ -2376,11 +2376,6 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. se: se, }, err } - if !se.GetSessionVars().InRestrictedSQL { - sql, sqlDigest := se.GetSessionVars().StmtCtx.SQLDigest() - _, planDigest := se.GetSessionVars().StmtCtx.GetPlanDigest() - fmt.Println(fmt.Sprintf("sql=%v,sqldigest=%v,plandigest=%v", sql, sqlDigest.String(), planDigest.String())) - } if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() { stmtNode := s.GetStmtNode() startTS := se.GetSessionVars().StmtCtx.StmtSnapshotTS From 0b8cbe87bfc590f3ca2675fc6fd98a3b442ee9ce Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 26 Dec 2022 15:22:33 +0800 Subject: [PATCH 3/9] fix --- session/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/session/BUILD.bazel b/session/BUILD.bazel index be3c8699ee6c8..31cb06e9eb468 100644 --- a/session/BUILD.bazel +++ b/session/BUILD.bazel @@ -78,6 +78,7 @@ go_library( "//util/mathutil", "//util/memory", "//util/parser", + "//util/replayer", "//util/rowcodec", "//util/sem", "//util/sli", From a846cc5be37e370a3cda5d5670af75494912b5d4 Mon Sep 17 00:00:00 2001 From: yisaer Date: Fri, 30 Dec 2022 12:44:54 +0800 Subject: [PATCH 4/9] fix --- session/session.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/session/session.go b/session/session.go index 3a89d2dec8485..d0617dee85436 100644 --- a/session/session.go +++ b/session/session.go @@ -2377,7 +2377,9 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. stmtNode := s.GetStmtNode() startTS := se.GetSessionVars().StmtCtx.StmtSnapshotTS if se.GetSessionVars().EnablePlanReplayedContinuesCapture { - checkPlanReplayerContinuesCapture(se, stmtNode, startTS) + if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) { + checkPlanReplayerContinuesCapture(se, stmtNode, startTS) + } } else { checkPlanReplayerCaptureTask(se, stmtNode, startTS) } @@ -2396,6 +2398,16 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. return nil, err } +// only allow select/delete/update/insert/execute stmt captured by continues capture +func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool { + switch stmtNode.(type) { + case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt: + return true + default: + return false + } +} + func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { dom := domain.GetDomain(sctx) if dom == nil { From f029d6581667363de68f9b3636fac3d6fbfcb8e4 Mon Sep 17 00:00:00 2001 From: yisaer Date: Fri, 30 Dec 2022 14:27:35 +0800 Subject: [PATCH 5/9] fix --- executor/executor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/executor.go b/executor/executor.go index 9f95e63aaed20..afefff44fef17 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1958,6 +1958,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { } else { sc.UseDynamicPruneMode = false } + sc.StmtSnapshotTS = 0 sc.StatsLoad.Timeout = 0 sc.StatsLoad.NeededItems = nil From 3f1470d5a8ca91ad20bb2890f5b59ffa3d86a16a Mon Sep 17 00:00:00 2001 From: yisaer Date: Fri, 30 Dec 2022 14:57:15 +0800 Subject: [PATCH 6/9] fix --- executor/adapter.go | 92 +++++++++++++++++++++++++++++++++++ executor/builder.go | 5 -- executor/executor.go | 1 - session/session.go | 90 ---------------------------------- sessionctx/stmtctx/stmtctx.go | 3 -- 5 files changed, 92 insertions(+), 99 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index c087a50e5f5f0..1eb5cd6dd7625 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain" @@ -58,6 +59,7 @@ import ( "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/plancodec" + "github.com/pingcap/tidb/util/replayer" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/stmtsummary" "github.com/pingcap/tidb/util/stringutil" @@ -1365,6 +1367,18 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm // 4. update the `PrevStmt` in session variable. // 5. reset `DurationParse` in session variable. func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) { + se := a.Ctx + if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() { + stmtNode := a.GetStmtNode() + if se.GetSessionVars().EnablePlanReplayedContinuesCapture { + if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) { + checkPlanReplayerContinuesCapture(se, stmtNode, txnTS) + } + } else { + checkPlanReplayerCaptureTask(se, stmtNode, txnTS) + } + } + sessVars := a.Ctx.GetSessionVars() execDetail := sessVars.StmtCtx.GetExecDetails() // Attach commit/lockKeys runtime stats to executor runtime stats. @@ -1953,3 +1967,81 @@ func convertStatusIntoString(sctx sessionctx.Context, statsLoadStatus map[model. } return r } + +// only allow select/delete/update/insert/execute stmt captured by continues capture +func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool { + switch stmtNode.(type) { + case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt: + return true + default: + return false + } +} + +func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { + dom := domain.GetDomain(sctx) + if dom == nil { + return + } + handle := dom.GetPlanReplayerHandle() + if handle == nil { + return + } + tasks := handle.GetTasks() + _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() + key := replayer.PlanReplayerTaskKey{ + SQLDigest: sqlDigest.String(), + PlanDigest: planDigest.String(), + } + for _, task := range tasks { + if task.SQLDigest == sqlDigest.String() { + if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() { + sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false) + return + } + } + } +} + +func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { + dom := domain.GetDomain(sctx) + if dom == nil { + return + } + handle := dom.GetPlanReplayerHandle() + if handle == nil { + return + } + _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() + key := replayer.PlanReplayerTaskKey{ + SQLDigest: sqlDigest.String(), + PlanDigest: planDigest.String(), + } + existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key) + if existed { + return + } + sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true) + sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key) +} + +func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode, + startTS uint64, isContinuesCapture bool) { + stmtCtx := sctx.GetSessionVars().StmtCtx + handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) + dumpTask := &domain.PlanReplayerDumpTask{ + PlanReplayerTaskKey: key, + StartTS: startTS, + EncodePlan: GetEncodedPlan, + TblStats: stmtCtx.TableStats, + SessionBindings: handle.GetAllBindRecord(), + SessionVars: sctx.GetSessionVars(), + ExecStmts: []ast.StmtNode{stmtNode}, + Analyze: false, + IsCapture: true, + IsContinuesCapture: isContinuesCapture, + } + domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) +} diff --git a/executor/builder.go b/executor/builder.go index 1543d5fe4ab49..c702d7b4944c7 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1725,11 +1725,6 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu // `getSnapshotTS` returns for-update-ts if in insert/update/delete/lock statement otherwise the isolation read ts // Please notice that in RC isolation, the above two ts are the same func (b *executorBuilder) getSnapshotTS() (ts uint64, err error) { - defer func() { - if err == nil { - b.ctx.GetSessionVars().StmtCtx.StmtSnapshotTS = ts - } - }() if b.forDataReaderBuilder { return b.dataReaderTS, nil } diff --git a/executor/executor.go b/executor/executor.go index afefff44fef17..9f95e63aaed20 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1958,7 +1958,6 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { } else { sc.UseDynamicPruneMode = false } - sc.StmtSnapshotTS = 0 sc.StatsLoad.Timeout = 0 sc.StatsLoad.NeededItems = nil diff --git a/session/session.go b/session/session.go index d0617dee85436..9f707e19a1fda 100644 --- a/session/session.go +++ b/session/session.go @@ -96,7 +96,6 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/logutil/consistency" "github.com/pingcap/tidb/util/memory" - "github.com/pingcap/tidb/util/replayer" "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" @@ -2373,17 +2372,6 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. se: se, }, err } - if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() { - stmtNode := s.GetStmtNode() - startTS := se.GetSessionVars().StmtCtx.StmtSnapshotTS - if se.GetSessionVars().EnablePlanReplayedContinuesCapture { - if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) { - checkPlanReplayerContinuesCapture(se, stmtNode, startTS) - } - } else { - checkPlanReplayerCaptureTask(se, stmtNode, startTS) - } - } err = finishStmt(ctx, se, err, s) if se.hasQuerySpecial() { @@ -2398,84 +2386,6 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. return nil, err } -// only allow select/delete/update/insert/execute stmt captured by continues capture -func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool { - switch stmtNode.(type) { - case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt: - return true - default: - return false - } -} - -func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { - dom := domain.GetDomain(sctx) - if dom == nil { - return - } - handle := dom.GetPlanReplayerHandle() - if handle == nil { - return - } - tasks := handle.GetTasks() - _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() - _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() - key := replayer.PlanReplayerTaskKey{ - SQLDigest: sqlDigest.String(), - PlanDigest: planDigest.String(), - } - for _, task := range tasks { - if task.SQLDigest == sqlDigest.String() { - if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() { - sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false) - return - } - } - } -} - -func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { - dom := domain.GetDomain(sctx) - if dom == nil { - return - } - handle := dom.GetPlanReplayerHandle() - if handle == nil { - return - } - _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() - _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() - key := replayer.PlanReplayerTaskKey{ - SQLDigest: sqlDigest.String(), - PlanDigest: planDigest.String(), - } - existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key) - if existed { - return - } - sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true) - sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key) -} - -func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode, - startTS uint64, isContinuesCapture bool) { - stmtCtx := sctx.GetSessionVars().StmtCtx - handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) - dumpTask := &domain.PlanReplayerDumpTask{ - PlanReplayerTaskKey: key, - StartTS: startTS, - EncodePlan: executor.GetEncodedPlan, - TblStats: stmtCtx.TableStats, - SessionBindings: handle.GetAllBindRecord(), - SessionVars: sctx.GetSessionVars(), - ExecStmts: []ast.StmtNode{stmtNode}, - Analyze: false, - IsCapture: true, - IsContinuesCapture: isContinuesCapture, - } - domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) -} - // ExecStmtVarKeyType is a dummy type to avoid naming collision in context. type ExecStmtVarKeyType int diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 4b7efa4ad5435..01ead10e580fc 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -390,9 +390,6 @@ type StatementContext struct { TableStats map[int64]interface{} // useChunkAlloc indicates whether statement use chunk alloc useChunkAlloc bool - - // StmtSnapshotTS indicates the snapshot ts for the stmt - StmtSnapshotTS uint64 } // StmtHints are SessionVars related sql hints. From c00c36b7269c46c2b51a8427cf92191054dce664 Mon Sep 17 00:00:00 2001 From: yisaer Date: Fri, 30 Dec 2022 15:20:36 +0800 Subject: [PATCH 7/9] revise --- domain/plan_replayer.go | 29 +---------------------------- domain/plan_replayer_dump.go | 19 +++++++------------ server/plan_replayer.go | 6 +++--- util/replayer/replayer.go | 10 +++++----- 4 files changed, 16 insertions(+), 48 deletions(-) diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index fc54d30759057..50bda969738c4 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -34,8 +34,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/statistics" - "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/replayer" @@ -410,7 +408,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc return true } - file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture) + file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsCapture) if err != nil { logutil.BgLogger().Warn("[plan-replayer-capture] generate task file failed", zap.String("sqlDigest", taskKey.SQLDigest), @@ -421,30 +419,6 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc task.Zf = file task.FileName = fileName task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false) - jsStats := make(map[int64]*handle.JSONTable) - is := GetDomain(w.sctx).InfoSchema() - if task.IsCapture && !task.IsContinuesCapture { - for tblID, stat := range task.TblStats { - tbl, ok := is.TableByID(tblID) - if !ok { - return false - } - schema, ok := is.SchemaByTable(tbl.Meta()) - if !ok { - return false - } - r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table)) - if err != nil { - logutil.BgLogger().Warn("[plan-replayer-capture] generate task json stats failed", - zap.String("sqlDigest", taskKey.SQLDigest), - zap.String("planDigest", taskKey.PlanDigest), - zap.Error(err)) - return false - } - jsStats[tblID] = r - } - task.JSONTblStats = jsStats - } err = DumpPlanReplayerInfo(w.ctx, w.sctx, task) if err != nil { logutil.BgLogger().Warn("[plan-replayer-capture] dump task result failed", @@ -546,7 +520,6 @@ type PlanReplayerDumpTask struct { SessionBindings []*bindinfo.BindRecord EncodedPlan string SessionVars *variable.SessionVars - JSONTblStats map[int64]*handle.JSONTable ExecStmts []ast.StmtNode Analyze bool diff --git a/domain/plan_replayer_dump.go b/domain/plan_replayer_dump.go index 0dd4945873e58..cad0898c81ef2 100644 --- a/domain/plan_replayer_dump.go +++ b/domain/plan_replayer_dump.go @@ -265,10 +265,10 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context, return err } - // For continues capture, we don't dump stats - if !task.IsContinuesCapture { + // For capture task, we don't dump stats + if !task.IsCapture { // Dump stats - if err = dumpStats(zw, pairs, task.JSONTblStats, do); err != nil { + if err = dumpStats(zw, pairs, do); err != nil { return err } } @@ -415,12 +415,12 @@ func dumpSchemaMeta(zw *zip.Writer, tables map[tableNamePair]struct{}) error { return nil } -func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, tblJSONStats map[int64]*handle.JSONTable, do *Domain) error { +func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *Domain) error { for pair := range pairs { if pair.IsView { continue } - jsonTbl, err := getStatsForTable(do, tblJSONStats, pair) + jsonTbl, err := getStatsForTable(do, pair) if err != nil { return err } @@ -653,19 +653,14 @@ func extractTableNames(ctx context.Context, sctx sessionctx.Context, return r, nil } -func getStatsForTable(do *Domain, tblJSONStats map[int64]*handle.JSONTable, pair tableNamePair) (*handle.JSONTable, error) { +func getStatsForTable(do *Domain, pair tableNamePair) (*handle.JSONTable, error) { is := do.InfoSchema() h := do.StatsHandle() tbl, err := is.TableByName(model.NewCIStr(pair.DBName), model.NewCIStr(pair.TableName)) if err != nil { return nil, err } - js, ok := tblJSONStats[tbl.Meta().ID] - if ok && js != nil { - return js, nil - } - js, err = h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true) - return js, err + return h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true) } func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Context) error { diff --git a/server/plan_replayer.go b/server/plan_replayer.go index beb202638d1fd..ef8dab24c560d 100644 --- a/server/plan_replayer.go +++ b/server/plan_replayer.go @@ -115,7 +115,7 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req return } if handler.downloadedFilename == "plan_replayer" { - content, err = handlePlanReplayerContinuesCaptureFile(content, path, handler) + content, err = handlePlanReplayerCaptureFile(content, path, handler) if err != nil { writeError(w, err) return @@ -219,8 +219,8 @@ func isExists(path string) (bool, error) { return true, nil } -func handlePlanReplayerContinuesCaptureFile(content []byte, path string, handler downloadFileHandler) ([]byte, error) { - if !strings.Contains(handler.filePath, "continues_replayer") { +func handlePlanReplayerCaptureFile(content []byte, path string, handler downloadFileHandler) ([]byte, error) { + if !strings.Contains(handler.filePath, "capture_replayer") { return content, nil } b := bytes.NewReader(content) diff --git a/util/replayer/replayer.go b/util/replayer/replayer.go index f89d26ec97717..39287ada70194 100644 --- a/util/replayer/replayer.go +++ b/util/replayer/replayer.go @@ -33,13 +33,13 @@ type PlanReplayerTaskKey struct { } // GeneratePlanReplayerFile generates plan replayer file -func GeneratePlanReplayerFile(isContinues bool) (*os.File, string, error) { +func GeneratePlanReplayerFile(isCapture bool) (*os.File, string, error) { path := GetPlanReplayerDirName() err := os.MkdirAll(path, os.ModePerm) if err != nil { return nil, "", errors.AddStack(err) } - fileName, err := generatePlanReplayerFileName(isContinues) + fileName, err := generatePlanReplayerFileName(isCapture) if err != nil { return nil, "", errors.AddStack(err) } @@ -50,7 +50,7 @@ func GeneratePlanReplayerFile(isContinues bool) (*os.File, string, error) { return zf, fileName, err } -func generatePlanReplayerFileName(isContinues bool) (string, error) { +func generatePlanReplayerFileName(isCapture bool) (string, error) { // Generate key and create zip file time := time.Now().UnixNano() b := make([]byte, 16) @@ -60,8 +60,8 @@ func generatePlanReplayerFileName(isContinues bool) (string, error) { return "", err } key := base64.URLEncoding.EncodeToString(b) - if isContinues { - return fmt.Sprintf("continues_replayer_%v_%v.zip", key, time), nil + if isCapture { + return fmt.Sprintf("capture_replayer_%v_%v.zip", key, time), nil } return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil } From bfd25ac6e2e6b7af6bd995c92db86b305f4c6e9f Mon Sep 17 00:00:00 2001 From: yisaer Date: Fri, 30 Dec 2022 16:25:44 +0800 Subject: [PATCH 8/9] revise --- domain/plan_replayer.go | 22 +++++++++++++++++++--- executor/adapter.go | 5 +++++ server/plan_replayer.go | 2 +- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index 50bda969738c4..54c109cc34dc3 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" @@ -99,7 +100,7 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) { logutil.BgLogger().Error("[dumpFileGcChecker] parseTime failed", zap.Error(err), zap.String("filename", fileName)) continue } - isPlanReplayer := parseType(fileName) == "replayer" + isPlanReplayer := strings.Contains(fileName, "replayer") if !createTime.After(gcTime) { err := os.Remove(filepath.Join(path, f.Name())) if err != nil { @@ -419,6 +420,19 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc task.Zf = file task.FileName = fileName task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false) + if task.InExecute && len(task.NormalizedSQL) > 0 { + p := parser.New() + stmts, _, err := p.ParseSQL(task.NormalizedSQL) + if err != nil { + logutil.BgLogger().Warn("[plan-replayer-capture] parse normalized sql failed", + zap.String("sql", task.NormalizedSQL), + zap.String("sqlDigest", taskKey.SQLDigest), + zap.String("planDigest", taskKey.PlanDigest), + zap.Error(err)) + return false + } + task.ExecStmts = stmts + } err = DumpPlanReplayerInfo(w.ctx, w.sctx, task) if err != nil { logutil.BgLogger().Warn("[plan-replayer-capture] dump task result failed", @@ -512,8 +526,10 @@ type PlanReplayerDumpTask struct { replayer.PlanReplayerTaskKey // tmp variables stored during the query - EncodePlan func(*stmtctx.StatementContext, bool) (string, string) - TblStats map[int64]interface{} + EncodePlan func(*stmtctx.StatementContext, bool) (string, string) + TblStats map[int64]interface{} + InExecute bool + NormalizedSQL string // variables used to dump the plan StartTS uint64 diff --git a/executor/adapter.go b/executor/adapter.go index 1eb5cd6dd7625..a98bac9186b69 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -2043,5 +2043,10 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx. IsCapture: true, IsContinuesCapture: isContinuesCapture, } + if _, ok := stmtNode.(*ast.ExecuteStmt); ok { + nsql, _ := sctx.GetSessionVars().StmtCtx.SQLDigest() + dumpTask.InExecute = true + dumpTask.NormalizedSQL = nsql + } domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) } diff --git a/server/plan_replayer.go b/server/plan_replayer.go index ef8dab24c560d..64629c6ee0070 100644 --- a/server/plan_replayer.go +++ b/server/plan_replayer.go @@ -331,7 +331,7 @@ func dumpJSONStatsIntoZip(tbls map[int64]*tblInfo, content []byte, path string) if err != nil { return "", err } - newPath := fmt.Sprintf("copy_%v.zip", path[0:len(path)-4]) + newPath := strings.Replace(path, "capture_replayer", "copy_capture_replayer", 1) zf, err := os.Create(newPath) if err != nil { return "", err From babf8aaeb77a414ce3a28f78696c5492c13e4290 Mon Sep 17 00:00:00 2001 From: yisaer Date: Fri, 30 Dec 2022 18:57:09 +0800 Subject: [PATCH 9/9] revise --- domain/BUILD.bazel | 2 +- session/BUILD.bazel | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index 55f68e1ca20bd..eafaa6b4cd8aa 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//meta", "//metrics", "//owner", + "//parser", "//parser/ast", "//parser/model", "//parser/mysql", @@ -45,7 +46,6 @@ go_library( "//sessionctx/sessionstates", "//sessionctx/stmtctx", "//sessionctx/variable", - "//statistics", "//statistics/handle", "//telemetry", "//ttl/ttlworker", diff --git a/session/BUILD.bazel b/session/BUILD.bazel index 67b79c2c9bcdc..dc3106abdfe63 100644 --- a/session/BUILD.bazel +++ b/session/BUILD.bazel @@ -79,7 +79,6 @@ go_library( "//util/mathutil", "//util/memory", "//util/parser", - "//util/replayer", "//util/rowcodec", "//util/sem", "//util/sli",