diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index f7ef9baba6907..4b575f4dc63e0 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -42,7 +42,9 @@ go_library( "//privilege/privileges", "//sessionctx", "//sessionctx/sessionstates", + "//sessionctx/stmtctx", "//sessionctx/variable", + "//statistics", "//statistics/handle", "//telemetry", "//types", diff --git a/domain/domain.go b/domain/domain.go index 1016d5ba9b5cb..b900cf3eb8d3a 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1532,11 +1532,17 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) { } // SetupPlanReplayerHandle setup plan replayer handle -func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) { - do.planReplayerHandle = &planReplayerHandle{ - planReplayerTaskCollectorHandle: &planReplayerTaskCollectorHandle{ - sctx: ctx, - }, +func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.Context) { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + do.planReplayerHandle = &planReplayerHandle{} + do.planReplayerHandle.planReplayerTaskCollectorHandle = &planReplayerTaskCollectorHandle{ + ctx: ctx, + sctx: collectorSctx, + } + do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{ + ctx: ctx, + sctx: dumperSctx, + taskCH: make(chan *PlanReplayerDumpTask, 16), } } @@ -1557,27 +1563,42 @@ func (do *Domain) StartPlanReplayerHandle() { if planReplayerHandleLease < 1 { return } - do.wg.Add(1) + do.wg.Add(2) go func() { tikcer := time.NewTicker(planReplayerHandleLease) defer func() { tikcer.Stop() do.wg.Done() - logutil.BgLogger().Info("PlanReplayerHandle exited.") - util.Recover(metrics.LabelDomain, "PlanReplayerHandle", nil, false) + logutil.BgLogger().Info("PlanReplayerTaskCollectHandle exited.") + util.Recover(metrics.LabelDomain, "PlanReplayerTaskCollectHandle", nil, false) }() for { select { case <-do.exit: return case <-tikcer.C: - err := do.planReplayerHandle.CollectPlanReplayerTask(context.Background()) + err := do.planReplayerHandle.CollectPlanReplayerTask() if err != nil { logutil.BgLogger().Warn("plan replayer handle collect tasks failed", zap.Error(err)) } } } }() + go func() { + defer func() { + do.wg.Done() + logutil.BgLogger().Info("PlanReplayerTaskDumpHandle exited.") + util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpHandle", nil, false) + }() + for { + select { + case <-do.exit: + return + case task := <-do.planReplayerHandle.planReplayerTaskDumpHandle.taskCH: + do.planReplayerHandle.HandlePlanReplayerDumpTask(task) + } + } + }() } // GetPlanReplayerHandle returns plan replayer handle diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index efc2e8ad21429..faab592950c64 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -16,8 +16,10 @@ package domain import ( "context" + "encoding/base64" "fmt" "io/ioutil" + "math/rand" "os" "path/filepath" "strconv" @@ -33,7 +35,9 @@ import ( "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" @@ -122,6 +126,16 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) { type planReplayerHandle struct { *planReplayerTaskCollectorHandle + *planReplayerTaskDumpHandle +} + +// HandlePlanReplayerDumpTask handle dump task +func (h *planReplayerHandle) HandlePlanReplayerDumpTask(task *PlanReplayerDumpTask) bool { + success := h.dumpPlanReplayerDumpTask(task) + if success { + h.removeTask(task.PlanReplayerTaskKey) + } + return success } type planReplayerTaskCollectorHandle struct { @@ -129,6 +143,7 @@ type planReplayerTaskCollectorHandle struct { sync.RWMutex tasks map[PlanReplayerTaskKey]struct{} } + ctx context.Context sctx sessionctx.Context } @@ -153,32 +168,30 @@ func insertPlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, reco instance = fmt.Sprintf("%s:%d", serverInfo.IP, serverInfo.Port) } for _, record := range records { - if !record.Internal { - if len(record.FailedReason) > 0 { - insertExternalPlanReplayerErrorStatusRecord(ctx1, sctx, instance, record) - } else { - insertExternalPlanReplayerSuccessStatusRecord(ctx1, sctx, instance, record) - } + if len(record.FailedReason) > 0 { + insertPlanReplayerErrorStatusRecord(ctx1, sctx, instance, record) + } else { + insertPlanReplayerSuccessStatusRecord(ctx1, sctx, instance, record) } } } -func insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) { +func insertPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) { exec := sctx.(sqlexec.SQLExecutor) _, err := exec.ExecuteInternal(ctx, fmt.Sprintf( - "insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values ('%s','%s','%s')", - record.OriginSQL, record.FailedReason, instance)) + "insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, fail_reason, instance) values ('%s','%s','%s','%s','%s')", + record.SQLDigest, record.PlanDigest, record.OriginSQL, record.FailedReason, instance)) if err != nil { logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed", zap.Error(err)) } } -func insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) { +func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) { exec := sctx.(sqlexec.SQLExecutor) _, err := exec.ExecuteInternal(ctx, fmt.Sprintf( - "insert into mysql.plan_replayer_status (origin_sql, token, instance) values ('%s','%s','%s')", - record.OriginSQL, record.Token, instance)) + "insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, token, instance) values ('%s','%s','%s','%s','%s')", + record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance)) if err != nil { logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed", zap.Error(err)) @@ -186,15 +199,14 @@ func insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, sctx ses } // CollectPlanReplayerTask collects all unhandled plan replayer task -func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask(ctx context.Context) error { - ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) - allKeys, err := h.collectAllPlanReplayerTask(ctx1) +func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error { + allKeys, err := h.collectAllPlanReplayerTask(h.ctx) if err != nil { return err } tasks := make([]PlanReplayerTaskKey, 0) for _, key := range allKeys { - unhandled, err := checkUnHandledReplayerTask(ctx1, h.sctx, key) + unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key) if err != nil { return err } @@ -227,6 +239,12 @@ func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []PlanReplayerTaskKey h.taskMu.tasks = r } +func (h *planReplayerTaskCollectorHandle) removeTask(taskKey PlanReplayerTaskKey) { + h.taskMu.Lock() + defer h.taskMu.Unlock() + delete(h.taskMu.tasks, taskKey) +} + func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) { exec := h.sctx.(sqlexec.SQLExecutor) rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task") @@ -245,16 +263,96 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context for _, row := range rows { sqlDigest, planDigest := row.GetString(0), row.GetString(1) allKeys = append(allKeys, PlanReplayerTaskKey{ - sqlDigest: sqlDigest, - planDigest: planDigest, + SQLDigest: sqlDigest, + PlanDigest: planDigest, }) } return allKeys, nil } +type planReplayerTaskDumpHandle struct { + ctx context.Context + sctx sessionctx.Context + taskCH chan *PlanReplayerDumpTask +} + +// DrainTask drain a task for unit test +func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask { + return <-h.taskCH +} + +// HandlePlanReplayerDumpTask handled the task +func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayerDumpTask) (success bool) { + taskKey := task.PlanReplayerTaskKey + unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, taskKey) + if err != nil { + logutil.BgLogger().Warn("check plan replayer capture task failed", + zap.String("sqlDigest", taskKey.SQLDigest), + zap.String("planDigest", taskKey.PlanDigest), + zap.Error(err)) + return false + } + // the task is processed, thus we directly skip it. + if !unhandled { + return true + } + + file, fileName, err := GeneratePlanReplayerFile() + if err != nil { + logutil.BgLogger().Warn("generate plan replayer capture task file failed", + zap.String("sqlDigest", taskKey.SQLDigest), + zap.String("planDigest", taskKey.PlanDigest), + zap.Error(err)) + return + } + task.Zf = file + task.FileName = fileName + task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false) + jsStats := make(map[int64]*handle.JSONTable) + is := GetDomain(h.sctx).InfoSchema() + for tblID, stat := range task.TblStats { + tbl, ok := is.TableByID(tblID) + if !ok { + return false + } + schema, ok := is.SchemaByTable(tbl.Meta()) + if !ok { + return false + } + r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table)) + if err != nil { + logutil.BgLogger().Warn("generate plan replayer capture task json stats failed", + zap.String("sqlDigest", taskKey.SQLDigest), + zap.String("planDigest", taskKey.PlanDigest), + zap.Error(err)) + return false + } + jsStats[tblID] = r + } + err = DumpPlanReplayerInfo(h.ctx, h.sctx, task) + if err != nil { + logutil.BgLogger().Warn("dump plan replayer capture task result failed", + zap.String("sqlDigest", taskKey.SQLDigest), + zap.String("planDigest", taskKey.PlanDigest), + zap.Error(err)) + return false + } + return true +} + +// SendTask send dumpTask in background task handler +func (h *planReplayerTaskDumpHandle) SendTask(task *PlanReplayerDumpTask) { + select { + case h.taskCH <- task: + default: + // TODO: add metrics here + // directly discard the task if the task channel is full in order not to block the query process + } +} + func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) { exec := sctx.(sqlexec.SQLExecutor) - rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.sqlDigest, task.planDigest)) + rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.SQLDigest, task.PlanDigest)) if err != nil { return false, err } @@ -274,7 +372,8 @@ func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, ta // PlanReplayerStatusRecord indicates record in mysql.plan_replayer_status type PlanReplayerStatusRecord struct { - Internal bool + SQLDigest string + PlanDigest string OriginSQL string Token string FailedReason string @@ -282,18 +381,57 @@ type PlanReplayerStatusRecord struct { // PlanReplayerTaskKey indicates key of a plan replayer task type PlanReplayerTaskKey struct { - sqlDigest string - planDigest string + SQLDigest string + PlanDigest string } // PlanReplayerDumpTask wrap the params for plan replayer dump type PlanReplayerDumpTask struct { + PlanReplayerTaskKey + + // tmp variables stored during the query + EncodePlan func(*stmtctx.StatementContext, bool) (string, string) + TblStats map[int64]interface{} + + // variables used to dump the plan SessionBindings []*bindinfo.BindRecord EncodedPlan string - FileName string - Zf *os.File SessionVars *variable.SessionVars - TblStats map[int64]*handle.JSONTable + JSONTblStats map[int64]*handle.JSONTable ExecStmts []ast.StmtNode Analyze bool + + FileName string + Zf *os.File +} + +// GeneratePlanReplayerFile generates plan replayer file +func GeneratePlanReplayerFile() (*os.File, string, error) { + path := GetPlanReplayerDirName() + err := os.MkdirAll(path, os.ModePerm) + if err != nil { + return nil, "", errors.AddStack(err) + } + fileName, err := generatePlanReplayerFileName() + if err != nil { + return nil, "", errors.AddStack(err) + } + zf, err := os.Create(filepath.Join(path, fileName)) + if err != nil { + return nil, "", errors.AddStack(err) + } + return zf, fileName, err +} + +func generatePlanReplayerFileName() (string, error) { + // Generate key and create zip file + time := time.Now().UnixNano() + b := make([]byte, 16) + //nolint: gosec + _, err := rand.Read(b) + if err != nil { + return "", err + } + key := base64.URLEncoding.EncodeToString(b) + return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil } diff --git a/domain/plan_replayer_dump.go b/domain/plan_replayer_dump.go index 93d0278a4ba3d..195dae7b4a0b1 100644 --- a/domain/plan_replayer_dump.go +++ b/domain/plan_replayer_dump.go @@ -210,7 +210,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context, } // Dump stats - if err = dumpStats(zw, pairs, task.TblStats, do); err != nil { + if err = dumpStats(zw, pairs, task.JSONTblStats, do); err != nil { return err } @@ -252,9 +252,10 @@ func generateRecords(task *PlanReplayerDumpTask) []PlanReplayerStatusRecord { if len(task.ExecStmts) > 0 { for _, execStmt := range task.ExecStmts { records = append(records, PlanReplayerStatusRecord{ - OriginSQL: execStmt.Text(), - Token: task.FileName, - Internal: false, + SQLDigest: task.SQLDigest, + PlanDigest: task.PlanDigest, + OriginSQL: execStmt.Text(), + Token: task.FileName, }) } } diff --git a/domain/plan_replayer_handle_test.go b/domain/plan_replayer_handle_test.go index 2c25f56e15045..5a824ef4eeeb6 100644 --- a/domain/plan_replayer_handle_test.go +++ b/domain/plan_replayer_handle_test.go @@ -15,7 +15,7 @@ package domain_test import ( - "context" + "fmt" "testing" "github.com/pingcap/tidb/testkit" @@ -31,14 +31,14 @@ func TestPlanReplayerHandleCollectTask(t *testing.T) { tk.MustExec("delete from mysql.plan_replayer_task") tk.MustExec("delete from mysql.plan_replayer_status") tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');") - err := prHandle.CollectPlanReplayerTask(context.Background()) + err := prHandle.CollectPlanReplayerTask() require.NoError(t, err) require.Len(t, prHandle.GetTasks(), 1) // assert no task tk.MustExec("delete from mysql.plan_replayer_task") tk.MustExec("delete from mysql.plan_replayer_status") - err = prHandle.CollectPlanReplayerTask(context.Background()) + err = prHandle.CollectPlanReplayerTask() require.NoError(t, err) require.Len(t, prHandle.GetTasks(), 0) @@ -48,7 +48,7 @@ func TestPlanReplayerHandleCollectTask(t *testing.T) { tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');") tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('345','345');") tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, token, instance) values ('123','123','123','123')") - err = prHandle.CollectPlanReplayerTask(context.Background()) + err = prHandle.CollectPlanReplayerTask() require.NoError(t, err) require.Len(t, prHandle.GetTasks(), 1) @@ -58,7 +58,44 @@ func TestPlanReplayerHandleCollectTask(t *testing.T) { tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');") tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('345','345');") tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, fail_reason, instance) values ('123','123','123','123')") - err = prHandle.CollectPlanReplayerTask(context.Background()) + err = prHandle.CollectPlanReplayerTask() require.NoError(t, err) require.Len(t, prHandle.GetTasks(), 2) } + +func TestPlanReplayerHandleDumpTask(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + prHandle := dom.GetPlanReplayerHandle() + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + tk.MustQuery("select * from t;") + _, d := tk.Session().GetSessionVars().StmtCtx.SQLDigest() + _, pd := tk.Session().GetSessionVars().StmtCtx.GetPlanDigest() + sqlDigest := d.String() + planDigest := pd.String() + + // register task + tk.MustExec("delete from mysql.plan_replayer_task") + tk.MustExec("delete from mysql.plan_replayer_status") + tk.MustExec(fmt.Sprintf("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('%v','%v');", sqlDigest, planDigest)) + err := prHandle.CollectPlanReplayerTask() + require.NoError(t, err) + require.Len(t, prHandle.GetTasks(), 1) + + tk.MustExec("SET @@tidb_enable_plan_replayer_capture = ON;") + + // capture task and dump + tk.MustQuery("select * from t;") + task := prHandle.DrainTask() + require.NotNil(t, task) + success := prHandle.HandlePlanReplayerDumpTask(task) + require.True(t, success) + // assert memory task consumed + require.Len(t, prHandle.GetTasks(), 0) + + // assert collect task again and no more memory task + err = prHandle.CollectPlanReplayerTask() + require.NoError(t, err) + require.Len(t, prHandle.GetTasks(), 0) +} diff --git a/executor/adapter.go b/executor/adapter.go index db9fbbaa929e0..3dd0e0ce0877e 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -1629,6 +1629,11 @@ func getPlanDigest(stmtCtx *stmtctx.StatementContext) (string, *parser.Digest) { return normalized, planDigest } +// GetEncodedPlan returned same as getEncodedPlan +func GetEncodedPlan(stmtCtx *stmtctx.StatementContext, genHint bool) (encodedPlan, hintStr string) { + return getEncodedPlan(stmtCtx, genHint) +} + // getEncodedPlan gets the encoded plan, and generates the hint string if indicated. func getEncodedPlan(stmtCtx *stmtctx.StatementContext, genHint bool) (encodedPlan, hintStr string) { var hintSet bool diff --git a/executor/compiler.go b/executor/compiler.go index 241b15874e1e2..4ddb6208a445c 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -21,7 +21,9 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" @@ -154,9 +156,42 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } } } + if c.Ctx.GetSessionVars().EnablePlanReplayerCapture { + checkPlanReplayerCaptureTask(c.Ctx, stmtNode) + } + return stmt, nil } +func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode) { + tasks := domain.GetDomain(sctx).GetPlanReplayerHandle().GetTasks() + _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() + _, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx) + for _, task := range tasks { + if task.SQLDigest == sqlDigest.String() && task.PlanDigest == planDigest.String() { + sendPlanReplayerDumpTask(sqlDigest.String(), planDigest.String(), sctx, stmtNode) + } + } +} + +func sendPlanReplayerDumpTask(sqlDigest, planDigest string, sctx sessionctx.Context, stmtNode ast.StmtNode) { + stmtCtx := sctx.GetSessionVars().StmtCtx + handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) + dumpTask := &domain.PlanReplayerDumpTask{ + PlanReplayerTaskKey: domain.PlanReplayerTaskKey{ + SQLDigest: sqlDigest, + PlanDigest: planDigest, + }, + EncodePlan: GetEncodedPlan, + TblStats: stmtCtx.TableStats, + SessionBindings: handle.GetAllBindRecord(), + SessionVars: sctx.GetSessionVars(), + ExecStmts: []ast.StmtNode{stmtNode}, + Analyze: false, + } + domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) +} + // needLowerPriority checks whether it's needed to lower the execution priority // of a query. // If the estimated output row count of any operator in the physical plan tree diff --git a/executor/plan_replayer.go b/executor/plan_replayer.go index fec3de1867933..ea6ff6155bf44 100644 --- a/executor/plan_replayer.go +++ b/executor/plan_replayer.go @@ -18,14 +18,10 @@ import ( "archive/zip" "bytes" "context" - "crypto/rand" - "encoding/base64" "encoding/json" "fmt" "os" - "path/filepath" "strings" - "time" "github.com/BurntSushi/toml" "github.com/pingcap/errors" @@ -95,44 +91,13 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error { func (e *PlanReplayerExec) createFile() error { var err error - e.DumpInfo.File, e.DumpInfo.FileName, err = GeneratePlanReplayerFile() + e.DumpInfo.File, e.DumpInfo.FileName, err = domain.GeneratePlanReplayerFile() if err != nil { return err } return nil } -// GeneratePlanReplayerFile generates plan replayer file -func GeneratePlanReplayerFile() (*os.File, string, error) { - path := domain.GetPlanReplayerDirName() - err := os.MkdirAll(path, os.ModePerm) - if err != nil { - return nil, "", errors.AddStack(err) - } - fileName, err := generatePlanReplayerFileName() - if err != nil { - return nil, "", errors.AddStack(err) - } - zf, err := os.Create(filepath.Join(path, fileName)) - if err != nil { - return nil, "", errors.AddStack(err) - } - return zf, fileName, err -} - -func generatePlanReplayerFileName() (string, error) { - // Generate key and create zip file - time := time.Now().UnixNano() - b := make([]byte, 16) - //nolint: gosec - _, err := rand.Read(b) - if err != nil { - return "", err - } - key := base64.URLEncoding.EncodeToString(b) - return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil -} - func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) { fileName := e.FileName zf := e.File diff --git a/planner/core/collect_column_stats_usage.go b/planner/core/collect_column_stats_usage.go index 49fffb149b85d..4a351e60a9018 100644 --- a/planner/core/collect_column_stats_usage.go +++ b/planner/core/collect_column_stats_usage.go @@ -17,7 +17,6 @@ package core import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx/variable" ) const ( @@ -51,7 +50,7 @@ type columnStatsUsageCollector struct { visitedtbls map[int64]struct{} } -func newColumnStatsUsageCollector(collectMode uint64) *columnStatsUsageCollector { +func newColumnStatsUsageCollector(collectMode uint64, enabledPlanCapture bool) *columnStatsUsageCollector { collector := &columnStatsUsageCollector{ collectMode: collectMode, // Pre-allocate a slice to reduce allocation, 8 doesn't have special meaning. @@ -64,7 +63,7 @@ func newColumnStatsUsageCollector(collectMode uint64) *columnStatsUsageCollector if collectMode&collectHistNeededColumns != 0 { collector.histNeededCols = make(map[model.TableItemID]struct{}) } - if variable.EnablePlanReplayerCapture.Load() { + if enabledPlanCapture { collector.collectVisitedTable = true collector.visitedtbls = map[int64]struct{}{} } @@ -300,7 +299,7 @@ func CollectColumnStatsUsage(lp LogicalPlan, predicate, histNeeded bool) ([]mode if histNeeded { mode |= collectHistNeededColumns } - collector := newColumnStatsUsageCollector(mode) + collector := newColumnStatsUsageCollector(mode, lp.SCtx().GetSessionVars().EnablePlanReplayerCapture) collector.collectFromPlan(lp) if collector.collectVisitedTable { recordTableRuntimeStats(lp.SCtx(), collector.visitedtbls) diff --git a/planner/core/plan_replayer_capture_test.go b/planner/core/plan_replayer_capture_test.go index 2e88f090bd784..6778cdba20bbf 100644 --- a/planner/core/plan_replayer_capture_test.go +++ b/planner/core/plan_replayer_capture_test.go @@ -35,7 +35,6 @@ func TestPlanReplayerCaptureRecordJsonStats(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t1(a int)") tk.MustExec("create table t2(a int)") - tk.MustExec("SET global tidb_enable_plan_replayer_capture = ON;") tk.MustExec("analyze table t1") tk.MustExec("analyze table t2") testcases := []struct { @@ -68,6 +67,7 @@ func getTableStats(sql string, t *testing.T, ctx sessionctx.Context, dom *domain err = core.Preprocess(context.Background(), ctx, stmt, core.WithPreprocessorReturn(&core.PreprocessorReturn{InfoSchema: dom.InfoSchema()})) require.NoError(t, err) sctx := core.MockContext() + sctx.GetSessionVars().EnablePlanReplayerCapture = true builder, _ := core.NewPlanBuilder().Init(sctx, dom.InfoSchema(), &hint.BlockHintProcessor{}) domain.GetDomain(sctx).MockInfoCacheAndLoadInfoSchema(dom.InfoSchema()) plan, err := builder.Build(context.TODO(), stmt) diff --git a/session/bootstrap.go b/session/bootstrap.go index 08bf0293db72c..57660af7fa808 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -447,6 +447,7 @@ const ( plan_digest VARCHAR(128) NOT NULL, update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (sql_digest,plan_digest));` + // CreateStatsTableLocked stores the locked tables CreateStatsTableLocked = `CREATE TABLE IF NOT EXISTS mysql.stats_table_locked( table_id bigint(64) NOT NULL, diff --git a/session/session.go b/session/session.go index 4b220af34516a..1b254c879ec40 100644 --- a/session/session.go +++ b/session/session.go @@ -2878,7 +2878,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota) concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency) - ses, err := createSessions(store, 8) + ses, err := createSessions(store, 9) if err != nil { return nil, err } @@ -2953,10 +2953,10 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { } // setup plan replayer handle - dom.SetupPlanReplayerHandle(ses[6]) + dom.SetupPlanReplayerHandle(ses[6], ses[7]) dom.StartPlanReplayerHandle() // setup dumpFileGcChecker - dom.SetupDumpFileGCChecker(ses[7]) + dom.SetupDumpFileGCChecker(ses[8]) dom.DumpFileGcCheckerLoop() // A sub context for update table stats, and other contexts for concurrent stats loading. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index b11b727079630..b8fbcf54848e1 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1305,6 +1305,9 @@ type SessionVars struct { // preuseChunkAlloc indicates whether pre statement use chunk alloc // like select @@last_sql_use_alloc preUseChunkAlloc bool + + // EnablePlanReplayerCapture indicates whether enabled plan replayer capture + EnablePlanReplayerCapture bool } // GetNewChunkWithCapacity Attempt to request memory from the chunk pool diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index a592e7a7e8831..d73885663d957 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1039,15 +1039,17 @@ var defaultSysVars = []*SysVar{ }, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) { return fmt.Sprintf("%d", MemoryUsageAlarmKeepRecordNum.Load()), nil }}, - {Scope: ScopeGlobal, Name: TiDBEnablePlanReplayerCapture, Value: BoolToOnOff(false), Type: TypeBool, - SetGlobal: func(ctx context.Context, s *SessionVars, val string) error { - EnablePlanReplayerCapture.Store(TiDBOptOn(val)) - return nil - }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { - return strconv.FormatBool(EnablePlanReplayerCapture.Load()), nil - }}, /* The system variables below have GLOBAL and SESSION scope */ + {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePlanReplayerCapture, Value: BoolToOnOff(false), Type: TypeBool, + SetSession: func(s *SessionVars, val string) error { + s.EnablePlanReplayerCapture = TiDBOptOn(val) + return nil + }, + GetSession: func(vars *SessionVars) (string, error) { + return strconv.FormatBool(vars.EnablePlanReplayerCapture), nil + }, + }, {Scope: ScopeGlobal | ScopeSession, Name: TiDBRowFormatVersion, Value: strconv.Itoa(DefTiDBRowFormatV1), Type: TypeUnsigned, MinValue: 1, MaxValue: 2, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { SetDDLReorgRowFormat(TidbOptInt64(val, DefTiDBRowFormatV2)) return nil diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 80c9b41f4cc6e..3511775de08f1 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1148,9 +1148,8 @@ var ( // DefTiDBServerMemoryLimit indicates the default value of TiDBServerMemoryLimit(TotalMem * 80%). // It should be a const and shouldn't be modified after tidb is started. - DefTiDBServerMemoryLimit = serverMemoryLimitDefaultValue() - GOGCTunerThreshold = atomic.NewFloat64(DefTiDBGOGCTunerThreshold) - EnablePlanReplayerCapture = atomic.NewBool(DefTiDBEnablePlanReplayerCapture) + DefTiDBServerMemoryLimit = serverMemoryLimitDefaultValue() + GOGCTunerThreshold = atomic.NewFloat64(DefTiDBGOGCTunerThreshold) ) var (