Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: support dump plan replayer capture task during query #39125

Merged
merged 25 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ go_library(
"//privilege/privileges",
"//sessionctx",
"//sessionctx/sessionstates",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics",
"//statistics/handle",
"//telemetry",
"//types",
Expand Down
39 changes: 30 additions & 9 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1532,11 +1532,17 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
}

// SetupPlanReplayerHandle setup plan replayer handle
func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) {
do.planReplayerHandle = &planReplayerHandle{
planReplayerTaskCollectorHandle: &planReplayerTaskCollectorHandle{
sctx: ctx,
},
func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.Context) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
do.planReplayerHandle = &planReplayerHandle{}
do.planReplayerHandle.planReplayerTaskCollectorHandle = &planReplayerTaskCollectorHandle{
ctx: ctx,
sctx: collectorSctx,
}
do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{
ctx: ctx,
sctx: dumperSctx,
taskCH: make(chan *PlanReplayerDumpTask, 16),
}
}

Expand All @@ -1557,27 +1563,42 @@ func (do *Domain) StartPlanReplayerHandle() {
if planReplayerHandleLease < 1 {
return
}
do.wg.Add(1)
do.wg.Add(2)
go func() {
tikcer := time.NewTicker(planReplayerHandleLease)
defer func() {
tikcer.Stop()
do.wg.Done()
logutil.BgLogger().Info("PlanReplayerHandle exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerHandle", nil, false)
logutil.BgLogger().Info("PlanReplayerTaskCollectHandle exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerTaskCollectHandle", nil, false)
}()
for {
select {
case <-do.exit:
return
case <-tikcer.C:
err := do.planReplayerHandle.CollectPlanReplayerTask(context.Background())
err := do.planReplayerHandle.CollectPlanReplayerTask()
if err != nil {
logutil.BgLogger().Warn("plan replayer handle collect tasks failed", zap.Error(err))
}
}
}
}()
go func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("PlanReplayerTaskDumpHandle exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpHandle", nil, false)
}()
for {
select {
case <-do.exit:
return
case task := <-do.planReplayerHandle.planReplayerTaskDumpHandle.taskCH:
do.planReplayerHandle.HandlePlanReplayerDumpTask(task)
}
}
}()
}

// GetPlanReplayerHandle returns plan replayer handle
Expand Down
188 changes: 163 additions & 25 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package domain

import (
"context"
"encoding/base64"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strconv"
Expand All @@ -33,7 +35,9 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -122,13 +126,24 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {

type planReplayerHandle struct {
*planReplayerTaskCollectorHandle
*planReplayerTaskDumpHandle
}

// HandlePlanReplayerDumpTask handle dump task
func (h *planReplayerHandle) HandlePlanReplayerDumpTask(task *PlanReplayerDumpTask) bool {
success := h.dumpPlanReplayerDumpTask(task)
if success {
h.removeTask(task.PlanReplayerTaskKey)
}
return success
}

type planReplayerTaskCollectorHandle struct {
taskMu struct {
sync.RWMutex
tasks map[PlanReplayerTaskKey]struct{}
}
ctx context.Context
sctx sessionctx.Context
}

Expand All @@ -153,48 +168,45 @@ func insertPlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, reco
instance = fmt.Sprintf("%s:%d", serverInfo.IP, serverInfo.Port)
}
for _, record := range records {
if !record.Internal {
if len(record.FailedReason) > 0 {
insertExternalPlanReplayerErrorStatusRecord(ctx1, sctx, instance, record)
} else {
insertExternalPlanReplayerSuccessStatusRecord(ctx1, sctx, instance, record)
}
if len(record.FailedReason) > 0 {
insertPlanReplayerErrorStatusRecord(ctx1, sctx, instance, record)
} else {
insertPlanReplayerSuccessStatusRecord(ctx1, sctx, instance, record)
}
}
}

func insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
func insertPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values ('%s','%s','%s')",
record.OriginSQL, record.FailedReason, instance))
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, fail_reason, instance) values ('%s','%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.FailedReason, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.Error(err))
}
}

func insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, token, instance) values ('%s','%s','%s')",
record.OriginSQL, record.Token, instance))
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, token, instance) values ('%s','%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.Error(err))
}
}

// CollectPlanReplayerTask collects all unhandled plan replayer task
func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask(ctx context.Context) error {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
allKeys, err := h.collectAllPlanReplayerTask(ctx1)
func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
allKeys, err := h.collectAllPlanReplayerTask(h.ctx)
if err != nil {
return err
}
tasks := make([]PlanReplayerTaskKey, 0)
for _, key := range allKeys {
unhandled, err := checkUnHandledReplayerTask(ctx1, h.sctx, key)
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key)
if err != nil {
return err
}
Expand Down Expand Up @@ -227,6 +239,12 @@ func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []PlanReplayerTaskKey
h.taskMu.tasks = r
}

func (h *planReplayerTaskCollectorHandle) removeTask(taskKey PlanReplayerTaskKey) {
h.taskMu.Lock()
defer h.taskMu.Unlock()
delete(h.taskMu.tasks, taskKey)
}

func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
exec := h.sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task")
Expand All @@ -245,16 +263,96 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context
for _, row := range rows {
sqlDigest, planDigest := row.GetString(0), row.GetString(1)
allKeys = append(allKeys, PlanReplayerTaskKey{
sqlDigest: sqlDigest,
planDigest: planDigest,
SQLDigest: sqlDigest,
PlanDigest: planDigest,
})
}
return allKeys, nil
}

type planReplayerTaskDumpHandle struct {
ctx context.Context
sctx sessionctx.Context
taskCH chan *PlanReplayerDumpTask
}

// DrainTask drain a task for unit test
func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask {
return <-h.taskCH
}

// HandlePlanReplayerDumpTask handled the task
func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayerDumpTask) (success bool) {
taskKey := task.PlanReplayerTaskKey
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, taskKey)
if err != nil {
logutil.BgLogger().Warn("check plan replayer capture task failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
// the task is processed, thus we directly skip it.
if !unhandled {
return true
}

file, fileName, err := GeneratePlanReplayerFile()
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return
}
task.Zf = file
task.FileName = fileName
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
jsStats := make(map[int64]*handle.JSONTable)
is := GetDomain(h.sctx).InfoSchema()
for tblID, stat := range task.TblStats {
tbl, ok := is.TableByID(tblID)
if !ok {
return false
}
schema, ok := is.SchemaByTable(tbl.Meta())
if !ok {
return false
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
jsStats[tblID] = r
}
err = DumpPlanReplayerInfo(h.ctx, h.sctx, task)
if err != nil {
logutil.BgLogger().Warn("dump plan replayer capture task result failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
return true
}

// SendTask send dumpTask in background task handler
func (h *planReplayerTaskDumpHandle) SendTask(task *PlanReplayerDumpTask) {
select {
case h.taskCH <- task:
default:
// TODO: add metrics here
// directly discard the task if the task channel is full in order not to block the query process
}
}

func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) {
exec := sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.sqlDigest, task.planDigest))
rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.SQLDigest, task.PlanDigest))
if err != nil {
return false, err
}
Expand All @@ -274,26 +372,66 @@ func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, ta

// PlanReplayerStatusRecord indicates record in mysql.plan_replayer_status
type PlanReplayerStatusRecord struct {
Internal bool
SQLDigest string
PlanDigest string
OriginSQL string
Token string
FailedReason string
}

// PlanReplayerTaskKey indicates key of a plan replayer task
type PlanReplayerTaskKey struct {
sqlDigest string
planDigest string
SQLDigest string
PlanDigest string
}

// PlanReplayerDumpTask wrap the params for plan replayer dump
type PlanReplayerDumpTask struct {
PlanReplayerTaskKey

// tmp variables stored during the query
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
TblStats map[int64]interface{}

// variables used to dump the plan
SessionBindings []*bindinfo.BindRecord
EncodedPlan string
FileName string
Zf *os.File
SessionVars *variable.SessionVars
TblStats map[int64]*handle.JSONTable
JSONTblStats map[int64]*handle.JSONTable
ExecStmts []ast.StmtNode
Analyze bool

FileName string
Zf *os.File
}

// GeneratePlanReplayerFile generates plan replayer file
func GeneratePlanReplayerFile() (*os.File, string, error) {
path := GetPlanReplayerDirName()
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return nil, "", errors.AddStack(err)
}
fileName, err := generatePlanReplayerFileName()
if err != nil {
return nil, "", errors.AddStack(err)
}
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return nil, "", errors.AddStack(err)
}
return zf, fileName, err
}

func generatePlanReplayerFileName() (string, error) {
// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err := rand.Read(b)
if err != nil {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil
}
9 changes: 5 additions & 4 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
}

// Dump stats
if err = dumpStats(zw, pairs, task.TblStats, do); err != nil {
if err = dumpStats(zw, pairs, task.JSONTblStats, do); err != nil {
return err
}

Expand Down Expand Up @@ -252,9 +252,10 @@ func generateRecords(task *PlanReplayerDumpTask) []PlanReplayerStatusRecord {
if len(task.ExecStmts) > 0 {
for _, execStmt := range task.ExecStmts {
records = append(records, PlanReplayerStatusRecord{
OriginSQL: execStmt.Text(),
Token: task.FileName,
Internal: false,
SQLDigest: task.SQLDigest,
PlanDigest: task.PlanDigest,
OriginSQL: execStmt.Text(),
Token: task.FileName,
})
}
}
Expand Down
Loading