Skip to content

Commit

Permalink
*: add succ filed to slow log and fix shallow copy problem whe… (#11421)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and winkyao committed Jul 25, 2019
1 parent 5a5d3d2 commit 7e2586e
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 102 deletions.
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,10 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, sql))
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, succ, sql))
} else {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, sql))
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, succ, sql))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
Expand Down
144 changes: 50 additions & 94 deletions infoschema/slow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -58,7 +57,8 @@ var slowQueryCols = []columnInfo{
{variable.SlowLogCopWaitP90, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCopWaitMax, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogMemMax, mysql.TypeLonglong, 20, 0, nil, nil},
{variable.SlowLogQuerySQLStr, mysql.TypeVarchar, 4096, 0, nil, nil},
{variable.SlowLogSucc, mysql.TypeTiny, 1, 0, nil, nil},
{variable.SlowLogQuerySQLStr, mysql.TypeLongBlob, types.UnspecifiedLength, 0, nil, nil},
}

func dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
Expand Down Expand Up @@ -137,24 +137,34 @@ func ParseSlowLog(tz *time.Location, reader *bufio.Reader) ([][]types.Datum, err
}

func getOneLine(reader *bufio.Reader) ([]byte, error) {
var resByte []byte
lineByte, isPrefix, err := reader.ReadLine()
if isPrefix {
// Need to read more data.
resByte = make([]byte, len(lineByte), len(lineByte)*2)
} else {
resByte = make([]byte, len(lineByte))
}
// Use copy here to avoid shallow copy problem.
copy(resByte, lineByte)
if err != nil {
return lineByte, err
return resByte, err
}

var tempLine []byte
for isPrefix {
tempLine, isPrefix, err = reader.ReadLine()
lineByte = append(lineByte, tempLine...)
resByte = append(resByte, tempLine...)

// Use the max value of max_allowed_packet to check the single line length.
if len(lineByte) > int(variable.MaxOfMaxAllowedPacket) {
return lineByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
if len(resByte) > int(variable.MaxOfMaxAllowedPacket) {
return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
}
if err != nil {
return lineByte, err
return resByte, err
}
}
return lineByte, err
return resByte, err
}

type slowQueryTuple struct {
Expand All @@ -181,77 +191,41 @@ type slowQueryTuple struct {
p90WaitTime float64
maxWaitTime float64
memMax int64
succ bool
sql string
}

func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string) error {
value = stringutil.Copy(value)
var err error
switch field {
case variable.SlowLogTimeStr:
t, err := ParseTime(value)
st.time, err = ParseTime(value)
if err != nil {
return err
break
}
if t.Location() != tz {
t = t.In(tz)
if st.time.Location() != tz {
st.time = st.time.In(tz)
}
st.time = t
case variable.SlowLogTxnStartTSStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.txnStartTs = num
st.txnStartTs, err = strconv.ParseUint(value, 10, 64)
case variable.SlowLogUserStr:
st.user = value
case variable.SlowLogConnIDStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.connID = num
st.connID, err = strconv.ParseUint(value, 10, 64)
case variable.SlowLogQueryTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.queryTime = num
st.queryTime, err = strconv.ParseFloat(value, 64)
case execdetails.ProcessTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.processTime = num
st.processTime, err = strconv.ParseFloat(value, 64)
case execdetails.WaitTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.waitTime = num
st.waitTime, err = strconv.ParseFloat(value, 64)
case execdetails.BackoffTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.backOffTime = num
st.backOffTime, err = strconv.ParseFloat(value, 64)
case execdetails.RequestCountStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.requestCount = num
st.requestCount, err = strconv.ParseUint(value, 10, 64)
case execdetails.TotalKeysStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.totalKeys = num
st.totalKeys, err = strconv.ParseUint(value, 10, 64)
case execdetails.ProcessKeysStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.processKeys = num
st.processKeys, err = strconv.ParseUint(value, 10, 64)
case variable.SlowLogDBStr:
st.db = value
case variable.SlowLogIndexIDsStr:
Expand All @@ -263,50 +237,27 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string)
case variable.SlowLogStatsInfoStr:
st.statsInfo = value
case variable.SlowLogCopProcAvg:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.avgProcessTime = num
st.avgProcessTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopProcP90:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.p90ProcessTime = num
st.p90ProcessTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopProcMax:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.maxProcessTime = num
st.maxProcessTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopWaitAvg:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.avgWaitTime = num
st.avgWaitTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopWaitP90:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.p90WaitTime = num
st.p90WaitTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopWaitMax:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.maxWaitTime = num
st.maxWaitTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogMemMax:
num, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.memMax = num
st.memMax, err = strconv.ParseInt(value, 10, 64)
case variable.SlowLogSucc:
st.succ, err = strconv.ParseBool(value)
case variable.SlowLogQuerySQLStr:
st.sql = value
}
if err != nil {
return errors.Wrap(err, "parse slow log failed `"+field+"` error")
}
return nil
}

Expand Down Expand Up @@ -339,6 +290,11 @@ func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
record = append(record, types.NewFloat64Datum(st.p90WaitTime))
record = append(record, types.NewFloat64Datum(st.maxWaitTime))
record = append(record, types.NewIntDatum(st.memMax))
if st.succ {
record = append(record, types.NewIntDatum(1))
} else {
record = append(record, types.NewIntDatum(0))
}
record = append(record, types.NewStringDatum(st.sql))
return record
}
Expand Down
16 changes: 15 additions & 1 deletion infoschema/slow_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (s *testSuite) TestParseSlowLogFile(c *C) {
# Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8
# Mem_max: 70724
# Succ: false
select * from t;`)
reader := bufio.NewReader(slowLog)
loc, err := time.LoadLocation("Asia/Shanghai")
Expand All @@ -53,7 +54,8 @@ select * from t;`)
}
recordString += str
}
expectRecordString := "2019-04-28 15:24:04.309074,405888132465033227,,0,0.216905,0.021,0,0,1,637,0,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,0.1,0.2,0.03,0.05,0.6,0.8,70724,select * from t;"
expectRecordString := "2019-04-28 15:24:04.309074,405888132465033227,,0,0.216905,0.021,0,0,1,637,0,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,0.1,0.2,0.03,0.05,0.6,0.8,70724,0,select * from t;"

c.Assert(expectRecordString, Equals, recordString)

// fix sql contain '# ' bug
Expand All @@ -67,6 +69,7 @@ select a# from t;
# Is_internal: true
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Stats: t1:1,t2:2
# Succ: false
select * from t;
`)
reader = bufio.NewReader(slowLog)
Expand Down Expand Up @@ -110,6 +113,17 @@ select * from t;
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, IsNil)

// Add parse error check.
slowLog = bytes.NewBufferString(
`# Time: 2019-04-28T15:24:04.309074+08:00
# Succ: abc
select * from t;
`)
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "parse slow log failed `Succ` error: strconv.ParseBool: parsing \"abc\": invalid syntax")
}

func (s *testSuite) TestSlowLogParseTime(c *C) {
Expand Down
24 changes: 21 additions & 3 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,16 +502,34 @@ func (s *testSuite) TestSlowQuery(c *C) {
# Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8
# Mem_max: 70724
# Succ: true
select * from t_slim;`))
c.Assert(f.Close(), IsNil)
c.Assert(f.Sync(), IsNil)
c.Assert(err, IsNil)

tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", slowLogFileName))
tk.MustExec("set time_zone = '+08:00';")
re := tk.MustQuery("select * from information_schema.slow_query")
re.Check(testutil.RowsWithSep("|",
"2019-02-12 19:33:56.571953|406315658548871171|[email protected]|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|0.05|0.6|0.8|70724|select * from t_slim;"))
"2019-02-12 19:33:56.571953|406315658548871171|[email protected]|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|0.05|0.6|0.8|70724|1|select * from t_slim;"))
tk.MustExec("set time_zone = '+00:00';")
re = tk.MustQuery("select * from information_schema.slow_query")
re.Check(testutil.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|[email protected]|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|0.05|0.6|0.8|70724|select * from t_slim;"))
re.Check(testutil.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|[email protected]|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|0.05|0.6|0.8|70724|1|select * from t_slim;"))

// Test for long query.
_, err = f.Write([]byte(`
# Time: 2019-02-13T19:33:56.571953+08:00
`))
c.Assert(err, IsNil)
sql := "select * from "
for len(sql) < 5000 {
sql += "abcdefghijklmnopqrstuvwxyz_1234567890_qwertyuiopasdfghjklzxcvbnm"
}
sql += ";"
_, err = f.Write([]byte(sql))
c.Assert(err, IsNil)
c.Assert(f.Close(), IsNil)
re = tk.MustQuery("select query from information_schema.slow_query order by time desc limit 1")
rows := re.Rows()
c.Assert(rows[0][0], Equals, sql)
}
6 changes: 5 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,8 @@ const (
SlowLogCopWaitMax = "Cop_wait_max"
// SlowLogMemMax is the max number bytes of memory used in this statement.
SlowLogMemMax = "Mem_max"
// SlowLogSucc is used to indicate whether this sql execute successfully.
SlowLogSucc = "Succ"
)

// SlowLogFormat uses for formatting slow log.
Expand All @@ -852,9 +854,10 @@ const (
// # Cop_process: Avg_time: 1s P90_time: 2s Max_time: 3s Max_addr: 10.6.131.78
// # Cop_wait: Avg_time: 10ms P90_time: 20ms Max_time: 30ms Max_Addr: 10.6.131.79
// # Memory_max: 4096
// # Succ: true
// select * from t_slim;
func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDetail execdetails.ExecDetails, indexIDs string, digest string,
statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, memMax int64, sql string) string {
statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, memMax int64, succ bool, sql string) string {
var buf bytes.Buffer
execDetailStr := execDetail.String()
buf.WriteString(SlowLogPrefixStr + SlowLogTxnStartTSStr + SlowLogSpaceMarkStr + strconv.FormatUint(txnTS, 10) + "\n")
Expand Down Expand Up @@ -912,6 +915,7 @@ func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDe
if memMax > 0 {
buf.WriteString(SlowLogPrefixStr + SlowLogMemMax + SlowLogSpaceMarkStr + strconv.FormatInt(memMax, 10) + "\n")
}
buf.WriteString(SlowLogPrefixStr + SlowLogSucc + SlowLogSpaceMarkStr + strconv.FormatBool(succ) + "\n")
if len(sql) == 0 {
sql = ";"
}
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
# Cop_proc_avg: 1 Cop_proc_p90: 2 Cop_proc_max: 3
# Cop_wait_avg: 0.01 Cop_wait_p90: 0.02 Cop_wait_max: 0.03
# Mem_max: 2333
# Succ: true
select * from t;`
sql := "select * from t"
digest := parser.DigestHash(sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, memMax, sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, memMax, true, sql)
c.Assert(logString, Equals, resultString)
}

0 comments on commit 7e2586e

Please sign in to comment.