Skip to content

Commit

Permalink
*: print an expensive log when a query exceeds time threshold (#10350)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored and tiancaiamao committed Jun 5, 2019
1 parent 54e4894 commit f67352d
Show file tree
Hide file tree
Showing 25 changed files with 447 additions and 214 deletions.
46 changes: 29 additions & 17 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/expensivequery"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
Expand All @@ -54,23 +55,24 @@ import (
// Domain represents a storage space. Different domains can use the same database name.
// Multiple domains can be used in parallel without synchronization.
type Domain struct {
store kv.Storage
infoHandle *infoschema.Handle
privHandle *privileges.Handle
bindHandle *bindinfo.BindHandle
statsHandle unsafe.Pointer
statsLease time.Duration
statsUpdating sync2.AtomicInt32
ddl ddl.DDL
info *InfoSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
wg sync.WaitGroup
gvc GlobalVariableCache
slowQuery *topNSlowQueries
store kv.Storage
infoHandle *infoschema.Handle
privHandle *privileges.Handle
bindHandle *bindinfo.BindHandle
statsHandle unsafe.Pointer
statsLease time.Duration
statsUpdating sync2.AtomicInt32
ddl ddl.DDL
info *InfoSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
wg sync.WaitGroup
gvc GlobalVariableCache
slowQuery *topNSlowQueries
expensiveQueryHandle *expensivequery.Handle
}

// loadInfoSchema loads infoschema at startTS into handle, usedSchemaVersion is the currently used
Expand Down Expand Up @@ -1005,6 +1007,16 @@ func (do *Domain) autoAnalyzeWorker(owner owner.Manager) {
}
}

// ExpensiveQueryHandle returns the expensive query handle.
func (do *Domain) ExpensiveQueryHandle() *expensivequery.Handle {
return do.expensiveQueryHandle
}

// InitExpensiveQueryHandle init the expensive query handler.
func (do *Domain) InitExpensiveQueryHandle() {
do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit)
}

const privilegeKey = "/tidb/privilege"

// NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches
Expand Down
35 changes: 8 additions & 27 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ type ExecStmt struct {
InfoSchema infoschema.InfoSchema
// Plan stores a reference to the final physical plan.
Plan plannercore.Plan
// Expensive represents whether this query is an expensive one.
Expensive bool
// LowerPriority represents whether to lower the execution priority of a query.
LowerPriority bool
// Cacheable represents whether the physical plan can be cached.
Cacheable bool
// Text represents the origin query text.
Expand Down Expand Up @@ -203,7 +203,7 @@ func (a *ExecStmt) RebuildPlan() (int64, error) {
// Exec builds an Executor from a plan. If the Executor doesn't return result,
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
// result, execution is done after this function returns, in the returned sqlexec.RecordSet Next method.
func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
a.StartTime = time.Now()
sctx := a.Ctx
if _, ok := a.Plan.(*plannercore.Analyze); ok && sctx.GetSessionVars().InRestrictedSQL {
Expand Down Expand Up @@ -531,7 +531,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
switch {
case useMaxTS:
stmtCtx.Priority = kv.PriorityHigh
case a.Expensive:
case a.LowerPriority:
stmtCtx.Priority = kv.PriorityLow
}
}
Expand All @@ -554,6 +554,9 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
}
a.isPreparedStmt = true
a.Plan = executorExec.plan
if executorExec.lowerPriority {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}
e = executorExec.stmtExec
}
a.isSelectForUpdate = b.isSelectForUpdate
Expand Down Expand Up @@ -609,7 +612,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
}
execDetail := sessVars.StmtCtx.GetExecDetails()
copTaskInfo := sessVars.StmtCtx.CopTasksDetails()
statsInfos := a.getStatsInfo()
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
Expand Down Expand Up @@ -642,28 +645,6 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
}
}

func (a *ExecStmt) getStatsInfo() map[string]uint64 {
var physicalPlan plannercore.PhysicalPlan
switch p := a.Plan.(type) {
case *plannercore.Insert:
physicalPlan = p.SelectPlan
case *plannercore.Update:
physicalPlan = p.SelectPlan
case *plannercore.Delete:
physicalPlan = p.SelectPlan
case plannercore.PhysicalPlan:
physicalPlan = p
}

if physicalPlan == nil {
return nil
}

statsInfos := make(map[string]uint64)
statsInfos = plannercore.CollectPlanStatsVersion(physicalPlan, statsInfos)
return statsInfos
}

// IsPointGetWithPKOrUniqueKeyByAutoCommit returns true when meets following conditions:
// 1. ctx is auto commit tagged
// 2. txn is not valid
Expand Down
58 changes: 23 additions & 35 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package executor

import (
"context"
"fmt"
"strings"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -82,64 +81,53 @@ func (c *Compiler) compile(ctx context.Context, stmtNode ast.StmtNode, skipBind
}

CountStmtNode(stmtNode, c.Ctx.GetSessionVars().InRestrictedSQL)
isExpensive := logExpensiveQuery(stmtNode, finalPlan)

lowerPriority := needLowerPriority(finalPlan)
return &ExecStmt{
InfoSchema: infoSchema,
Plan: finalPlan,
Expensive: isExpensive,
Cacheable: plannercore.Cacheable(stmtNode),
Text: stmtNode.Text(),
StmtNode: stmtNode,
Ctx: c.Ctx,
InfoSchema: infoSchema,
Plan: finalPlan,
LowerPriority: lowerPriority,
Cacheable: plannercore.Cacheable(stmtNode),
Text: stmtNode.Text(),
StmtNode: stmtNode,
Ctx: c.Ctx,
}, nil
}

func logExpensiveQuery(stmtNode ast.StmtNode, finalPlan plannercore.Plan) (expensive bool) {
expensive = isExpensiveQuery(finalPlan)
if !expensive {
return
}

const logSQLLen = 1024
sql := stmtNode.Text()
if len(sql) > logSQLLen {
sql = fmt.Sprintf("%s len(%d)", sql[:logSQLLen], len(sql))
}
logutil.Logger(context.Background()).Warn("EXPENSIVE_QUERY", zap.String("SQL", sql))
return
}

func isExpensiveQuery(p plannercore.Plan) bool {
// needLowerPriority checks whether it's needed to lower the execution priority
// of a query.
// If the estimated output row count of any operator in the physical plan tree
// is greater than the specific threshold, we'll set it to lowPriority when
// sending it to the coprocessor.
func needLowerPriority(p plannercore.Plan) bool {
switch x := p.(type) {
case plannercore.PhysicalPlan:
return isPhysicalPlanExpensive(x)
return isPhysicalPlanNeedLowerPriority(x)
case *plannercore.Execute:
return isExpensiveQuery(x.Plan)
return needLowerPriority(x.Plan)
case *plannercore.Insert:
if x.SelectPlan != nil {
return isPhysicalPlanExpensive(x.SelectPlan)
return isPhysicalPlanNeedLowerPriority(x.SelectPlan)
}
case *plannercore.Delete:
if x.SelectPlan != nil {
return isPhysicalPlanExpensive(x.SelectPlan)
return isPhysicalPlanNeedLowerPriority(x.SelectPlan)
}
case *plannercore.Update:
if x.SelectPlan != nil {
return isPhysicalPlanExpensive(x.SelectPlan)
return isPhysicalPlanNeedLowerPriority(x.SelectPlan)
}
}
return false
}

func isPhysicalPlanExpensive(p plannercore.PhysicalPlan) bool {
expensiveRowThreshold := int64(config.GetGlobalConfig().Log.ExpensiveThreshold)
if int64(p.StatsCount()) > expensiveRowThreshold {
func isPhysicalPlanNeedLowerPriority(p plannercore.PhysicalPlan) bool {
expensiveThreshold := int64(config.GetGlobalConfig().Log.ExpensiveThreshold)
if int64(p.StatsCount()) > expensiveThreshold {
return true
}

for _, child := range p.Children() {
if isPhysicalPlanExpensive(child) {
if isPhysicalPlanNeedLowerPriority(child) {
return true
}
}
Expand Down
14 changes: 7 additions & 7 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,25 @@ type testExecSuite struct {

// mockSessionManager is a mocked session manager which is used for test.
type mockSessionManager struct {
PS []util.ProcessInfo
PS []*util.ProcessInfo
}

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *mockSessionManager) ShowProcessList() map[uint64]util.ProcessInfo {
ret := make(map[uint64]util.ProcessInfo)
func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
ret := make(map[uint64]*util.ProcessInfo)
for _, item := range msm.PS {
ret[item.ID] = item
}
return ret
}

func (msm *mockSessionManager) GetProcessInfo(id uint64) (util.ProcessInfo, bool) {
func (msm *mockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) {
for _, item := range msm.PS {
if item.ID == id {
return item, true
}
}
return util.ProcessInfo{}, false
return &util.ProcessInfo{}, false
}

// Kill implements the SessionManager.Kill interface.
Expand All @@ -70,8 +70,8 @@ func (s *testExecSuite) TestShowProcessList(c *C) {
schema := buildSchema(names, ftypes)

// Compose a mocked session manager.
ps := make([]util.ProcessInfo, 0, 1)
pi := util.ProcessInfo{
ps := make([]*util.ProcessInfo, 0, 1)
pi := &util.ProcessInfo{
ID: 0,
User: "test",
Host: "127.0.0.1",
Expand Down
14 changes: 7 additions & 7 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,25 @@ import (

// mockSessionManager is a mocked session manager which is used for test.
type mockSessionManager1 struct {
PS []util.ProcessInfo
PS []*util.ProcessInfo
}

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *mockSessionManager1) ShowProcessList() map[uint64]util.ProcessInfo {
ret := make(map[uint64]util.ProcessInfo)
func (msm *mockSessionManager1) ShowProcessList() map[uint64]*util.ProcessInfo {
ret := make(map[uint64]*util.ProcessInfo)
for _, item := range msm.PS {
ret[item.ID] = item
}
return ret
}

func (msm *mockSessionManager1) GetProcessInfo(id uint64) (util.ProcessInfo, bool) {
func (msm *mockSessionManager1) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) {
for _, item := range msm.PS {
if item.ID == id {
return item, true
}
}
return util.ProcessInfo{}, false
return &util.ProcessInfo{}, false
}

// Kill implements the SessionManager.Kill interface.
Expand All @@ -62,7 +62,7 @@ func (s *testSuite) TestExplainFor(c *C) {

tkRoot.MustQuery("select * from t1;")
tkRootProcess := tkRoot.Se.ShowProcess()
ps := []util.ProcessInfo{tkRootProcess}
ps := []*util.ProcessInfo{tkRootProcess}
tkRoot.Se.SetSessionManager(&mockSessionManager1{PS: ps})
tkUser.Se.SetSessionManager(&mockSessionManager1{PS: ps})
tkRoot.MustQuery(fmt.Sprintf("explain for connection %d", tkRootProcess.ID)).Check(testkit.Rows(
Expand All @@ -75,7 +75,7 @@ func (s *testSuite) TestExplainFor(c *C) {
c.Check(core.ErrNoSuchThread.Equal(err), IsTrue)

tkRootProcess.Plan = nil
ps = []util.ProcessInfo{tkRootProcess}
ps = []*util.ProcessInfo{tkRootProcess}
tkRoot.Se.SetSessionManager(&mockSessionManager1{PS: ps})
tkRoot.MustExec(fmt.Sprintf("explain for connection %d", tkRootProcess.ID))
}
17 changes: 9 additions & 8 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,14 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
type ExecuteExec struct {
baseExecutor

is infoschema.InfoSchema
name string
usingVars []expression.Expression
id uint32
stmtExec Executor
stmt ast.StmtNode
plan plannercore.Plan
is infoschema.InfoSchema
name string
usingVars []expression.Expression
id uint32
stmtExec Executor
stmt ast.StmtNode
plan plannercore.Plan
lowerPriority bool
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -235,7 +236,7 @@ func (e *ExecuteExec) Build(b *executorBuilder) error {
}
e.stmtExec = stmtExec
CountStmtNode(e.stmt, e.ctx.GetSessionVars().InRestrictedSQL)
logExpensiveQuery(e.stmt, e.plan)
e.lowerPriority = needLowerPriority(e.plan)
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ func (s *testSuite2) TestSetVar(c *C) {
c.Assert(err, NotNil)
tk.MustExec("set global tidb_back_off_weight = 10")
tk.MustQuery("select @@global.tidb_back_off_weight;").Check(testkit.Rows("10"))

tk.MustExec("set @@tidb_expensive_query_time_threshold=70")
tk.MustQuery("select @@tidb_expensive_query_time_threshold;").Check(testkit.Rows("70"))
}

func (s *testSuite2) TestSetCharset(c *C) {
Expand Down
10 changes: 0 additions & 10 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,6 @@ func (s *testSuite2) TestShowDatabasesInfoSchemaFirst(c *C) {
tk.MustExec(`drop database BBBB`)
}

// mockSessionManager is a mocked session manager that wraps one session
// it returns only this session's current process info as processlist for test.
type mockSessionManager struct {
session.Session
}

// Kill implements the SessionManager.Kill interface.
func (msm *mockSessionManager) Kill(cid uint64, query bool) {
}

func (s *testSuite2) TestShowWarnings(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
Loading

0 comments on commit f67352d

Please sign in to comment.