Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: revise plan replayer process log #40126

Merged
merged 9 commits into from
Dec 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,21 @@ type planReplayerHandle struct {
}

// SendTask tries to hand the dump task to the background task handler without blocking.
// In the bool-returning form it reports true when the task was put into the task channel
// and false when the channel was full and the task was discarded.
func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) {
func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) bool {
select {
case h.planReplayerTaskDumpHandle.taskCH <- task:
// The task key is removed as soon as the task has been put into the channel.
// If the dump later fails, the task handle will re-add the task in its next loop.
// Continues-capture tasks are not removed from the collector here.
if !task.IsContinuesCapture {
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
}
return true
default:
// TODO: add metrics here
// The task channel is full: discard the task instead of waiting, so that the
// query process is never blocked by plan replayer dumping.
logutil.BgLogger().Info("discard one plan replayer dump task",
zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest))
logutil.BgLogger().Warn("discard one plan replayer dump task",
zap.String("sql-digest", task.SQLDigest), zap.String("plan-digest", task.PlanDigest))
return false
}
}

Expand All @@ -209,9 +211,13 @@ func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
for _, key := range allKeys {
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key)
if err != nil {
logutil.BgLogger().Warn("[plan-replayer-task] collect plan replayer task failed", zap.Error(err))
return err
}
if unhandled {
logutil.BgLogger().Debug("[plan-replayer-task] collect plan replayer task success",
zap.String("sql-digest", key.SQLDigest),
zap.String("plan-digest", key.PlanDigest))
tasks = append(tasks, key)
}
}
Expand Down Expand Up @@ -351,16 +357,36 @@ type planReplayerTaskDumpWorker struct {

// run receives tasks from w.taskCH one at a time and passes each to handleTask.
// The loop ends when the channel is closed.
func (w *planReplayerTaskDumpWorker) run() {
for task := range w.taskCH {
w.handleTask(task)
}
}

// handleTask processes a single dump task and, on exit, writes a debug log with the
// task's digests and the check/occupy/handle flags.
func (w *planReplayerTaskDumpWorker) handleTask(task *PlanReplayerDumpTask) {
sqlDigest := task.SQLDigest
planDigest := task.PlanDigest
// Flags reported by the deferred debug log.
// NOTE(review): all three start as true, so a stage that is never reached (e.g. occupy
// and handle after the finished-before check returns early) is still logged as true.
check := true
occupy := true
handleTask := true
defer func() {
logutil.BgLogger().Debug("[plan-replayer-capture] handle task",
zap.String("sql-digest", sqlDigest),
zap.String("plan-digest", planDigest),
zap.Bool("check", check),
zap.Bool("occupy", occupy),
zap.Bool("handle", handleTask))
}()
if task.IsContinuesCapture {
if w.status.checkTaskKeyFinishedBefore(task) {
continue
// A continues-capture task whose key was already finished is not handled again.
check = false
return
}
successOccupy := w.status.occupyRunningTaskKey(task)
if !successOccupy {
continue
}
w.HandleTask(task)
w.status.releaseRunningTaskKey(task)
}
// Give up when the running-task key cannot be occupied
// (presumably the same key is already being handled — confirm against status).
occupy = w.status.occupyRunningTaskKey(task)
if !occupy {
return
}
handleTask = w.HandleTask(task)
// Release the key once HandleTask has returned, whether or not it succeeded.
w.status.releaseRunningTaskKey(task)
}

// HandleTask handles the given dump task
Expand All @@ -373,7 +399,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
taskKey := task.PlanReplayerTaskKey
unhandled, err := checkUnHandledReplayerTask(w.ctx, w.sctx, taskKey)
if err != nil {
logutil.BgLogger().Warn("check plan replayer capture task failed",
logutil.BgLogger().Warn("[plan-replayer-capture] check task failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -386,7 +412,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc

file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture)
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
logutil.BgLogger().Warn("[plan-replayer-capture] generate task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -409,7 +435,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task json stats failed",
logutil.BgLogger().Warn("[plan-replayer-capture] generate task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -421,7 +447,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
err = DumpPlanReplayerInfo(w.ctx, w.sctx, task)
if err != nil {
logutil.BgLogger().Warn("dump plan replayer capture task result failed",
logutil.BgLogger().Warn("[plan-replayer-capture] dump task result failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand Down
52 changes: 43 additions & 9 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ const (
PlanReplayerTaskMetaIsCapture = "isCapture"
// PlanReplayerTaskMetaIsContinues indicates whether this task is a continuous capture task
PlanReplayerTaskMetaIsContinues = "isContinues"
// PlanReplayerTaskMetaSQLDigest indicates the sql digest of this task
PlanReplayerTaskMetaSQLDigest = "sqlDigest"
// PlanReplayerTaskMetaPlanDigest indicates the plan digest of this task
PlanReplayerTaskMetaPlanDigest = "planDigest"
)

type tableNamePair struct {
Expand Down Expand Up @@ -180,25 +184,53 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
execStmts := task.ExecStmts
zw := zip.NewWriter(zf)
var records []PlanReplayerStatusRecord
sqls := make([]string, 0)
for _, execStmt := range task.ExecStmts {
sqls = append(sqls, execStmt.Text())
}
if task.IsCapture {
logutil.BgLogger().Info("[plan-replayer-dump] start to dump plan replayer result",
zap.String("sql-digest", task.SQLDigest),
zap.String("plan-digest", task.PlanDigest),
zap.Strings("sql", sqls),
zap.Bool("isContinues", task.IsContinuesCapture))
} else {
logutil.BgLogger().Info("[plan-replayer-dump] start to dump plan replayer result",
zap.Strings("sqls", sqls))
}
// Deferred cleanup: report a failed dump, close the zip writer and then the zip file,
// and persist the status records. Every failure seen here (the dump error held in err
// and either close error) is folded into the FailedReason stored on each record.
defer func() {
	errMsg := ""
	// appendErrMsg joins messages with "," without emitting a leading separator when
	// the first recorded failure is a close error.
	appendErrMsg := func(e error) {
		if len(errMsg) > 0 {
			errMsg += ","
		}
		errMsg += e.Error()
	}
	if err != nil {
		// Failures are logged at Warn with the error attached, matching the other
		// plan replayer failure logs.
		if task.IsCapture {
			logutil.BgLogger().Warn("[plan-replayer-dump] dump file failed",
				zap.String("sql-digest", task.SQLDigest),
				zap.String("plan-digest", task.PlanDigest),
				zap.Strings("sql", sqls),
				zap.Bool("isContinues", task.IsContinuesCapture),
				zap.Error(err))
		} else {
			logutil.BgLogger().Warn("[plan-replayer-dump] dump file failed",
				zap.Strings("sqls", sqls),
				zap.Error(err))
		}
		appendErrMsg(err)
	}
	// Close the zip writer first: it finishes the archive but does not close zf,
	// the file it writes into.
	if err1 := zw.Close(); err1 != nil {
		logutil.BgLogger().Error("[plan-replayer-dump] Closing zip writer failed", zap.Error(err1), zap.String("filename", fileName))
		appendErrMsg(err1)
	}
	if err2 := zf.Close(); err2 != nil {
		logutil.BgLogger().Error("[plan-replayer-dump] Closing zip file failed", zap.Error(err2), zap.String("filename", fileName))
		appendErrMsg(err2)
	}
	if len(errMsg) > 0 {
		// Index into the slice so the stored records are updated, not loop copies.
		for i := range records {
			records[i].FailedReason = errMsg
		}
	}
	insertPlanReplayerStatus(ctx, sctx, records)
}()

// Dump SQLMeta
if err = dumpSQLMeta(zw, task); err != nil {
return err
Expand Down Expand Up @@ -299,6 +331,8 @@ func dumpSQLMeta(zw *zip.Writer, task *PlanReplayerDumpTask) error {
varMap[PlanReplayerSQLMetaStartTS] = strconv.FormatUint(task.StartTS, 10)
varMap[PlanReplayerTaskMetaIsCapture] = strconv.FormatBool(task.IsCapture)
varMap[PlanReplayerTaskMetaIsContinues] = strconv.FormatBool(task.IsContinuesCapture)
varMap[PlanReplayerTaskMetaSQLDigest] = task.SQLDigest
varMap[PlanReplayerTaskMetaPlanDigest] = task.PlanDigest
if err := toml.NewEncoder(cf).Encode(varMap); err != nil {
return errors.AddStack(err)
}
Expand Down
47 changes: 31 additions & 16 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,16 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL {
if _, ok := stmtNode.(*ast.SelectStmt); ok {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
}
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
}
}

return stmt, nil
}

Expand All @@ -183,17 +180,25 @@ func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode
if handle == nil {
return
}
captured := false
tasks := handle.GetTasks()
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx)
defer func() {
logutil.BgLogger().Debug("[plan-replayer-capture] check capture task",
zap.String("sql-digest", sqlDigest.String()),
zap.String("plan-digest", planDigest.String()),
zap.Int("tasks", len(tasks)),
zap.Bool("captured", captured))
}()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
for _, task := range tasks {
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
captured = sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
return
}
}
Expand All @@ -215,16 +220,26 @@ func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.Stm
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
captured := false
defer func() {
logutil.BgLogger().Debug("[plan-replayer-capture] check continues capture task",
zap.String("sql-digest", sqlDigest.String()),
zap.String("plan-digest", planDigest.String()),
zap.Bool("captured", captured))
}()

existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key)
if existed {
return
}
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
captured = sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
if captured {
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
}
}

func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode,
startTS uint64, isContinuesCapture bool) {
startTS uint64, isContinuesCapture bool) bool {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
dumpTask := &domain.PlanReplayerDumpTask{
Expand All @@ -239,7 +254,7 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
return domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
}

// needLowerPriority checks whether it's needed to lower the execution priority
Expand Down
31 changes: 25 additions & 6 deletions statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -153,15 +155,32 @@ func (h *Handle) ClearOutdatedHistoryStats() error {
h.mu.Lock()
defer h.mu.Unlock()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
sql := "delete from mysql.stats_meta_history where NOW() - create_time >= %?"
_, err := exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
sql := "select count(*) from mysql.stats_meta_history where NOW() - create_time >= %?"
rs, err := exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
if err != nil {
return err
}
sql = "delete from mysql.stats_history where NOW() - create_time >= %? "
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
logutil.BgLogger().Info("clear outdated historical stats")
return err
if rs == nil {
return nil
}
var rows []chunk.Row
defer terror.Call(rs.Close)
if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
return errors.Trace(err)
}
count := rows[0].GetInt64(0)
if count > 0 {
sql = "delete from mysql.stats_meta_history where NOW() - create_time >= %?"
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
if err != nil {
return err
}
sql = "delete from mysql.stats_history where NOW() - create_time >= %? "
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
logutil.BgLogger().Info("clear outdated historical stats")
return err
}
return nil
}

func (h *Handle) gcHistoryStatsFromKV(physicalID int64) error {
Expand Down