Commit

*: log each com_stmt_fetch separately (pingcap#11987) (pingcap#12392)
sre-bot authored Sep 25, 2019
1 parent 67cf1a3 commit ca80b7b
Showing 8 changed files with 89 additions and 40 deletions.
35 changes: 21 additions & 14 deletions executor/adapter.go
@@ -135,7 +135,7 @@ func (a *recordSet) NewChunk() *chunk.Chunk {

func (a *recordSet) Close() error {
err := a.executor.Close()
a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil)
a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil, false)
sessVars := a.stmt.Ctx.GetSessionVars()
pps := types.CloneRow(sessVars.PreparedParams)
sessVars.PrevStmt = FormatSQL(a.stmt.OriginText(), pps)
@@ -144,6 +144,11 @@ func (a *recordSet) Close() error {
return err
}

// OnFetchReturned implements commandLifeCycle#OnFetchReturned
func (a *recordSet) OnFetchReturned() {
a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil, true)
}

// ExecStmt implements the sqlexec.Statement interface; it builds a planner.Plan into an sqlexec.Statement.
type ExecStmt struct {
// InfoSchema stores a reference to the schema information.
@@ -646,7 +651,7 @@ func FormatSQL(sql string, pps variable.PreparedParams) stringutil.StringerFunc
}

// LogSlowQuery is used to print the slow query in the log files.
func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
sessVars := a.Ctx.GetSessionVars()
level := log.GetLevel()
cfg := config.GetGlobalConfig()
@@ -670,18 +675,20 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
_, digest := sessVars.StmtCtx.SQLDigest()
slowItems := &variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql.String(),
Digest: digest,
TimeTotal: costTime,
TimeParse: a.Ctx.GetSessionVars().DurationParse,
TimeCompile: a.Ctx.GetSessionVars().DurationCompile,
IndexNames: indexNames,
StatsInfos: statsInfos,
CopTasks: copTaskInfo,
ExecDetail: execDetail,
MemMax: memMax,
Succ: succ,
TxnTS: txnTS,
SQL: sql.String(),
Digest: digest,
TimeTotal: costTime,
TimeParse: a.Ctx.GetSessionVars().DurationParse,
TimeCompile: a.Ctx.GetSessionVars().DurationCompile,
IndexNames: indexNames,
StatsInfos: statsInfos,
CopTasks: copTaskInfo,
ExecDetail: execDetail,
MemMax: memMax,
Succ: succ,
Prepared: a.isPreparedStmt,
HasMoreResults: hasMoreResults,
}
if _, ok := a.StmtNode.(*ast.CommitStmt); ok {
slowItems.PrevStmt = sessVars.PrevStmt.String()
3 changes: 3 additions & 0 deletions server/conn.go
@@ -1398,6 +1398,9 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet
return err
}
}
if cl, ok := rs.(fetchNotifier); ok {
cl.OnFetchReturned()
}
return cc.writeEOF(serverStatus)
}

5 changes: 5 additions & 0 deletions server/conn_stmt.go
@@ -196,6 +196,9 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
if err != nil {
return err
}
if cl, ok := rs.(fetchNotifier); ok {
cl.OnFetchReturned()
}
// explicitly flush columnInfo to client.
return cc.flush()
}
@@ -212,6 +215,7 @@ const (
)

func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err error) {
cc.ctx.GetSessionVars().StartTime = time.Now()

stmtID, fetchSize, err := parseStmtFetchCmd(data)
if err != nil {
@@ -548,6 +552,7 @@ func (cc *clientConn) handleStmtReset(data []byte) (err error) {
strconv.Itoa(stmtID), "stmt_reset")
}
stmt.Reset()
stmt.StoreResultSet(nil)
return cc.writeOK()
}

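The StartTime reset in handleStmtFetch above is what makes a per-fetch Query_time meaningful: without it, every COM_STMT_FETCH would report a duration measured from the original COM_STMT_EXECUTE. Below is a minimal, self-contained sketch of that idea; sessionVars and handleFetch are hypothetical stand-ins, not TiDB types.

package main

import (
	"fmt"
	"time"
)

// sessionVars is a hypothetical stand-in for the session state touched by handleStmtFetch.
type sessionVars struct {
	StartTime time.Time
}

// handleFetch imitates the new behaviour: each fetch resets StartTime, so the
// duration logged for this fetch covers only the fetch itself.
func handleFetch(vars *sessionVars) time.Duration {
	vars.StartTime = time.Now()
	time.Sleep(10 * time.Millisecond) // stand-in for reading one batch of rows
	return time.Since(vars.StartTime)
}

func main() {
	// A stale StartTime left over from a COM_STMT_EXECUTE an hour ago.
	vars := &sessionVars{StartTime: time.Now().Add(-time.Hour)}
	fmt.Println("per-fetch cost:", handleFetch(vars).Round(time.Millisecond))
}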
7 changes: 7 additions & 0 deletions server/driver.go
@@ -142,3 +142,10 @@ type ResultSet interface {
GetFetchedRows() []chunk.Row
Close() error
}

// fetchNotifier represents a notifier that will be called in COM_FETCH.
type fetchNotifier interface {
// OnFetchReturned is called when COM_FETCH returns.
// It is used for server-side cursors.
OnFetchReturned()
}
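For context, the pattern the server uses to reach this interface is an optional type assertion: a result set that cares about fetch boundaries implements OnFetchReturned, and callers that only hold a generic ResultSet probe for it. A small sketch of that pattern follows; demoResultSet is a made-up type, not the real tidbResultSet.

package main

import "fmt"

// fetchNotifier mirrors the interface added above: OnFetchReturned is invoked
// once per COM_STMT_FETCH round trip.
type fetchNotifier interface {
	OnFetchReturned()
}

// demoResultSet is a hypothetical result set that counts fetches.
type demoResultSet struct {
	fetches int
}

// OnFetchReturned records that one fetch completed; in TiDB the real
// implementations forward this call so each fetch can be logged separately.
func (rs *demoResultSet) OnFetchReturned() {
	rs.fetches++
	fmt.Printf("fetch #%d returned\n", rs.fetches)
}

func main() {
	var rs interface{} = &demoResultSet{}
	// Callers that only hold a generic value probe for the interface,
	// just like writeChunksWithFetchSize does in server/conn.go.
	if fn, ok := rs.(fetchNotifier); ok {
		fn.OnFetchReturned()
		fn.OnFetchReturned()
	}
}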
7 changes: 7 additions & 0 deletions server/driver_tidb.go
@@ -383,6 +383,13 @@ func (trs *tidbResultSet) Close() error {
return trs.recordSet.Close()
}

// OnFetchReturned implements fetchNotifier#OnFetchReturned
func (trs *tidbResultSet) OnFetchReturned() {
if cl, ok := trs.recordSet.(fetchNotifier); ok {
cl.OnFetchReturned()
}
}

func (trs *tidbResultSet) Columns() []*ColumnInfo {
if trs.columns == nil {
fields := trs.recordSet.Fields()
2 changes: 1 addition & 1 deletion session/tidb.go
@@ -224,7 +224,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
// If it is not a select statement, we record its slow log here,
// then it could include the transaction commit time.
if rs == nil {
s.(*executor.ExecStmt).LogSlowQuery(origTxnCtx.StartTS, err == nil)
s.(*executor.ExecStmt).LogSlowQuery(origTxnCtx.StartTS, err == nil, false)
s.(*executor.ExecStmt).SummaryStmt()
pps := types.CloneRow(sessVars.PreparedParams)
sessVars.PrevStmt = executor.FormatSQL(s.OriginText(), pps)
40 changes: 27 additions & 13 deletions sessionctx/variable/session.go
@@ -989,6 +989,10 @@ const (
SlowLogConnIDStr = "Conn_ID"
// SlowLogQueryTimeStr is slow log field name.
SlowLogQueryTimeStr = "Query_time"
// SlowLogParseTimeStr is the time spent parsing the SQL statement.
SlowLogParseTimeStr = "Parse_time"
// SlowLogCompileTimeStr is the time spent compiling the plan.
SlowLogCompileTimeStr = "Compile_time"
// SlowLogDBStr is slow log field name.
SlowLogDBStr = "DB"
// SlowLogIsInternalStr is slow log field name.
@@ -1021,6 +1025,10 @@ const (
SlowLogCopWaitAddr = "Cop_wait_addr"
// SlowLogMemMax is the max number bytes of memory used in this statement.
SlowLogMemMax = "Mem_max"
// SlowLogPrepared is used to indicate whether this SQL was executed as a prepared statement.
SlowLogPrepared = "Prepared"
// SlowLogHasMoreResults is used to indicate whether this SQL has more results to follow.
SlowLogHasMoreResults = "Has_more_results"
// SlowLogSucc is used to indicate whether this SQL executed successfully.
SlowLogSucc = "Succ"
// SlowLogPrevStmt is used to show the previous executed statement.
@@ -1032,19 +1040,21 @@ const (
// SlowQueryLogItems is a collection of items that should be included in the
// slow query log.
type SlowQueryLogItems struct {
TxnTS uint64
SQL string
Digest string
TimeTotal time.Duration
TimeParse time.Duration
TimeCompile time.Duration
IndexNames string
StatsInfos map[string]uint64
CopTasks *stmtctx.CopTasksDetails
ExecDetail execdetails.ExecDetails
MemMax int64
Succ bool
PrevStmt string
TxnTS uint64
SQL string
Digest string
TimeTotal time.Duration
TimeParse time.Duration
TimeCompile time.Duration
IndexNames string
StatsInfos map[string]uint64
CopTasks *stmtctx.CopTasksDetails
ExecDetail execdetails.ExecDetails
MemMax int64
Succ bool
Prepared bool
HasMoreResults bool
PrevStmt string
}

// SlowLogFormat uses for formatting slow log.
@@ -1078,6 +1088,8 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
writeSlowLogItem(&buf, SlowLogConnIDStr, strconv.FormatUint(s.ConnectionID, 10))
}
writeSlowLogItem(&buf, SlowLogQueryTimeStr, strconv.FormatFloat(logItems.TimeTotal.Seconds(), 'f', -1, 64))
writeSlowLogItem(&buf, SlowLogParseTimeStr, strconv.FormatFloat(logItems.TimeParse.Seconds(), 'f', -1, 64))
writeSlowLogItem(&buf, SlowLogCompileTimeStr, strconv.FormatFloat(logItems.TimeCompile.Seconds(), 'f', -1, 64))

if execDetailStr := logItems.ExecDetail.String(); len(execDetailStr) > 0 {
buf.WriteString(SlowLogRowPrefixStr + execDetailStr + "\n")
@@ -1143,6 +1155,8 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
writeSlowLogItem(&buf, SlowLogMemMax, strconv.FormatInt(logItems.MemMax, 10))
}

writeSlowLogItem(&buf, SlowLogPrepared, strconv.FormatBool(logItems.Prepared))
writeSlowLogItem(&buf, SlowLogHasMoreResults, strconv.FormatBool(logItems.HasMoreResults))
writeSlowLogItem(&buf, SlowLogSucc, strconv.FormatBool(logItems.Succ))

if logItems.PrevStmt != "" {
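The two new items are written unconditionally, right before Succ, in the same "# Name: value" row format as the rest of the slow log. Here is a minimal sketch of that formatting, assuming a writeSlowLogItem helper that emits one such row per call; the real helper in sessionctx/variable may differ in detail.

package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// writeSlowLogItem is assumed to emit one "# Name: value" row, which is the
// shape the TestSlowLogFormat expectation below checks for.
func writeSlowLogItem(buf *bytes.Buffer, name, value string) {
	buf.WriteString("# " + name + ": " + value + "\n")
}

func main() {
	var buf bytes.Buffer
	writeSlowLogItem(&buf, "Prepared", strconv.FormatBool(true))
	writeSlowLogItem(&buf, "Has_more_results", strconv.FormatBool(true))
	writeSlowLogItem(&buf, "Succ", strconv.FormatBool(true))
	fmt.Print(buf.String())
	// Prints:
	// # Prepared: true
	// # Has_more_results: true
	// # Succ: true
}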
30 changes: 18 additions & 12 deletions sessionctx/variable/session_test.go
@@ -124,6 +124,8 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
# User: [email protected]
# Conn_ID: 1
# Query_time: 1
# Parse_time: 0.00000001
# Compile_time: 0.00000001
# Process_time: 2 Wait_time: 60 Backoff_time: 0.001 Request_count: 2 Total_keys: 10000 Process_keys: 20001
# DB: test
# Index_names: [t1:a,t2:b]
@@ -134,23 +136,27 @@
# Cop_proc_avg: 1 Cop_proc_p90: 2 Cop_proc_max: 3 Cop_proc_addr: 10.6.131.78
# Cop_wait_avg: 0.01 Cop_wait_p90: 0.02 Cop_wait_max: 0.03 Cop_wait_addr: 10.6.131.79
# Mem_max: 2333
# Prepared: true
# Has_more_results: true
# Succ: true
select * from t;`
sql := "select * from t"
digest := parser.DigestHash(sql)
logString := seVar.SlowLogFormat(&variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql,
Digest: digest,
TimeTotal: costTime,
TimeParse: time.Duration(10),
TimeCompile: time.Duration(10),
IndexNames: "[t1:a,t2:b]",
StatsInfos: statsInfos,
CopTasks: copTasks,
ExecDetail: execDetail,
MemMax: memMax,
Succ: true,
TxnTS: txnTS,
SQL: sql,
Digest: digest,
TimeTotal: costTime,
TimeParse: time.Duration(10),
TimeCompile: time.Duration(10),
IndexNames: "[t1:a,t2:b]",
StatsInfos: statsInfos,
CopTasks: copTasks,
ExecDetail: execDetail,
MemMax: memMax,
Prepared: true,
HasMoreResults: true,
Succ: true,
})
c.Assert(logString, Equals, resultString)
}
