diff --git a/executor/slow_query.go b/executor/slow_query.go index 999bf309c32c8..21ae7b4cd3f46 100644 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -16,15 +16,18 @@ package executor import ( "bufio" "context" + "fmt" "io" "os" "path/filepath" "sort" "strconv" "strings" + "sync" "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -64,7 +67,6 @@ func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte } e.initializeAsyncParsing(ctx, sctx) } - rows, retrieved, err := e.dataForSlowLog(ctx) if err != nil { return nil, err @@ -126,16 +128,8 @@ func (e *slowQueryRetriever) parseDataForSlowLog(ctx context.Context, sctx sessi close(e.parsedSlowLogCh) return } - reader := bufio.NewReader(e.files[0].file) - for e.fileIdx < len(e.files) { - rows, err := e.parseSlowLog(sctx, reader, 1024) - select { - case <-ctx.Done(): - break - case e.parsedSlowLogCh <- parsedSlowLog{rows, err}: - } - } + e.parseSlowLog(ctx, sctx, reader, 64) close(e.parsedSlowLogCh) } @@ -144,25 +138,28 @@ func (e *slowQueryRetriever) dataForSlowLog(ctx context.Context) ([][]types.Datu slowLog parsedSlowLog ok bool ) - select { - case slowLog, ok = <-e.parsedSlowLogCh: - case <-ctx.Done(): - return nil, false, ctx.Err() - } - if !ok { - // When e.parsedSlowLogCh is closed, the slow log data is retrieved. - return nil, true, nil - } - - rows, err := slowLog.rows, slowLog.err - if err != nil { - return nil, false, err - } - if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) { - rows, err := infoschema.AppendHostInfoToRows(rows) - return rows, false, err + for { + select { + case slowLog, ok = <-e.parsedSlowLogCh: + case <-ctx.Done(): + return nil, false, ctx.Err() + } + if !ok { + return nil, true, nil + } + rows, err := slowLog.rows, slowLog.err + if err != nil { + return nil, false, err + } + if len(rows) == 0 { + continue + } + if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) { + rows, err := infoschema.AppendHostInfoToRows(rows) + return rows, false, err + } + return rows, false, nil } - return rows, false, nil } type slowLogChecker struct { @@ -186,35 +183,144 @@ func (sc *slowLogChecker) isTimeValid(t types.Time) bool { return true } -// TODO: optimize for parse huge log-file. -func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) { - var rows [][]types.Datum - var st *slowQueryTuple - startFlag := false - tz := ctx.GetSessionVars().Location() - for { - if len(rows) >= maxRow { - return rows, nil +func getOneLine(reader *bufio.Reader) ([]byte, error) { + var resByte []byte + lineByte, isPrefix, err := reader.ReadLine() + if isPrefix { + // Need to read more data. + resByte = make([]byte, len(lineByte), len(lineByte)*2) + } else { + resByte = make([]byte, len(lineByte)) + } + // Use copy here to avoid shallow copy problem. + copy(resByte, lineByte) + if err != nil { + return resByte, err + } + + var tempLine []byte + for isPrefix { + tempLine, isPrefix, err = reader.ReadLine() + resByte = append(resByte, tempLine...) + // Use the max value of max_allowed_packet to check the single line length. + if len(resByte) > int(variable.MaxOfMaxAllowedPacket) { + return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket) } - e.fileLine++ - lineByte, err := getOneLine(reader) if err != nil { - if err == io.EOF { - e.fileIdx++ - e.fileLine = 0 - if e.fileIdx >= len(e.files) { - return rows, nil + return resByte, err + } + } + return resByte, err +} + +type offset struct { + offset int + length int +} + +func (e *slowQueryRetriever) getBatchLog(reader *bufio.Reader, offset *offset, num int) ([]string, error) { + var line string + log := make([]string, 0, num) + var err error + for i := 0; i < num; i++ { + for { + e.fileLine++ + lineByte, err := getOneLine(reader) + if err != nil { + if err == io.EOF { + e.fileIdx++ + e.fileLine = 0 + if e.fileIdx >= len(e.files) { + return log, nil + } + offset.length = len(log) + reader.Reset(e.files[e.fileIdx].file) + continue } - reader.Reset(e.files[e.fileIdx].file) - continue + return log, err } - return rows, err + line = string(hack.String(lineByte)) + log = append(log, line) + if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) { + if strings.HasPrefix(line, "use") { + continue + } + break + } + } + } + return log, err +} + +func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.Context, reader *bufio.Reader, logNum int) { + var wg sync.WaitGroup + offset := offset{offset: 0, length: 0} + // To limit the num of go routine + ch := make(chan int, sctx.GetSessionVars().Concurrency.DistSQLScanConcurrency) + defer close(ch) + for { + log, err := e.getBatchLog(reader, &offset, logNum) + if err != nil { + e.parsedSlowLogCh <- parsedSlowLog{nil, err} + break + } + start := offset + wg.Add(1) + ch <- 1 + go func() { + defer wg.Done() + result, err := e.parseLog(sctx, log, start) + if err != nil { + e.parsedSlowLogCh <- parsedSlowLog{nil, err} + } else { + e.parsedSlowLogCh <- parsedSlowLog{result, err} + } + <-ch + }() + // Read the next file, offset = 0 + if e.fileIdx >= len(e.files) { + break + } + offset.offset = e.fileLine + offset.length = 0 + select { + case <-ctx.Done(): + break + default: + } + } + wg.Wait() +} + +func getLineIndex(offset offset, index int) int { + var fileLine int + if offset.length <= index { + fileLine = index - offset.length + 1 + } else { + fileLine = offset.offset + index + 1 + } + return fileLine +} + +func (e *slowQueryRetriever) parseLog(ctx sessionctx.Context, log []string, offset offset) (data [][]types.Datum, err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("%s", r) } - line := string(hack.String(lineByte)) - // Check slow log entry start flag. + }() + failpoint.Inject("errorMockParseSlowLogPanic", func(val failpoint.Value) { + if val.(bool) { + panic("panic test") + } + }) + var st *slowQueryTuple + tz := ctx.GetSessionVars().Location() + startFlag := false + for index, line := range log { + fileLine := getLineIndex(offset, index) if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) { st = &slowQueryTuple{} - valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], e.fileLine, e.checker) + valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], fileLine, e.checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue @@ -224,17 +330,14 @@ func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio. } continue } - if startFlag { - // Parse slow log field. if strings.HasPrefix(line, variable.SlowLogRowPrefixStr) { line = line[len(variable.SlowLogRowPrefixStr):] if strings.HasPrefix(line, variable.SlowLogPrevStmtPrefix) { st.prevStmt = line[len(variable.SlowLogPrevStmtPrefix):] } else if strings.HasPrefix(line, variable.SlowLogUserAndHostStr+variable.SlowLogSpaceMarkStr) { - // the user and hostname field has a special format, for example, # User@Host: root[root] @ localhost [127.0.0.1] value := line[len(variable.SlowLogUserAndHostStr+variable.SlowLogSpaceMarkStr):] - valid, err := st.setFieldValue(tz, variable.SlowLogUserAndHostStr, value, e.fileLine, e.checker) + valid, err := st.setFieldValue(tz, variable.SlowLogUserAndHostStr, value, fileLine, e.checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue @@ -249,7 +352,7 @@ func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio. if strings.HasSuffix(field, ":") { field = field[:len(field)-1] } - valid, err := st.setFieldValue(tz, field, fieldValues[i+1], e.fileLine, e.checker) + valid, err := st.setFieldValue(tz, field, fieldValues[i+1], fileLine, e.checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue @@ -266,15 +369,14 @@ func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio. // please see https://github.com/pingcap/tidb/issues/17846 for more details. continue } - // Get the sql string, and mark the start flag to false. - _, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), e.fileLine, e.checker) + _, err := st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), fileLine, e.checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue } if e.checker.hasPrivilege(st.user) { - rows = append(rows, st.convertToDatumRow()) + data = append(data, st.convertToDatumRow()) } startFlag = false } else { @@ -282,37 +384,7 @@ func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio. } } } -} - -func getOneLine(reader *bufio.Reader) ([]byte, error) { - var resByte []byte - lineByte, isPrefix, err := reader.ReadLine() - if isPrefix { - // Need to read more data. - resByte = make([]byte, len(lineByte), len(lineByte)*2) - } else { - resByte = make([]byte, len(lineByte)) - } - // Use copy here to avoid shallow copy problem. - copy(resByte, lineByte) - if err != nil { - return resByte, err - } - - var tempLine []byte - for isPrefix { - tempLine, isPrefix, err = reader.ReadLine() - resByte = append(resByte, tempLine...) - - // Use the max value of max_allowed_packet to check the single line length. - if len(resByte) > int(variable.MaxOfMaxAllowedPacket) { - return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket) - } - if err != nil { - return resByte, err - } - } - return resByte, err + return data, nil } type slowQueryTuple struct { @@ -514,12 +586,13 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string, st.preprocSubQueryTime, err = strconv.ParseFloat(value, 64) } if err != nil { - return valid, errors.Wrap(err, "Parse slow log at line "+strconv.FormatInt(int64(lineNum), 10)+" failed. Field: `"+field+"`, error") + return valid, fmt.Errorf("Parse slow log at line " + strconv.FormatInt(int64(lineNum), 10) + " failed. Field: `" + field + "`, error: " + err.Error()) } return valid, err } func (st *slowQueryTuple) convertToDatumRow() []types.Datum { + // Build the slow query result record := make([]types.Datum, 0, 64) record = append(record, types.NewTimeDatum(st.time)) record = append(record, types.NewUintDatum(st.txnStartTs)) @@ -775,6 +848,6 @@ func (e *slowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) { } func (e *slowQueryRetriever) initializeAsyncParsing(ctx context.Context, sctx sessionctx.Context) { - e.parsedSlowLogCh = make(chan parsedSlowLog, 1) + e.parsedSlowLogCh = make(chan parsedSlowLog, 100) go e.parseDataForSlowLog(ctx, sctx) } diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 70735155dad54..6aacb9a62c613 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -16,12 +16,14 @@ package executor import ( "bufio" "bytes" + "context" "io" "os" "strings" "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/terror" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" @@ -31,17 +33,60 @@ import ( "github.com/pingcap/tidb/util/mock" ) -func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) { - retriever := &slowQueryRetriever{} - // Ignore the error is ok for test. - terror.Log(retriever.initialize(ctx)) - rows, err := retriever.parseSlowLog(ctx, reader, 1024) +func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) { + retriever.parsedSlowLogCh = make(chan parsedSlowLog, 100) + ctx := context.Background() + retriever.parseSlowLog(ctx, sctx, reader, 64) + slowLog := <-retriever.parsedSlowLogCh + rows, err := slowLog.rows, slowLog.err if err == io.EOF { err = nil } return rows, err } +func parseSlowLog(sctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) { + retriever := &slowQueryRetriever{} + // Ignore the error is ok for test. + terror.Log(retriever.initialize(sctx)) + rows, err := parseLog(retriever, sctx, reader) + return rows, err +} + +func (s *testExecSuite) TestParseSlowLogPanic(c *C) { + slowLogStr := + `# Time: 2019-04-28T15:24:04.309074+08:00 +# Txn_start_ts: 405888132465033227 +# User@Host: root[root] @ localhost [127.0.0.1] +# Query_time: 0.216905 +# Cop_time: 0.38 Process_time: 0.021 Request_count: 1 Total_keys: 637 Processed_keys: 436 +# Is_internal: true +# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 +# Stats: t1:1,t2:2 +# Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03 Cop_proc_addr: 127.0.0.1:20160 +# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160 +# Mem_max: 70724 +# Disk_max: 65536 +# Plan_from_cache: true +# Succ: false +# Plan_digest: 60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4 +# Prev_stmt: update t set i = 1; +use test; +select * from t;` + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/errorMockParseSlowLogPanic", `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/errorMockParseSlowLogPanic"), IsNil) + }() + reader := bufio.NewReader(bytes.NewBufferString(slowLogStr)) + loc, err := time.LoadLocation("Asia/Shanghai") + c.Assert(err, IsNil) + sctx := mock.NewContext() + sctx.GetSessionVars().TimeZone = loc + _, err = parseSlowLog(sctx, reader) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "panic test") +} + func (s *testExecSuite) TestParseSlowLogFile(c *C) { slowLogStr := `# Time: 2019-04-28T15:24:04.309074+08:00 @@ -355,9 +400,9 @@ select 7;` loc, err := time.LoadLocation("Asia/Shanghai") c.Assert(err, IsNil) - ctx := mock.NewContext() - ctx.GetSessionVars().TimeZone = loc - ctx.GetSessionVars().SlowQueryFile = fileName3 + sctx := mock.NewContext() + sctx.GetSessionVars().TimeZone = loc + sctx.GetSessionVars().SlowQueryFile = fileName3 for i, cas := range cases { extractor := &plannercore.SlowQueryExtractor{Enable: (len(cas.startTime) > 0 && len(cas.endTime) > 0)} if extractor.Enable { @@ -370,12 +415,13 @@ select 7;` } retriever := &slowQueryRetriever{extractor: extractor} - err := retriever.initialize(ctx) + err := retriever.initialize(sctx) c.Assert(err, IsNil) comment := Commentf("case id: %v", i) c.Assert(retriever.files, HasLen, len(cas.files), comment) if len(retriever.files) > 0 { - rows, err := retriever.parseSlowLog(ctx, bufio.NewReader(retriever.files[0].file), 1024) + reader := bufio.NewReader(retriever.files[0].file) + rows, err := parseLog(retriever, sctx, reader) c.Assert(err, IsNil) c.Assert(len(rows), Equals, len(cas.querys), comment) for i, row := range rows {