diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index 55f68e1ca20bd..eafaa6b4cd8aa 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//meta", "//metrics", "//owner", + "//parser", "//parser/ast", "//parser/model", "//parser/mysql", @@ -45,7 +46,6 @@ go_library( "//sessionctx/sessionstates", "//sessionctx/stmtctx", "//sessionctx/variable", - "//statistics", "//statistics/handle", "//telemetry", "//ttl/ttlworker", diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index fc54d30759057..54c109cc34dc3 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -29,13 +29,12 @@ import ( "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/statistics" - "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/replayer" @@ -101,7 +100,7 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) { logutil.BgLogger().Error("[dumpFileGcChecker] parseTime failed", zap.Error(err), zap.String("filename", fileName)) continue } - isPlanReplayer := parseType(fileName) == "replayer" + isPlanReplayer := strings.Contains(fileName, "replayer") if !createTime.After(gcTime) { err := os.Remove(filepath.Join(path, f.Name())) if err != nil { @@ -410,7 +409,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc return true } - file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture) + file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsCapture) if err != nil { logutil.BgLogger().Warn("[plan-replayer-capture] generate task file failed", zap.String("sqlDigest", taskKey.SQLDigest), @@ -421,29 +420,18 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc task.Zf = file task.FileName = fileName task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false) - jsStats := make(map[int64]*handle.JSONTable) - is := GetDomain(w.sctx).InfoSchema() - if task.IsCapture && !task.IsContinuesCapture { - for tblID, stat := range task.TblStats { - tbl, ok := is.TableByID(tblID) - if !ok { - return false - } - schema, ok := is.SchemaByTable(tbl.Meta()) - if !ok { - return false - } - r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table)) - if err != nil { - logutil.BgLogger().Warn("[plan-replayer-capture] generate task json stats failed", - zap.String("sqlDigest", taskKey.SQLDigest), - zap.String("planDigest", taskKey.PlanDigest), - zap.Error(err)) - return false - } - jsStats[tblID] = r + if task.InExecute && len(task.NormalizedSQL) > 0 { + p := parser.New() + stmts, _, err := p.ParseSQL(task.NormalizedSQL) + if err != nil { + logutil.BgLogger().Warn("[plan-replayer-capture] parse normalized sql failed", + zap.String("sql", task.NormalizedSQL), + zap.String("sqlDigest", taskKey.SQLDigest), + zap.String("planDigest", taskKey.PlanDigest), + zap.Error(err)) + return false } - task.JSONTblStats = jsStats + task.ExecStmts = stmts } err = DumpPlanReplayerInfo(w.ctx, w.sctx, task) if err != nil { @@ -538,15 +526,16 @@ type PlanReplayerDumpTask struct { replayer.PlanReplayerTaskKey // tmp variables stored during the query - EncodePlan func(*stmtctx.StatementContext, bool) (string, string) - TblStats map[int64]interface{} + EncodePlan func(*stmtctx.StatementContext, bool) (string, string) + TblStats map[int64]interface{} + InExecute bool + NormalizedSQL string // variables used to dump the plan StartTS uint64 SessionBindings []*bindinfo.BindRecord EncodedPlan string SessionVars *variable.SessionVars - JSONTblStats map[int64]*handle.JSONTable ExecStmts []ast.StmtNode Analyze bool diff --git a/domain/plan_replayer_dump.go b/domain/plan_replayer_dump.go index 0dd4945873e58..cad0898c81ef2 100644 --- a/domain/plan_replayer_dump.go +++ b/domain/plan_replayer_dump.go @@ -265,10 +265,10 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context, return err } - // For continues capture, we don't dump stats - if !task.IsContinuesCapture { + // For capture task, we don't dump stats + if !task.IsCapture { // Dump stats - if err = dumpStats(zw, pairs, task.JSONTblStats, do); err != nil { + if err = dumpStats(zw, pairs, do); err != nil { return err } } @@ -415,12 +415,12 @@ func dumpSchemaMeta(zw *zip.Writer, tables map[tableNamePair]struct{}) error { return nil } -func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, tblJSONStats map[int64]*handle.JSONTable, do *Domain) error { +func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *Domain) error { for pair := range pairs { if pair.IsView { continue } - jsonTbl, err := getStatsForTable(do, tblJSONStats, pair) + jsonTbl, err := getStatsForTable(do, pair) if err != nil { return err } @@ -653,19 +653,14 @@ func extractTableNames(ctx context.Context, sctx sessionctx.Context, return r, nil } -func getStatsForTable(do *Domain, tblJSONStats map[int64]*handle.JSONTable, pair tableNamePair) (*handle.JSONTable, error) { +func getStatsForTable(do *Domain, pair tableNamePair) (*handle.JSONTable, error) { is := do.InfoSchema() h := do.StatsHandle() tbl, err := is.TableByName(model.NewCIStr(pair.DBName), model.NewCIStr(pair.TableName)) if err != nil { return nil, err } - js, ok := tblJSONStats[tbl.Meta().ID] - if ok && js != nil { - return js, nil - } - js, err = h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true) - return js, err + return h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true) } func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Context) error { diff --git a/executor/adapter.go b/executor/adapter.go index c087a50e5f5f0..a98bac9186b69 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain" @@ -58,6 +59,7 @@ import ( "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/plancodec" + "github.com/pingcap/tidb/util/replayer" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/stmtsummary" "github.com/pingcap/tidb/util/stringutil" @@ -1365,6 +1367,18 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm // 4. update the `PrevStmt` in session variable. // 5. reset `DurationParse` in session variable. func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) { + se := a.Ctx + if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() { + stmtNode := a.GetStmtNode() + if se.GetSessionVars().EnablePlanReplayedContinuesCapture { + if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) { + checkPlanReplayerContinuesCapture(se, stmtNode, txnTS) + } + } else { + checkPlanReplayerCaptureTask(se, stmtNode, txnTS) + } + } + sessVars := a.Ctx.GetSessionVars() execDetail := sessVars.StmtCtx.GetExecDetails() // Attach commit/lockKeys runtime stats to executor runtime stats. @@ -1953,3 +1967,86 @@ func convertStatusIntoString(sctx sessionctx.Context, statsLoadStatus map[model. } return r } + +// only allow select/delete/update/insert/execute stmt captured by continues capture +func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool { + switch stmtNode.(type) { + case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt: + return true + default: + return false + } +} + +func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { + dom := domain.GetDomain(sctx) + if dom == nil { + return + } + handle := dom.GetPlanReplayerHandle() + if handle == nil { + return + } + tasks := handle.GetTasks() + _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() + key := replayer.PlanReplayerTaskKey{ + SQLDigest: sqlDigest.String(), + PlanDigest: planDigest.String(), + } + for _, task := range tasks { + if task.SQLDigest == sqlDigest.String() { + if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() { + sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false) + return + } + } + } +} + +func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { + dom := domain.GetDomain(sctx) + if dom == nil { + return + } + handle := dom.GetPlanReplayerHandle() + if handle == nil { + return + } + _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest() + key := replayer.PlanReplayerTaskKey{ + SQLDigest: sqlDigest.String(), + PlanDigest: planDigest.String(), + } + existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key) + if existed { + return + } + sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true) + sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key) +} + +func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode, + startTS uint64, isContinuesCapture bool) { + stmtCtx := sctx.GetSessionVars().StmtCtx + handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) + dumpTask := &domain.PlanReplayerDumpTask{ + PlanReplayerTaskKey: key, + StartTS: startTS, + EncodePlan: GetEncodedPlan, + TblStats: stmtCtx.TableStats, + SessionBindings: handle.GetAllBindRecord(), + SessionVars: sctx.GetSessionVars(), + ExecStmts: []ast.StmtNode{stmtNode}, + Analyze: false, + IsCapture: true, + IsContinuesCapture: isContinuesCapture, + } + if _, ok := stmtNode.(*ast.ExecuteStmt); ok { + nsql, _ := sctx.GetSessionVars().StmtCtx.SQLDigest() + dumpTask.InExecute = true + dumpTask.NormalizedSQL = nsql + } + domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) +} diff --git a/executor/builder.go b/executor/builder.go index f771c706bfa9b..c014893a0c86c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1724,7 +1724,7 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu // `getSnapshotTS` returns for-update-ts if in insert/update/delete/lock statement otherwise the isolation read ts // Please notice that in RC isolation, the above two ts are the same -func (b *executorBuilder) getSnapshotTS() (uint64, error) { +func (b *executorBuilder) getSnapshotTS() (ts uint64, err error) { if b.forDataReaderBuilder { return b.dataReaderTS, nil } diff --git a/executor/compiler.go b/executor/compiler.go index 821561899f4e7..9f089eed9bae0 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -21,9 +21,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" @@ -34,7 +32,6 @@ import ( "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" - "github.com/pingcap/tidb/util/replayer" "go.uber.org/zap" ) @@ -157,111 +154,12 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } } } - if err = sessiontxn.OptimizeWithPlanAndThenWarmUp(c.Ctx, stmt.Plan); err != nil { return nil, err } - - if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL { - startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS() - if err != nil { - return nil, err - } - if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture { - checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS) - } else { - checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS) - } - } return stmt, nil } -func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { - dom := domain.GetDomain(sctx) - if dom == nil { - return - } - handle := dom.GetPlanReplayerHandle() - if handle == nil { - return - } - captured := false - tasks := handle.GetTasks() - _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() - _, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx) - defer func() { - logutil.BgLogger().Debug("[plan-replayer-capture] check capture task", - zap.String("sql-digest", sqlDigest.String()), - zap.String("plan-digest", planDigest.String()), - zap.Int("tasks", len(tasks)), - zap.Bool("captured", captured)) - }() - key := replayer.PlanReplayerTaskKey{ - SQLDigest: sqlDigest.String(), - PlanDigest: planDigest.String(), - } - for _, task := range tasks { - if task.SQLDigest == sqlDigest.String() { - if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() { - captured = sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false) - return - } - } - } -} - -func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) { - dom := domain.GetDomain(sctx) - if dom == nil { - return - } - handle := dom.GetPlanReplayerHandle() - if handle == nil { - return - } - _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() - _, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx) - key := replayer.PlanReplayerTaskKey{ - SQLDigest: sqlDigest.String(), - PlanDigest: planDigest.String(), - } - captured := false - defer func() { - logutil.BgLogger().Debug("[plan-replayer-capture] check continues capture task", - zap.String("sql-digest", sqlDigest.String()), - zap.String("plan-digest", planDigest.String()), - zap.Bool("captured", captured)) - }() - - existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key) - if existed { - return - } - captured = sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true) - if captured { - sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key) - } -} - -func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode, - startTS uint64, isContinuesCapture bool) bool { - stmtCtx := sctx.GetSessionVars().StmtCtx - handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) - dumpTask := &domain.PlanReplayerDumpTask{ - PlanReplayerTaskKey: key, - StartTS: startTS, - EncodePlan: GetEncodedPlan, - TblStats: stmtCtx.TableStats, - SessionBindings: handle.GetAllBindRecord(), - SessionVars: sctx.GetSessionVars(), - ExecStmts: []ast.StmtNode{stmtNode}, - Analyze: false, - IsCapture: true, - IsContinuesCapture: isContinuesCapture, - } - return domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) -} - // needLowerPriority checks whether it's needed to lower the execution priority // of a query. // If the estimated output row count of any operator in the physical plan tree diff --git a/executor/executor_test.go b/executor/executor_test.go index 122fbdbe7dd2f..858a4cc9372f9 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -178,12 +178,26 @@ func TestPlanReplayer(t *testing.T) { } func TestPlanReplayerCapture(t *testing.T) { - store := testkit.CreateMockStore(t) + store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("plan replayer capture '123' '123';") tk.MustQuery("select sql_digest, plan_digest from mysql.plan_replayer_task;").Check(testkit.Rows("123 123")) tk.MustGetErrMsg("plan replayer capture '123' '123';", "plan replayer capture task already exists") + tk.MustExec("delete from mysql.plan_replayer_task") + tk.MustExec("create table t(id int)") + tk.MustExec("prepare stmt from 'update t set id = ? where id = ? + 1';") + tk.MustExec("SET @number = 5;") + tk.MustExec("execute stmt using @number,@number") + _, sqlDigest := tk.Session().GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := tk.Session().GetSessionVars().StmtCtx.GetPlanDigest() + tk.MustExec("SET @@tidb_enable_plan_replayer_capture = ON;") + tk.MustExec(fmt.Sprintf("plan replayer capture '%v' '%v'", sqlDigest.String(), planDigest.String())) + err := dom.GetPlanReplayerHandle().CollectPlanReplayerTask() + require.NoError(t, err) + tk.MustExec("execute stmt using @number,@number") + task := dom.GetPlanReplayerHandle().DrainTask() + require.NotNil(t, task) } func TestPlanReplayerContinuesCapture(t *testing.T) { diff --git a/executor/plan_replayer.go b/executor/plan_replayer.go index fae6273b3bd5e..ff102e20820b2 100644 --- a/executor/plan_replayer.go +++ b/executor/plan_replayer.go @@ -119,6 +119,11 @@ func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error { zap.Error(err)) return err } + err = domain.GetDomain(e.ctx).GetPlanReplayerHandle().CollectPlanReplayerTask() + if err != nil { + logutil.BgLogger().Warn("collect task failed", zap.Error(err)) + } + logutil.BgLogger().Info("collect plan replayer task success") e.endFlag = true return nil } diff --git a/server/plan_replayer.go b/server/plan_replayer.go index beb202638d1fd..64629c6ee0070 100644 --- a/server/plan_replayer.go +++ b/server/plan_replayer.go @@ -115,7 +115,7 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req return } if handler.downloadedFilename == "plan_replayer" { - content, err = handlePlanReplayerContinuesCaptureFile(content, path, handler) + content, err = handlePlanReplayerCaptureFile(content, path, handler) if err != nil { writeError(w, err) return @@ -219,8 +219,8 @@ func isExists(path string) (bool, error) { return true, nil } -func handlePlanReplayerContinuesCaptureFile(content []byte, path string, handler downloadFileHandler) ([]byte, error) { - if !strings.Contains(handler.filePath, "continues_replayer") { +func handlePlanReplayerCaptureFile(content []byte, path string, handler downloadFileHandler) ([]byte, error) { + if !strings.Contains(handler.filePath, "capture_replayer") { return content, nil } b := bytes.NewReader(content) @@ -331,7 +331,7 @@ func dumpJSONStatsIntoZip(tbls map[int64]*tblInfo, content []byte, path string) if err != nil { return "", err } - newPath := fmt.Sprintf("copy_%v.zip", path[0:len(path)-4]) + newPath := strings.Replace(path, "capture_replayer", "copy_capture_replayer", 1) zf, err := os.Create(newPath) if err != nil { return "", err diff --git a/util/replayer/replayer.go b/util/replayer/replayer.go index f89d26ec97717..39287ada70194 100644 --- a/util/replayer/replayer.go +++ b/util/replayer/replayer.go @@ -33,13 +33,13 @@ type PlanReplayerTaskKey struct { } // GeneratePlanReplayerFile generates plan replayer file -func GeneratePlanReplayerFile(isContinues bool) (*os.File, string, error) { +func GeneratePlanReplayerFile(isCapture bool) (*os.File, string, error) { path := GetPlanReplayerDirName() err := os.MkdirAll(path, os.ModePerm) if err != nil { return nil, "", errors.AddStack(err) } - fileName, err := generatePlanReplayerFileName(isContinues) + fileName, err := generatePlanReplayerFileName(isCapture) if err != nil { return nil, "", errors.AddStack(err) } @@ -50,7 +50,7 @@ func GeneratePlanReplayerFile(isContinues bool) (*os.File, string, error) { return zf, fileName, err } -func generatePlanReplayerFileName(isContinues bool) (string, error) { +func generatePlanReplayerFileName(isCapture bool) (string, error) { // Generate key and create zip file time := time.Now().UnixNano() b := make([]byte, 16) @@ -60,8 +60,8 @@ func generatePlanReplayerFileName(isContinues bool) (string, error) { return "", err } key := base64.URLEncoding.EncodeToString(b) - if isContinues { - return fmt.Sprintf("continues_replayer_%v_%v.zip", key, time), nil + if isCapture { + return fmt.Sprintf("capture_replayer_%v_%v.zip", key, time), nil } return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil }