Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: show more information about cop tasks in slow log #10165

Merged
merged 8 commits into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,14 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
indexIDs = strings.Replace(fmt.Sprintf("%v", a.Ctx.GetSessionVars().StmtCtx.IndexIDs), " ", ",", -1)
}
execDetail := sessVars.StmtCtx.GetExecDetails()
copTaskInfo := sessVars.StmtCtx.CopTasksDetails()
statsInfos := a.getStatsInfo()
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, sql))
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, sql))
} else {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, sql))
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, sql))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
Expand Down
48 changes: 48 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package stmtctx

import (
"math"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -95,6 +96,7 @@ type StatementContext struct {
warnings []SQLWarn
histogramsNotLoad bool
execDetails execdetails.ExecDetails
allExecDetails []*execdetails.ExecDetails
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
}
// PrevAffectedRows is the affected-rows value(DDL is 0, DML is the number of affected rows).
PrevAffectedRows int64
Expand Down Expand Up @@ -376,6 +378,8 @@ func (sc *StatementContext) ResetForRetry() {
sc.mu.touched = 0
sc.mu.message = ""
sc.mu.warnings = nil
sc.mu.execDetails = execdetails.ExecDetails{}
sc.mu.allExecDetails = make([]*execdetails.ExecDetails, 0, 4)
sc.mu.Unlock()
sc.TableIDs = sc.TableIDs[:0]
sc.IndexIDs = sc.IndexIDs[:0]
Expand All @@ -392,6 +396,7 @@ func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, c
sc.mu.execDetails.RequestCount++
sc.mu.execDetails.TotalKeys += details.TotalKeys
sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys
sc.mu.allExecDetails = append(sc.mu.allExecDetails, details)
}
sc.mu.execDetails.CommitDetail = commitDetails
sc.mu.Unlock()
Expand Down Expand Up @@ -423,3 +428,46 @@ func (sc *StatementContext) ShouldIgnoreOverflowError() bool {
}
return false
}

// CopTasksDetails returns some useful information of cop-tasks during execution.
func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
sc.mu.Lock()
defer sc.mu.Unlock()
n := len(sc.mu.allExecDetails)
d := &CopTasksDetails{NumCopTasks: n}
if n == 0 {
return d
}
d.AvgProcessTime = sc.mu.execDetails.ProcessTime / time.Duration(n)
d.AvgWaitTime = sc.mu.execDetails.WaitTime / time.Duration(n)

sort.Slice(sc.mu.allExecDetails, func(i, j int) bool {
return sc.mu.allExecDetails[i].ProcessTime < sc.mu.allExecDetails[j].ProcessTime
})
d.P90ProcessTime = sc.mu.allExecDetails[n*9/10].ProcessTime
d.MaxProcessTime = sc.mu.allExecDetails[n-1].ProcessTime
d.MaxProcessAddress = sc.mu.allExecDetails[n-1].CalleeAddress

sort.Slice(sc.mu.allExecDetails, func(i, j int) bool {
return sc.mu.allExecDetails[i].WaitTime < sc.mu.allExecDetails[j].WaitTime
})
d.P90WaitTime = sc.mu.allExecDetails[n*9/10].WaitTime
d.MaxWaitTime = sc.mu.allExecDetails[n-1].WaitTime
d.MaxWaitAddress = sc.mu.allExecDetails[n-1].CalleeAddress
return d
}

//CopTasksDetails collects some useful information of cop-tasks during execution.
type CopTasksDetails struct {
NumCopTasks int

AvgProcessTime time.Duration
P90ProcessTime time.Duration
MaxProcessAddress string
MaxProcessTime time.Duration

AvgWaitTime time.Duration
P90WaitTime time.Duration
MaxWaitAddress string
MaxWaitTime time.Duration
}
46 changes: 46 additions & 0 deletions sessionctx/stmtctx/stmtctx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package stmtctx

import (
"fmt"
"testing"
"time"

"github.com/pingcap/tidb/util/execdetails"
)

func TestCopTasksDetails(t *testing.T) {
ctx := new(StatementContext)
for i := 0; i < 100; i++ {
d := &execdetails.ExecDetails{
CalleeAddress: fmt.Sprintf("%v", i+1),
ProcessTime: time.Second * time.Duration(i+1),
WaitTime: time.Millisecond * time.Duration(i+1),
}
ctx.MergeExecDetails(d, nil)
}
c := ctx.CopTasksDetails()
if c.NumCopTasks != 100 ||
c.AvgProcessTime != time.Second*101/2 ||
c.P90ProcessTime != time.Second*91 ||
c.MaxProcessTime != time.Second*100 ||
c.MaxProcessAddress != "100" ||
c.AvgWaitTime != time.Millisecond*101/2 ||
c.P90WaitTime != time.Millisecond*91 ||
c.MaxWaitTime != time.Millisecond*100 ||
c.MaxWaitAddress != "100" {
t.Fatal(c)
}
}
19 changes: 18 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,12 @@ const (
SlowLogQuerySQLStr = "Query" // use for slow log table, slow log will not print this field name but print sql directly.
// SlowLogStatsInfoStr is plan stats info.
SlowLogStatsInfoStr = "Stats"
// SlowLogNumCopTasksStr is the number of cop-tasks.
SlowLogNumCopTasksStr = "Num_cop_tasks"
// SlowLogCopProcessStr includes some useful information about cop-tasks' process time.
SlowLogCopProcessStr = "Cop_process"
// SlowLogCopWaitStr includes some useful information about cop-tasks' wait time.
SlowLogCopWaitStr = "Cop_wait"
)

// SlowLogFormat uses for formatting slow log.
Expand All @@ -912,8 +918,10 @@ const (
// # Is_internal: false
// # Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
// # Stats: t1:1,t2:2
// # Cop_tasks:
// select * from t_slim;
func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDetail execdetails.ExecDetails, indexIDs string, digest string, statsInfos map[string]uint64, sql string) string {
func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDetail execdetails.ExecDetails, indexIDs string, digest string,
statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, sql string) string {
var buf bytes.Buffer
execDetailStr := execDetail.String()
buf.WriteString(SlowLogRowPrefixStr + SlowLogTxnStartTSStr + SlowLogSpaceMarkStr + strconv.FormatUint(txnTS, 10) + "\n")
Expand Down Expand Up @@ -957,6 +965,15 @@ func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDe
}
buf.WriteString("\n")
}
if copTasks != nil {
buf.WriteString(SlowLogRowPrefixStr + SlowLogNumCopTasksStr + SlowLogSpaceMarkStr + strconv.FormatInt(int64(copTasks.NumCopTasks), 10) + "\n")
buf.WriteString(SlowLogRowPrefixStr + SlowLogCopProcessStr + SlowLogSpaceMarkStr +
fmt.Sprintf("Avg_time: %v P90_time: %v Max_time: %v Max_addr: %v", copTasks.AvgProcessTime,
copTasks.P90ProcessTime, copTasks.MaxProcessTime, copTasks.MaxProcessAddress) + "\n")
buf.WriteString(SlowLogRowPrefixStr + SlowLogCopWaitStr + SlowLogSpaceMarkStr +
fmt.Sprintf("Avg_time: %v P90_time: %v Max_time: %v Max_Addr: %v", copTasks.AvgWaitTime,
copTasks.P90WaitTime, copTasks.MaxWaitTime, copTasks.MaxWaitAddress) + "\n")
}
if len(sql) == 0 {
sql = ";"
}
Expand Down
17 changes: 16 additions & 1 deletion sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/parser/auth"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mock"
)
Expand Down Expand Up @@ -106,6 +107,17 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
}
statsInfos := make(map[string]uint64)
statsInfos["t1"] = 0
copTasks := &stmtctx.CopTasksDetails{
NumCopTasks: 10,
AvgProcessTime: time.Second,
P90ProcessTime: time.Second * 2,
MaxProcessAddress: "10.6.131.78",
MaxProcessTime: time.Second * 3,
AvgWaitTime: time.Millisecond * 10,
P90WaitTime: time.Millisecond * 20,
MaxWaitTime: time.Millisecond * 30,
MaxWaitAddress: "10.6.131.79",
}
resultString := `# Txn_start_ts: 406649736972468225
# User: [email protected]
# Conn_ID: 1
Expand All @@ -116,9 +128,12 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
# Is_internal: true
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Stats: t1:pseudo
# Num_cop_tasks: 10
# Cop_process: Avg_time: 1s P90_time: 2s Max_time: 3s Max_addr: 10.6.131.78
# Cop_wait: Avg_time: 10ms P90_time: 20ms Max_time: 30ms Max_Addr: 10.6.131.79
select * from t;`
sql := "select * from t"
digest := parser.DigestHash(sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, sql)
c.Assert(logString, Equals, resultString)
}