Speed up parse slow-log when query slow_query #15371 (#19139) #20556

Merged: 2 commits merged on Oct 21, 2020
executor/slow_query.go: 255 changes (164 additions & 91 deletions)
@@ -16,15 +16,18 @@ package executor
import (
"bufio"
"context"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
@@ -64,7 +67,6 @@ func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte
}
e.initializeAsyncParsing(ctx, sctx)
}

rows, retrieved, err := e.dataForSlowLog(ctx)
if err != nil {
return nil, err
@@ -126,16 +128,8 @@ func (e *slowQueryRetriever) parseDataForSlowLog(ctx context.Context, sctx sessi
close(e.parsedSlowLogCh)
return
}

reader := bufio.NewReader(e.files[0].file)
for e.fileIdx < len(e.files) {
rows, err := e.parseSlowLog(sctx, reader, 1024)
select {
case <-ctx.Done():
break
case e.parsedSlowLogCh <- parsedSlowLog{rows, err}:
}
}
e.parseSlowLog(ctx, sctx, reader, 64)
close(e.parsedSlowLogCh)
}

@@ -144,25 +138,28 @@ func (e *slowQueryRetriever) dataForSlowLog(ctx context.Context) ([][]types.Datu
slowLog parsedSlowLog
ok bool
)
select {
case slowLog, ok = <-e.parsedSlowLogCh:
case <-ctx.Done():
return nil, false, ctx.Err()
}
if !ok {
// When e.parsedSlowLogCh is closed, the slow log data is retrieved.
return nil, true, nil
}

rows, err := slowLog.rows, slowLog.err
if err != nil {
return nil, false, err
}
if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) {
rows, err := infoschema.AppendHostInfoToRows(rows)
return rows, false, err
for {
select {
case slowLog, ok = <-e.parsedSlowLogCh:
case <-ctx.Done():
return nil, false, ctx.Err()
}
if !ok {
return nil, true, nil
}
rows, err := slowLog.rows, slowLog.err
if err != nil {
return nil, false, err
}
if len(rows) == 0 {
continue
}
if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) {
rows, err := infoschema.AppendHostInfoToRows(rows)
return rows, false, err
}
return rows, false, nil
}
return rows, false, nil
}
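
The rewritten dataForSlowLog drains parsed batches from parsedSlowLogCh in a loop, skipping empty batches and returning on error, on channel close, or on context cancellation. A minimal standalone sketch of that consumer pattern, with an illustrative batch type and channel rather than the retriever's actual fields:

```go
package main

import (
	"context"
	"fmt"
)

type batch struct {
	rows []string
	err  error
}

// drainOne returns the next non-empty batch, or signals completion/cancellation.
func drainOne(ctx context.Context, ch <-chan batch) (rows []string, done bool, err error) {
	for {
		select {
		case b, ok := <-ch:
			if !ok {
				return nil, true, nil // producer closed the channel: everything retrieved
			}
			if b.err != nil {
				return nil, false, b.err
			}
			if len(b.rows) == 0 {
				continue // empty batch, wait for the next one
			}
			return b.rows, false, nil
		case <-ctx.Done():
			return nil, false, ctx.Err()
		}
	}
}

func main() {
	ch := make(chan batch, 2)
	ch <- batch{rows: []string{"select 1"}}
	close(ch)
	fmt.Println(drainOne(context.Background(), ch)) // [select 1] false <nil>
}
```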

type slowLogChecker struct {
@@ -186,35 +183,144 @@ func (sc *slowLogChecker) isTimeValid(t types.Time) bool {
return true
}

// TODO: optimize for parse huge log-file.
func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) {
var rows [][]types.Datum
var st *slowQueryTuple
startFlag := false
tz := ctx.GetSessionVars().Location()
for {
if len(rows) >= maxRow {
return rows, nil
func getOneLine(reader *bufio.Reader) ([]byte, error) {
var resByte []byte
lineByte, isPrefix, err := reader.ReadLine()
if isPrefix {
// Need to read more data.
resByte = make([]byte, len(lineByte), len(lineByte)*2)
} else {
resByte = make([]byte, len(lineByte))
}
// Use copy here to avoid shallow copy problem.
copy(resByte, lineByte)
if err != nil {
return resByte, err
}

var tempLine []byte
for isPrefix {
tempLine, isPrefix, err = reader.ReadLine()
resByte = append(resByte, tempLine...)
// Use the max value of max_allowed_packet to check the single line length.
if len(resByte) > int(variable.MaxOfMaxAllowedPacket) {
return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
}
e.fileLine++
lineByte, err := getOneLine(reader)
if err != nil {
if err == io.EOF {
e.fileIdx++
e.fileLine = 0
if e.fileIdx >= len(e.files) {
return rows, nil
return resByte, err
}
}
return resByte, err
}
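
getOneLine exists because bufio.Reader.ReadLine returns long lines in fragments, flagging continuation with isPrefix, and reuses its internal buffer between calls (hence the explicit copy). A rough usage sketch of that reassembly with a deliberately small buffer, chosen only to force the isPrefix path:

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	// 16 bytes is the smallest bufio buffer; the line below is longer, so
	// ReadLine must hand it back in several isPrefix fragments.
	r := bufio.NewReaderSize(strings.NewReader("SELECT /* a fairly long comment */ 1;\n"), 16)

	var line []byte
	for {
		frag, isPrefix, err := r.ReadLine()
		line = append(line, frag...)
		if err != nil || !isPrefix {
			break // the full logical line has been assembled (or EOF/error)
		}
	}
	fmt.Println(string(line)) // SELECT /* a fairly long comment */ 1;
}
```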

type offset struct {
offset int
length int
}

func (e *slowQueryRetriever) getBatchLog(reader *bufio.Reader, offset *offset, num int) ([]string, error) {
var line string
log := make([]string, 0, num)
var err error
for i := 0; i < num; i++ {
for {
e.fileLine++
lineByte, err := getOneLine(reader)
if err != nil {
if err == io.EOF {
e.fileIdx++
e.fileLine = 0
if e.fileIdx >= len(e.files) {
return log, nil
}
offset.length = len(log)
reader.Reset(e.files[e.fileIdx].file)
continue
}
reader.Reset(e.files[e.fileIdx].file)
continue
return log, err
}
return rows, err
line = string(hack.String(lineByte))
log = append(log, line)
if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) {
if strings.HasPrefix(line, "use") {
continue
}
break
}
}
}
return log, err
}
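
getBatchLog collects raw lines until it has num complete slow log entries; an entry ends at a line carrying the SQL suffix (";") unless that line is a leading "use" statement. A simplified sketch of the boundary rule, with the suffix and prefix literals inlined instead of read from the variable package:

```go
package main

import (
	"fmt"
	"strings"
)

// isEntryEnd mirrors the batching rule: a trailing ";" closes the entry,
// unless the line is a "use" statement that only precedes the real query.
func isEntryEnd(line string) bool {
	return strings.HasSuffix(line, ";") && !strings.HasPrefix(line, "use")
}

func main() {
	lines := []string{
		"# Time: 2020-10-13T20:08:13.970563+08:00",
		"# Query_time: 4.895492",
		"use test;",
		"select * from t where a = 1;",
	}
	var entry []string
	for _, l := range lines {
		entry = append(entry, l)
		if isEntryEnd(l) {
			break
		}
	}
	fmt.Println(len(entry)) // 4: the "use" line does not terminate the entry early
}
```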

func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.Context, reader *bufio.Reader, logNum int) {
var wg sync.WaitGroup
offset := offset{offset: 0, length: 0}
// To limit the number of parsing goroutines
ch := make(chan int, sctx.GetSessionVars().Concurrency.DistSQLScanConcurrency)
defer close(ch)
for {
log, err := e.getBatchLog(reader, &offset, logNum)
if err != nil {
e.parsedSlowLogCh <- parsedSlowLog{nil, err}
break
}
start := offset
wg.Add(1)
ch <- 1
go func() {
defer wg.Done()
result, err := e.parseLog(sctx, log, start)
if err != nil {
e.parsedSlowLogCh <- parsedSlowLog{nil, err}
} else {
e.parsedSlowLogCh <- parsedSlowLog{result, err}
}
<-ch
}()
// Read the next file, offset = 0
if e.fileIdx >= len(e.files) {
break
}
offset.offset = e.fileLine
offset.length = 0
select {
case <-ctx.Done():
break
default:
}
}
wg.Wait()
}
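
The new parseSlowLog fans batches out to parsing goroutines but bounds how many run at once with a buffered channel used as a counting semaphore (its capacity comes from DistSQLScanConcurrency) and a WaitGroup to wait for stragglers. A minimal sketch of that bounded-concurrency pattern in isolation, with a stand-in workload:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const concurrency = 4
	sem := make(chan struct{}, concurrency) // at most `concurrency` workers in flight
	var wg sync.WaitGroup

	results := make(chan int, 16)
	for i := 0; i < 16; i++ {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot before spawning the worker
		go func(n int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the work is done
			results <- n * n         // stand-in for parsing one batch of log lines
		}(i)
	}
	wg.Wait()
	close(results)

	sum := 0
	for r := range results {
		sum += r
	}
	fmt.Println(sum) // 1240
}
```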

func getLineIndex(offset offset, index int) int {
var fileLine int
if offset.length <= index {
fileLine = index - offset.length + 1
} else {
fileLine = offset.offset + index + 1
}
return fileLine
}
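
getLineIndex maps a batch index back to a line number in the source file. As a worked example: with offset{offset: 120, length: 5}, indexes 0 through 4 (the lines read from the previous file) map to file lines 121 through 125, while index 5 and above (lines read after rolling over to the next file) map to lines 1, 2, and so on of that file.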

func (e *slowQueryRetriever) parseLog(ctx sessionctx.Context, log []string, offset offset) (data [][]types.Datum, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%s", r)
}
line := string(hack.String(lineByte))
// Check slow log entry start flag.
}()
failpoint.Inject("errorMockParseSlowLogPanic", func(val failpoint.Value) {
if val.(bool) {
panic("panic test")
}
})
var st *slowQueryTuple
tz := ctx.GetSessionVars().Location()
startFlag := false
for index, line := range log {
fileLine := getLineIndex(offset, index)
if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) {
st = &slowQueryTuple{}
valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], e.fileLine, e.checker)
valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], fileLine, e.checker)
if err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
@@ -224,17 +330,14 @@ func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.
}
continue
}

if startFlag {
// Parse slow log field.
if strings.HasPrefix(line, variable.SlowLogRowPrefixStr) {
line = line[len(variable.SlowLogRowPrefixStr):]
if strings.HasPrefix(line, variable.SlowLogPrevStmtPrefix) {
st.prevStmt = line[len(variable.SlowLogPrevStmtPrefix):]
} else if strings.HasPrefix(line, variable.SlowLogUserAndHostStr+variable.SlowLogSpaceMarkStr) {
// the user and hostname field has a special format, for example, # User@Host: root[root] @ localhost [127.0.0.1]
value := line[len(variable.SlowLogUserAndHostStr+variable.SlowLogSpaceMarkStr):]
valid, err := st.setFieldValue(tz, variable.SlowLogUserAndHostStr, value, e.fileLine, e.checker)
valid, err := st.setFieldValue(tz, variable.SlowLogUserAndHostStr, value, fileLine, e.checker)
if err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
@@ -249,7 +352,7 @@ func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.
if strings.HasSuffix(field, ":") {
field = field[:len(field)-1]
}
valid, err := st.setFieldValue(tz, field, fieldValues[i+1], e.fileLine, e.checker)
valid, err := st.setFieldValue(tz, field, fieldValues[i+1], fileLine, e.checker)
if err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
@@ -266,53 +369,22 @@ func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.
// please see https://github.com/pingcap/tidb/issues/17846 for more details.
continue
}

// Get the sql string, and mark the start flag to false.
_, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), e.fileLine, e.checker)
_, err := st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), fileLine, e.checker)
if err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
if e.checker.hasPrivilege(st.user) {
rows = append(rows, st.convertToDatumRow())
data = append(data, st.convertToDatumRow())
}
startFlag = false
} else {
startFlag = false
}
}
}
}

func getOneLine(reader *bufio.Reader) ([]byte, error) {
var resByte []byte
lineByte, isPrefix, err := reader.ReadLine()
if isPrefix {
// Need to read more data.
resByte = make([]byte, len(lineByte), len(lineByte)*2)
} else {
resByte = make([]byte, len(lineByte))
}
// Use copy here to avoid shallow copy problem.
copy(resByte, lineByte)
if err != nil {
return resByte, err
}

var tempLine []byte
for isPrefix {
tempLine, isPrefix, err = reader.ReadLine()
resByte = append(resByte, tempLine...)

// Use the max value of max_allowed_packet to check the single line length.
if len(resByte) > int(variable.MaxOfMaxAllowedPacket) {
return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
}
if err != nil {
return resByte, err
}
}
return resByte, err
return data, nil
}
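
parseLog turns any panic raised while parsing a batch into an ordinary error through a deferred recover, with a failpoint hook to exercise that path in tests. A minimal sketch of the recover-to-error idiom on its own, using a hypothetical stand-in parser:

```go
package main

import "fmt"

// parseEntry converts a panic inside the parser into a returned error,
// so one malformed entry cannot crash the whole retrieval goroutine.
func parseEntry(line string) (result string, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%s", r)
		}
	}()
	if line == "" {
		panic("empty slow log entry") // stand-in for an unexpected parser panic
	}
	return "parsed: " + line, nil
}

func main() {
	_, err := parseEntry("")
	fmt.Println(err) // empty slow log entry
}
```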

type slowQueryTuple struct {
@@ -514,12 +586,13 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string,
st.preprocSubQueryTime, err = strconv.ParseFloat(value, 64)
}
if err != nil {
return valid, errors.Wrap(err, "Parse slow log at line "+strconv.FormatInt(int64(lineNum), 10)+" failed. Field: `"+field+"`, error")
return valid, fmt.Errorf("Parse slow log at line " + strconv.FormatInt(int64(lineNum), 10) + " failed. Field: `" + field + "`, error: " + err.Error())
}
return valid, err
}

func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
// Build the slow query result
record := make([]types.Datum, 0, 64)
record = append(record, types.NewTimeDatum(st.time))
record = append(record, types.NewUintDatum(st.txnStartTs))
@@ -775,6 +848,6 @@ func (e *slowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) {
}

func (e *slowQueryRetriever) initializeAsyncParsing(ctx context.Context, sctx sessionctx.Context) {
e.parsedSlowLogCh = make(chan parsedSlowLog, 1)
e.parsedSlowLogCh = make(chan parsedSlowLog, 100)
go e.parseDataForSlowLog(ctx, sctx)
}
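
Growing the parsedSlowLogCh buffer from 1 to 100 lets the parsing goroutines run up to 100 batches ahead of the consumer in dataForSlowLog before blocking, smoothing out the producer/consumer handoff; the specific size is a throughput versus memory trade-off.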