Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer committed Dec 30, 2022
1 parent f029d65 commit 3f1470d
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 99 deletions.
92 changes: 92 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -58,6 +59,7 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/replayer"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"
Expand Down Expand Up @@ -1365,6 +1367,18 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm
// 4. update the `PrevStmt` in session variable.
// 5. reset `DurationParse` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) {
se := a.Ctx
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := a.GetStmtNode()
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, txnTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, txnTS)
}
}

sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
// Attach commit/lockKeys runtime stats to executor runtime stats.
Expand Down Expand Up @@ -1953,3 +1967,81 @@ func convertStatusIntoString(sctx sessionctx.Context, statsLoadStatus map[model.
}
return r
}

// only allow select/delete/update/insert/execute stmt captured by continues capture
func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool {
switch stmtNode.(type) {
case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt:
return true
default:
return false
}
}

func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
tasks := handle.GetTasks()
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
for _, task := range tasks {
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
return
}
}
}
}

func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key)
if existed {
return
}
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
}

func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode,
startTS uint64, isContinuesCapture bool) {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: key,
StartTS: startTS,
EncodePlan: GetEncodedPlan,
TblStats: stmtCtx.TableStats,
SessionBindings: handle.GetAllBindRecord(),
SessionVars: sctx.GetSessionVars(),
ExecStmts: []ast.StmtNode{stmtNode},
Analyze: false,
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
}
5 changes: 0 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,11 +1725,6 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu
// `getSnapshotTS` returns for-update-ts if in insert/update/delete/lock statement otherwise the isolation read ts
// Please notice that in RC isolation, the above two ts are the same
func (b *executorBuilder) getSnapshotTS() (ts uint64, err error) {
defer func() {
if err == nil {
b.ctx.GetSessionVars().StmtCtx.StmtSnapshotTS = ts
}
}()
if b.forDataReaderBuilder {
return b.dataReaderTS, nil
}
Expand Down
1 change: 0 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1958,7 +1958,6 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
} else {
sc.UseDynamicPruneMode = false
}
sc.StmtSnapshotTS = 0

sc.StatsLoad.Timeout = 0
sc.StatsLoad.NeededItems = nil
Expand Down
90 changes: 0 additions & 90 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/logutil/consistency"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/replayer"
"github.com/pingcap/tidb/util/sem"
"github.com/pingcap/tidb/util/sli"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -2373,17 +2372,6 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.
se: se,
}, err
}
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := s.GetStmtNode()
startTS := se.GetSessionVars().StmtCtx.StmtSnapshotTS
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, startTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, startTS)
}
}

err = finishStmt(ctx, se, err, s)
if se.hasQuerySpecial() {
Expand All @@ -2398,84 +2386,6 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.
return nil, err
}

// only allow select/delete/update/insert/execute stmt captured by continues capture
func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool {
switch stmtNode.(type) {
case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt:
return true
default:
return false
}
}

func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
tasks := handle.GetTasks()
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
for _, task := range tasks {
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
return
}
}
}
}

func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key)
if existed {
return
}
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
}

func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode,
startTS uint64, isContinuesCapture bool) {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: key,
StartTS: startTS,
EncodePlan: executor.GetEncodedPlan,
TblStats: stmtCtx.TableStats,
SessionBindings: handle.GetAllBindRecord(),
SessionVars: sctx.GetSessionVars(),
ExecStmts: []ast.StmtNode{stmtNode},
Analyze: false,
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
}

// ExecStmtVarKeyType is a dummy type to avoid naming collision in context.
type ExecStmtVarKeyType int

Expand Down
3 changes: 0 additions & 3 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,6 @@ type StatementContext struct {
TableStats map[int64]interface{}
// useChunkAlloc indicates whether statement use chunk alloc
useChunkAlloc bool

// StmtSnapshotTS indicates the snapshot ts for the stmt
StmtSnapshotTS uint64
}

// StmtHints are SessionVars related sql hints.
Expand Down

0 comments on commit 3f1470d

Please sign in to comment.