Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: support plan_replayer_status system table #38957

Merged
merged 10 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type Domain struct {
cancel context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
planReplayerHandle *planReplayerHandle
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon

Expand Down Expand Up @@ -1530,6 +1531,19 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
}()
}

// SetupPlanReplayerHandle builds the domain's plan replayer handle on top of
// the given session context and hands the same handle to the dump file GC
// checker.
func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) {
	handle := &planReplayerHandle{sctx: ctx}
	do.planReplayerHandle = handle
	do.dumpFileGcChecker.setupPlanReplayerHandle(handle)
}

// GetPlanReplayerHandle returns the plan replayer handle stored on the domain.
// The result is nil if no handle has been assigned yet; in the code shown the
// field is assigned by SetupPlanReplayerHandle.
func (do *Domain) GetPlanReplayerHandle() *planReplayerHandle {
return do.planReplayerHandle
}

// DumpFileGcCheckerLoop creates a goroutine that handles `exit` and `gc`.
func (do *Domain) DumpFileGcCheckerLoop() {
do.wg.Add(1)
Expand Down
98 changes: 95 additions & 3 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
package domain

import (
"errors"
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
Expand All @@ -24,17 +25,23 @@ import (
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

// dumpFileGcChecker periodically garbage-collects expired dump files.
// For now it is used by the `plan replayer` and `trace plan` statements.
type dumpFileGcChecker struct {
sync.Mutex
gcLease time.Duration
paths []string
gcLease time.Duration
paths []string
planReplayerHandle *planReplayerHandle
}

// GetPlanReplayerDirName returns plan replayer directory path.
Expand All @@ -44,6 +51,10 @@ func GetPlanReplayerDirName() string {
return filepath.Join(tidbLogDir, "replayer")
}

// parseType returns the part of s that precedes the first underscore.
// When s contains no underscore, s is returned unchanged.
func parseType(s string) string {
	if sep := strings.Index(s, "_"); sep >= 0 {
		return s[:sep]
	}
	return s
}

func parseTime(s string) (time.Time, error) {
startIdx := strings.LastIndex(s, "_")
if startIdx == -1 {
Expand All @@ -68,6 +79,10 @@ func (p *dumpFileGcChecker) gcDumpFiles(t time.Duration) {
}
}

// setupPlanReplayerHandle stores handle on the checker so that
// gcDumpFilesByPath can delete the matching mysql.plan_replayer_status record
// after it removes a plan replayer dump file.
func (p *dumpFileGcChecker) setupPlanReplayerHandle(handle *planReplayerHandle) {
p.planReplayerHandle = handle
}

func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
files, err := ioutil.ReadDir(path)
if err != nil {
Expand All @@ -84,13 +99,90 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
logutil.BgLogger().Error("[dumpFileGcChecker] parseTime failed", zap.Error(err), zap.String("filename", fileName))
continue
}
isPlanReplayer := parseType(fileName) == "replayer"
if !createTime.After(gcTime) {
err := os.Remove(filepath.Join(path, f.Name()))
if err != nil {
logutil.BgLogger().Warn("[dumpFileGcChecker] remove file failed", zap.Error(err), zap.String("filename", fileName))
continue
}
logutil.BgLogger().Info("dumpFileGcChecker successful", zap.String("filename", fileName))
if isPlanReplayer && p.planReplayerHandle != nil {
p.planReplayerHandle.deletePlanReplayerStatus(context.Background(), fileName)
}
}
}
}

// planReplayerHandle maintains the records of the mysql.plan_replayer_status
// table by running internal SQL through a session context.
type planReplayerHandle struct {
// Mutex is held by the handle's methods while they execute SQL through sctx.
sync.Mutex
// sctx is the session context used to run the internal statements; the
// methods type-assert it to sqlexec.SQLExecutor.
sctx sessionctx.Context
}

// deletePlanReplayerStatus deletes the mysql.plan_replayer_status records whose
// token column equals token. The statement runs as internal SQL on the handle's
// session while the handle's mutex is held. A failure is logged at warn level
// and is not returned to the caller.
func (h *planReplayerHandle) deletePlanReplayerStatus(ctx context.Context, token string) {
	ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
	h.Lock()
	defer h.Unlock()
	exec := h.sctx.(sqlexec.SQLExecutor)
	// Bind the token as a parameter instead of formatting it into the SQL text:
	// the token is a string, so splicing it in unquoted produced an invalid
	// statement and left the query open to injection.
	_, err := exec.ExecuteInternal(ctx1, "delete from mysql.plan_replayer_status where token = %?", token)
	if err != nil {
		logutil.BgLogger().Warn("delete mysql.plan_replayer_status record failed", zap.String("token", token), zap.Error(err))
	}
}

// InsertPlanReplayerStatus writes one mysql.plan_replayer_status row for every
// record that is not marked Internal. A record with a non-empty FailedReason is
// stored through the error insert path, any other record through the success
// path. The instance value is "ip:port" taken from the server info, or
// "unknown" when the server info cannot be obtained.
func (h *planReplayerHandle) InsertPlanReplayerStatus(ctx context.Context, records []PlanReplayerStatusRecord) {
	internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
	instance := "unknown"
	if info, err := infosync.GetServerInfo(); err != nil {
		logutil.BgLogger().Error("failed to get server info", zap.Error(err))
	} else {
		instance = fmt.Sprintf("%s:%d", info.IP, info.Port)
	}
	for _, rec := range records {
		switch {
		case rec.Internal:
			// Internal records are not written to the status table.
		case len(rec.FailedReason) > 0:
			h.insertExternalPlanReplayerErrorStatusRecord(internalCtx, instance, rec)
		default:
			h.insertExternalPlanReplayerSuccessStatusRecord(internalCtx, instance, rec)
		}
	}
}

// insertExternalPlanReplayerErrorStatusRecord inserts a row describing a failed
// plan replayer dump: the original SQL, the failure reason, and the instance.
// The statement runs as internal SQL while the handle's mutex is held. A
// failure is logged at warn level and is not returned to the caller.
func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
	h.Lock()
	defer h.Unlock()
	exec := h.sctx.(sqlexec.SQLExecutor)
	// Bind the values as parameters: the original SQL text and the error message
	// may contain quotes, which would break (or inject into) a statement built
	// by string formatting.
	_, err := exec.ExecuteInternal(ctx,
		"insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values (%?, %?, %?)",
		record.OriginSQL, record.FailedReason, instance)
	if err != nil {
		logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
			zap.Error(err))
	}
}

// insertExternalPlanReplayerSuccessStatusRecord inserts a row describing a
// successful plan replayer dump: the original SQL, the dump token, and the
// instance. The statement runs as internal SQL while the handle's mutex is
// held. A failure is logged at warn level and is not returned to the caller.
func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
	h.Lock()
	defer h.Unlock()
	exec := h.sctx.(sqlexec.SQLExecutor)
	// Bind the values as parameters: the original SQL text may contain quotes,
	// which would break (or inject into) a statement built by string formatting.
	_, err := exec.ExecuteInternal(ctx,
		"insert into mysql.plan_replayer_status (origin_sql, token, instance) values (%?, %?, %?)",
		record.OriginSQL, record.Token, instance)
	if err != nil {
		logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
			zap.Error(err))
	}
}

// PlanReplayerStatusRecord describes one record to be written to the
// mysql.plan_replayer_status table.
type PlanReplayerStatusRecord struct {
// Internal marks records that InsertPlanReplayerStatus skips.
Internal bool
// OriginSQL is stored in the origin_sql column.
OriginSQL string
// Token is stored in the token column for records without a failure reason.
Token string
// FailedReason is stored in the fail_reason column; a non-empty value makes
// InsertPlanReplayerStatus store the record as a failure.
FailedReason string
}
7 changes: 7 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ func TestPlanReplayer(t *testing.T) {
tk.MustQuery("plan replayer dump explain select * from v1")
tk.MustQuery("plan replayer dump explain select * from v2")
require.True(t, len(tk.Session().GetSessionVars().LastPlanReplayerToken) > 0)

// clear the status table and assert
tk.MustExec("delete from mysql.plan_replayer_status")
tk.MustQuery("plan replayer dump explain select * from v2")
token := tk.Session().GetSessionVars().LastPlanReplayerToken
rows := tk.MustQuery(fmt.Sprintf("select * from mysql.plan_replayer_status where token = '%v'", token)).Rows()
require.Len(t, rows, 1)
}

func TestShow(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 37
result := 38
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
20 changes: 20 additions & 0 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
sessionVars := task.SessionVars
execStmts := task.ExecStmts
zw := zip.NewWriter(zf)
records := generateRecords(task)
defer func() {
err = zw.Close()
if err != nil {
Expand All @@ -298,7 +299,12 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
err = zf.Close()
if err != nil {
logutil.BgLogger().Error("Closing zip file failed", zap.Error(err), zap.String("filename", fileName))
for i, record := range records {
record.FailedReason = err.Error()
records[i] = record
}
}
domain.GetDomain(sctx).GetPlanReplayerHandle().InsertPlanReplayerStatus(ctx, records)
}()
// Dump config
if err = dumpConfig(zw); err != nil {
Expand Down Expand Up @@ -367,6 +373,20 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
return dumpExplain(sctx, zw, execStmts, task.Analyze)
}

// generateRecords builds one non-internal status record per statement in
// task.ExecStmts, carrying the statement text as the original SQL and
// task.FileName as the token. The result is empty (but non-nil) when the task
// has no statements.
func generateRecords(task *PlanReplayerDumpTask) []domain.PlanReplayerStatusRecord {
	records := make([]domain.PlanReplayerStatusRecord, 0, len(task.ExecStmts))
	for _, stmt := range task.ExecStmts {
		records = append(records, domain.PlanReplayerStatusRecord{
			OriginSQL: stmt.Text(),
			Token:     task.FileName,
			Internal:  false,
		})
	}
	return records
}

func dumpConfig(zw *zip.Writer) error {
cf, err := zw.Create(configFile)
if err != nil {
Expand Down
24 changes: 23 additions & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,16 @@ const (
CreateMDLView = `CREATE OR REPLACE VIEW mysql.tidb_mdl_view as (
select JOB_ID, DB_NAME, TABLE_NAME, QUERY, SESSION_ID, TxnStart, TIDB_DECODE_SQL_DIGESTS(ALL_SQL_DIGESTS, 4096) AS SQL_DIGESTS from information_schema.ddl_jobs, information_schema.CLUSTER_TIDB_TRX, information_schema.CLUSTER_PROCESSLIST where ddl_jobs.STATE = 'running' and find_in_set(ddl_jobs.table_id, CLUSTER_TIDB_TRX.RELATED_TABLE_IDS) and CLUSTER_TIDB_TRX.SESSION_ID=CLUSTER_PROCESSLIST.ID
);`

// CreatePlanReplayerStatusTable is the DDL that creates the mysql.plan_replayer_status table.
CreatePlanReplayerStatusTable = `CREATE TABLE IF NOT EXISTS mysql.plan_replayer_status (
sql_digest VARCHAR(128),
plan_digest VARCHAR(128),
origin_sql TEXT,
token VARCHAR(128),
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
fail_reason TEXT,
instance VARCHAR(512) NOT NULL comment 'address of the TiDB instance executing the plan replayer job');`
)

// bootstrap initiates system DB for a store.
Expand Down Expand Up @@ -644,11 +654,13 @@ const (
version99 = 99
// version100 converts server-memory-quota to a sysvar
version100 = 100
// version101 adds the mysql.plan_replayer_status table
version101 = 101
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version100
var currentBootstrapVersion int64 = version101

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -753,6 +765,7 @@ var (
upgradeToVer97,
upgradeToVer98,
upgradeToVer100,
upgradeToVer101,
}
)

Expand Down Expand Up @@ -1987,6 +2000,13 @@ func upgradeToVer98(s Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.user ADD COLUMN IF NOT EXISTS `Token_issuer` varchar(255)")
}

// upgradeToVer101 creates the mysql.plan_replayer_status table when the
// bootstrap version passed in is below version101; otherwise it does nothing.
func upgradeToVer101(s Session, ver int64) {
	if ver < version101 {
		doReentrantDDL(s, CreatePlanReplayerStatusTable)
	}
}

func upgradeToVer99Before(s Session, ver int64) bool {
if ver >= version99 {
return false
Expand Down Expand Up @@ -2122,6 +2142,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateAdvisoryLocks)
// Create mdl view.
mustExecute(s, CreateMDLView)
// Create plan_replayer_status table
mustExecute(s, CreatePlanReplayerStatusTable)
}

// inTestSuite checks if we are bootstrapping in the context of tests.
Expand Down
20 changes: 16 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2895,7 +2895,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {

analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency)
ses, err := createSessions(store, 7+concurrency+analyzeConcurrencyQuota)
ses, err := createSessions(store, 7)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2969,21 +2969,33 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
}()
}

// setup dumpFileGcChecker
dom.SetupPlanReplayerHandle(ses[6])
dom.DumpFileGcCheckerLoop()

// A sub context for update table stats, and other contexts for concurrent stats loading.
cnt := 1 + concurrency
syncStatsCtxs, err := createSessions(store, cnt)
if err != nil {
return nil, err
}
subCtxs := make([]sessionctx.Context, cnt)
for i := 0; i < cnt; i++ {
subCtxs[i] = sessionctx.Context(ses[6+i])
subCtxs[i] = sessionctx.Context(syncStatsCtxs[i])
}
if err = dom.LoadAndUpdateStatsLoop(subCtxs); err != nil {
return nil, err
}

analyzeCtxs, err := createSessions(store, analyzeConcurrencyQuota)
if err != nil {
return nil, err
}
subCtxs2 := make([]sessionctx.Context, analyzeConcurrencyQuota)
for i := 0; i < analyzeConcurrencyQuota; i++ {
subCtxs2[i] = ses[7+concurrency+i]
subCtxs2[i] = analyzeCtxs[i]
}
dom.SetupAnalyzeExec(subCtxs2)
dom.DumpFileGcCheckerLoop()
dom.LoadSigningCertLoop(cfg.Security.SessionTokenSigningCert, cfg.Security.SessionTokenSigningKey)

if raw, ok := store.(kv.EtcdBackend); ok {
Expand Down