Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: support plan_replayer_task system table #39019

Merged
merged 16 commits into from
Nov 11, 2022
3 changes: 3 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//telemetry",
"//types",
"//util",
"//util/chunk",
"//util/dbterror",
"//util/domainutil",
"//util/engine",
Expand Down Expand Up @@ -86,6 +87,7 @@ go_test(
"domain_utils_test.go",
"domainctx_test.go",
"main_test.go",
"plan_replayer_handle_test.go",
"plan_replayer_test.go",
"schema_checker_test.go",
"schema_validator_test.go",
Expand All @@ -109,6 +111,7 @@ go_test(
"//session",
"//sessionctx/variable",
"//store/mockstore",
"//testkit",
"//testkit/testsetup",
"//util",
"//util/mock",
Expand Down
40 changes: 37 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1533,12 +1533,46 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {

// SetupPlanReplayerHandle sets up the plan replayer handle: it binds the given
// session context (used for internal SQL) to the handle and registers the
// handle with the dump-file GC checker.
func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) {
	do.planReplayerHandle = &planReplayerHandle{}
	do.planReplayerHandle.sctxMu.sctx = ctx
	do.dumpFileGcChecker.setupPlanReplayerHandle(do.planReplayerHandle)
}

// planReplayerHandleLease is the tick interval of the background job that
// collects plan replayer tasks; a value below 1 disables the job entirely
// (see StartPlanReplayerHandle).
var planReplayerHandleLease = 10 * time.Second

// DisablePlanReplayerBackgroundJob4Test disable plan replayer handle for test
func DisablePlanReplayerBackgroundJob4Test() {
	planReplayerHandleLease = 0
}

// StartPlanReplayerHandle start plan replayer handle job
func (do *Domain) StartPlanReplayerHandle() {
if planReplayerHandleLease < 1 {
return
}
do.wg.Add(1)
go func() {
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
tikcer := time.NewTicker(planReplayerHandleLease)
defer func() {
tikcer.Stop()
do.wg.Done()
logutil.BgLogger().Info("PlanReplayerHandle exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerHandle", nil, false)
}()
for {
select {
case <-do.exit:
return
case <-tikcer.C:
err := do.planReplayerHandle.CollectPlanReplayerTask(context.Background())
if err != nil {
logutil.BgLogger().Warn("plan replayer handle collect tasks failed", zap.Error(err))
}
}
}
}()
}

// GetPlanReplayerHandle returns plan replayer handle
func (do *Domain) GetPlanReplayerHandle() *planReplayerHandle {
return do.planReplayerHandle
Expand Down
144 changes: 133 additions & 11 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
Expand Down Expand Up @@ -115,16 +121,23 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
}

// planReplayerHandle drives plan replayer background work. Two independent
// mutex-guarded groups avoid coupling SQL execution with task-set reads:
// sctxMu serializes use of the shared session context, while taskMu guards
// the set of collected, not-yet-handled task keys.
type planReplayerHandle struct {
	sctxMu struct {
		sync.Mutex
		// sctx is the session context used for all internal SQL issued by
		// this handle; only touch it while holding the mutex.
		sctx sessionctx.Context
	}

	taskMu struct {
		sync.RWMutex
		// tasks is the set of unhandled plan replayer task keys collected
		// from mysql.plan_replayer_task; replaced wholesale by setupTasks.
		tasks map[PlanReplayerTaskKey]struct{}
	}
}

// DeletePlanReplayerStatus delete mysql.plan_replayer_status record
func (h *planReplayerHandle) deletePlanReplayerStatus(ctx context.Context, token string) {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
h.Lock()
defer h.Unlock()
exec := h.sctx.(sqlexec.SQLExecutor)
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx1, fmt.Sprintf("delete from mysql.plan_replayer_status where token = %v", token))
if err != nil {
logutil.BgLogger().Warn("delete mysql.plan_replayer_status record failed", zap.String("token", token), zap.Error(err))
Expand Down Expand Up @@ -154,9 +167,9 @@ func (h *planReplayerHandle) InsertPlanReplayerStatus(ctx context.Context, recor
}

func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
h.Lock()
defer h.Unlock()
exec := h.sctx.(sqlexec.SQLExecutor)
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values ('%s','%s','%s')",
record.OriginSQL, record.FailedReason, instance))
Expand All @@ -167,9 +180,9 @@ func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx con
}

func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
h.Lock()
defer h.Unlock()
exec := h.sctx.(sqlexec.SQLExecutor)
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, token, instance) values ('%s','%s','%s')",
record.OriginSQL, record.Token, instance))
Expand All @@ -179,10 +192,119 @@ func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx c
}
}

// CollectPlanReplayerTask collects all unhandled plan replayer task
// keys from mysql.plan_replayer_task and stores them in the handle's task set.
func (h *planReplayerHandle) CollectPlanReplayerTask(ctx context.Context) error {
	internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
	keys, err := h.collectAllPlanReplayerTask(internalCtx)
	if err != nil {
		return err
	}
	unhandled := make([]PlanReplayerTaskKey, 0, len(keys))
	for _, taskKey := range keys {
		pending, err := h.checkUnHandledReplayerTask(internalCtx, taskKey)
		if err != nil {
			return err
		}
		if pending {
			unhandled = append(unhandled, taskKey)
		}
	}
	h.setupTasks(unhandled)
	return nil
}

// GetTasks returns a snapshot slice of all currently collected plan replayer
// task keys. Order is unspecified (map iteration order).
func (h *planReplayerHandle) GetTasks() []PlanReplayerTaskKey {
	h.taskMu.RLock()
	defer h.taskMu.RUnlock()
	// Allocate under the lock so the capacity matches the current set size.
	tasks := make([]PlanReplayerTaskKey, 0, len(h.taskMu.tasks))
	for taskKey := range h.taskMu.tasks {
		tasks = append(tasks, taskKey)
	}
	return tasks
}

// setupTasks replaces the handle's task set with the given keys.
func (h *planReplayerHandle) setupTasks(tasks []PlanReplayerTaskKey) {
	taskSet := make(map[PlanReplayerTaskKey]struct{}, len(tasks))
	for _, key := range tasks {
		taskSet[key] = struct{}{}
	}
	h.taskMu.Lock()
	defer h.taskMu.Unlock()
	h.taskMu.tasks = taskSet
}

// collectAllPlanReplayerTask reads every (sql_digest, plan_digest) pair
// registered in mysql.plan_replayer_task and returns them as task keys.
func (h *planReplayerHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
	h.sctxMu.Lock()
	defer h.sctxMu.Unlock()
	exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
	rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task")
	if err != nil {
		return nil, err
	}
	if rs == nil {
		return nil, nil
	}
	defer terror.Call(rs.Close)
	rows, err := sqlexec.DrainRecordSet(ctx, rs, 8)
	if err != nil {
		return nil, errors.Trace(err)
	}
	keys := make([]PlanReplayerTaskKey, 0, len(rows))
	for _, row := range rows {
		keys = append(keys, PlanReplayerTaskKey{
			sqlDigest:  row.GetString(0),
			planDigest: row.GetString(1),
		})
	}
	return keys, nil
}

// checkUnHandledReplayerTask reports whether the given task still needs
// handling, i.e. mysql.plan_replayer_status has no successful
// (fail_reason is null) record for its digests.
func (h *planReplayerHandle) checkUnHandledReplayerTask(ctx context.Context, task PlanReplayerTaskKey) (bool, error) {
	h.sctxMu.Lock()
	defer h.sctxMu.Unlock()
	exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
	// Pass the digests as %? parameters instead of splicing them into the
	// SQL text with fmt.Sprintf, so special characters cannot break the query.
	rs, err := exec.ExecuteInternal(ctx,
		"select * from mysql.plan_replayer_status where sql_digest = %? and plan_digest = %? and fail_reason is null",
		task.sqlDigest, task.planDigest)
	if err != nil {
		return false, err
	}
	if rs == nil {
		return true, nil
	}
	defer terror.Call(rs.Close)
	rows, err := sqlexec.DrainRecordSet(ctx, rs, 8)
	if err != nil {
		return false, errors.Trace(err)
	}
	// Any matching row means the task already has a successful status record.
	return len(rows) == 0, nil
}

// PlanReplayerStatusRecord indicates record in mysql.plan_replayer_status
type PlanReplayerStatusRecord struct {
	// Internal marks a record produced without direct user interaction —
	// presumably by the background job; confirm against callers.
	Internal bool
	// OriginSQL is written to the origin_sql column.
	OriginSQL string
	// Token is written to the token column for successful dumps.
	Token string
	// FailedReason is written to the fail_reason column for failed dumps.
	FailedReason string
}

// PlanReplayerTaskKey indicates key of a plan replayer task
type PlanReplayerTaskKey struct {
	sqlDigest  string // sql_digest column of mysql.plan_replayer_task
	planDigest string // plan_digest column of mysql.plan_replayer_task
}

// PlanReplayerDumpTask wrap the params for plan replayer dump
type PlanReplayerDumpTask struct {
	// SessionBindings are the session-level SQL bindings to include in the dump.
	SessionBindings []*bindinfo.BindRecord
	// EncodedPlan is the encoded plan text to dump.
	EncodedPlan string
	// FileName is the name of the generated dump file.
	FileName string
	// Zf is the open file handle the dump is written to.
	Zf *os.File
	// SessionVars are the session variables captured for the dump.
	SessionVars *variable.SessionVars
	// TblStats holds JSON-encoded table statistics, keyed by an int64 id —
	// presumably the table ID; confirm against DumpPlanReplayerInfo.
	TblStats map[int64]*handle.JSONTable
	// ExecStmts are the statements whose plans are dumped.
	ExecStmts []ast.StmtNode
	// Analyze controls whether explain output is produced with analyze.
	Analyze bool
}
64 changes: 64 additions & 0 deletions domain/plan_replayer_handle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package domain_test

import (
"context"
"testing"

"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

// TestPlanReplayerHandleCollectTask checks that CollectPlanReplayerTask picks
// up exactly the tasks in mysql.plan_replayer_task that have no successful
// record in mysql.plan_replayer_status.
func TestPlanReplayerHandleCollectTask(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	prHandle := dom.GetPlanReplayerHandle()

	// reset wipes both system tables so each case starts clean.
	reset := func() {
		tk.MustExec("delete from mysql.plan_replayer_task")
		tk.MustExec("delete from mysql.plan_replayer_status")
	}
	// collect runs the collection step and fails the test on error.
	collect := func() {
		require.NoError(t, prHandle.CollectPlanReplayerTask(context.Background()))
	}

	// assert 1 task
	reset()
	tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');")
	collect()
	require.Len(t, prHandle.GetTasks(), 1)

	// assert no task
	reset()
	collect()
	require.Len(t, prHandle.GetTasks(), 0)

	// assert 1 unhandled task: a success record hides the matching task
	reset()
	tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');")
	tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('345','345');")
	tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, token, instance) values ('123','123','123','123')")
	collect()
	require.Len(t, prHandle.GetTasks(), 1)

	// assert 2 unhandled task: a failed record does not hide the task
	reset()
	tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');")
	tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('345','345');")
	tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, fail_reason, instance) values ('123','123','123','123')")
	collect()
	require.Len(t, prHandle.GetTasks(), 2)
}
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 38
result := 39
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
18 changes: 3 additions & 15 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func generatePlanReplayerFileName() (string, error) {
func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
fileName := e.FileName
zf := e.File
task := &PlanReplayerDumpTask{
task := &domain.PlanReplayerDumpTask{
FileName: fileName,
Zf: zf,
SessionVars: e.ctx.GetSessionVars(),
Expand All @@ -242,18 +242,6 @@ func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
return nil
}

// PlanReplayerDumpTask wrap the params for plan replayer dump
type PlanReplayerDumpTask struct {
SessionBindings []*bindinfo.BindRecord
EncodedPlan string
FileName string
Zf *os.File
SessionVars *variable.SessionVars
TblStats map[int64]*handle.JSONTable
ExecStmts []ast.StmtNode
Analyze bool
}

// DumpPlanReplayerInfo will dump the information about sqls.
// The files will be organized into the following format:
/*
Expand Down Expand Up @@ -284,7 +272,7 @@ type PlanReplayerDumpTask struct {
|-....
*/
func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
task *PlanReplayerDumpTask) (err error) {
task *domain.PlanReplayerDumpTask) (err error) {
zf := task.Zf
fileName := task.FileName
sessionVars := task.SessionVars
Expand Down Expand Up @@ -373,7 +361,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
return dumpExplain(sctx, zw, execStmts, task.Analyze)
}

func generateRecords(task *PlanReplayerDumpTask) []domain.PlanReplayerStatusRecord {
func generateRecords(task *domain.PlanReplayerDumpTask) []domain.PlanReplayerStatusRecord {
records := make([]domain.PlanReplayerStatusRecord, 0)
if len(task.ExecStmts) > 0 {
for _, execStmt := range task.ExecStmts {
Expand Down
Loading