Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema/slow_query: fix token too long #10328

Merged
merged 18 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions infoschema/slow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package infoschema
import (
"bufio"
"context"
"io"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -77,19 +78,24 @@ func parseSlowLogFile(tz *time.Location, filePath string) ([][]types.Datum, erro
logutil.Logger(context.Background()).Error("close slow log file failed.", zap.String("file", filePath), zap.Error(err))
}
}()

return ParseSlowLog(tz, bufio.NewScanner(file))
return ParseSlowLog(tz, bufio.NewReader(file))
}

// ParseSlowLog exports for testing.
// TODO: optimize for parse huge log-file.
func ParseSlowLog(tz *time.Location, scanner *bufio.Scanner) ([][]types.Datum, error) {
func ParseSlowLog(tz *time.Location, reader *bufio.Reader) ([][]types.Datum, error) {
var rows [][]types.Datum
startFlag := false
var st *slowQueryTuple
var err error
for scanner.Scan() {
line := scanner.Text()
for {
lineByte, err := getOneLine(reader)
if err != nil {
if err == io.EOF {
return rows, nil
}
return rows, err
}
line := string(hack.String(lineByte))
// Check slow log entry start flag.
if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) {
st = &slowQueryTuple{}
Expand Down Expand Up @@ -127,10 +133,27 @@ func ParseSlowLog(tz *time.Location, scanner *bufio.Scanner) ([][]types.Datum, e
}
}
}
if err := scanner.Err(); err != nil {
return nil, errors.AddStack(err)
}

func getOneLine(reader *bufio.Reader) ([]byte, error) {
lineByte, isPrefix, err := reader.ReadLine()
if err != nil {
return lineByte, err
}
var tempLine []byte
for isPrefix {
tempLine, isPrefix, err = reader.ReadLine()
lineByte = append(lineByte, tempLine...)

// Use the max value of max_allowed_packet to check the single line length.
if len(lineByte) > int(variable.MaxOfMaxAllowedPacket) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why using MaxOfMaxAllowedPacket instead of MaxAllowedPacket?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because MaxAllowedPacket may has been changed.

return lineByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
}
if err != nil {
return lineByte, err
}
}
return rows, nil
return lineByte, err
}

type slowQueryTuple struct {
Expand Down
34 changes: 28 additions & 6 deletions infoschema/slow_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package infoschema_test
import (
"bufio"
"bytes"
"strings"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
)

Expand All @@ -36,10 +38,10 @@ func (s *testSuite) TestParseSlowLogFile(c *C) {
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160
# Mem_max: 70724
select * from t;`)
scanner := bufio.NewScanner(slowLog)
reader := bufio.NewReader(slowLog)
loc, err := time.LoadLocation("Asia/Shanghai")
c.Assert(err, IsNil)
rows, err := infoschema.ParseSlowLog(loc, scanner)
rows, err := infoschema.ParseSlowLog(loc, reader)
c.Assert(err, IsNil)
c.Assert(len(rows), Equals, 1)
recordString := ""
Expand Down Expand Up @@ -67,8 +69,8 @@ select a# from t;
# Stats: t1:1,t2:2
select * from t;
`)
scanner = bufio.NewScanner(slowLog)
_, err = infoschema.ParseSlowLog(loc, scanner)
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, IsNil)

// test for time format compatibility.
Expand All @@ -78,8 +80,8 @@ select * from t;
# Time: 2019-04-24-19:41:21.716221 +0800
select * from t;
`)
scanner = bufio.NewScanner(slowLog)
rows, err = infoschema.ParseSlowLog(loc, scanner)
reader = bufio.NewReader(slowLog)
rows, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, IsNil)
c.Assert(len(rows) == 2, IsTrue)
t0Str, err := rows[0][0].ToString()
Expand All @@ -88,6 +90,26 @@ select * from t;
t1Str, err := rows[1][0].ToString()
c.Assert(err, IsNil)
c.Assert(t1Str, Equals, "2019-04-24 19:41:21.716221")

// test for bufio.Scanner: token too long.
slowLog = bytes.NewBufferString(
`# Time: 2019-04-28T15:24:04.309074+08:00
select * from t;
# Time: 2019-04-24-19:41:21.716221 +0800
`)
originValue := variable.MaxOfMaxAllowedPacket
variable.MaxOfMaxAllowedPacket = 65536
sql := strings.Repeat("x", int(variable.MaxOfMaxAllowedPacket+1))
slowLog.WriteString(sql)
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "single line length exceeds limit: 65536")

variable.MaxOfMaxAllowedPacket = originValue
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, IsNil)
}

func (s *testSuite) TestSlowLogParseTime(c *C) {
Expand Down
7 changes: 4 additions & 3 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ var (
MaxDDLReorgBatchSize int32 = 10240
MinDDLReorgBatchSize int32 = 32
// DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond.
DDLSlowOprThreshold uint32 = DefTiDBDDLSlowOprThreshold
ForcePriority = int32(DefTiDBForcePriority)
ServerHostname, _ = os.Hostname()
DDLSlowOprThreshold uint32 = DefTiDBDDLSlowOprThreshold
ForcePriority = int32(DefTiDBForcePriority)
ServerHostname, _ = os.Hostname()
MaxOfMaxAllowedPacket uint64 = 1073741824
)
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return checkUInt64SystemVar(name, value, 1, 1073741824, vars)
// See "https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet"
case MaxAllowedPacket:
return checkUInt64SystemVar(name, value, 1024, 1073741824, vars)
return checkUInt64SystemVar(name, value, 1024, MaxOfMaxAllowedPacket, vars)
case MaxConnections:
return checkUInt64SystemVar(name, value, 1, 100000, vars)
case MaxConnectErrors:
Expand Down