From f8a6bde954cccc9916cc1a082bda2b8b32414874 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Mon, 28 Nov 2022 17:02:01 +0800 Subject: [PATCH] *: add a reference count for StmtCtx (#39368) close pingcap/tidb#27725 --- session/session.go | 1 + sessionctx/stmtctx/stmtctx.go | 36 +++++++++++++++++++ sessionctx/variable/session.go | 12 ++++++- util/expensivequery/expensivequery.go | 6 +++- util/memoryusagealarm/memoryusagealarm.go | 10 +++--- .../memoryusagealarm/memoryusagealarm_test.go | 2 ++ util/processinfo.go | 1 + util/util.go | 5 +++ util/util_test.go | 8 +++-- 9 files changed, 72 insertions(+), 9 deletions(-) diff --git a/session/session.go b/session/session.go index e95e4e7283679..824384367f33b 100644 --- a/session/session.go +++ b/session/session.go @@ -1588,6 +1588,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu Info: sql, CurTxnStartTS: curTxnStartTS, StmtCtx: s.sessionVars.StmtCtx, + RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx, MemTracker: s.sessionVars.MemTracker, DiskTracker: s.sessionVars.DiskTracker, StatsInfo: plannercore.GetStatsInfo, diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 328b61f87ee69..6c0f9003d6aef 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -101,6 +101,42 @@ func (warn *SQLWarn) UnmarshalJSON(data []byte) error { return nil } +// ReferenceCount indicates the reference count of StmtCtx. +type ReferenceCount int32 + +const ( + // ReferenceCountIsFrozen indicates the current StmtCtx is resetting, it'll refuse all the access from other sessions. + ReferenceCountIsFrozen int32 = -1 + // ReferenceCountNoReference indicates the current StmtCtx is not accessed by other sessions. + ReferenceCountNoReference int32 = 0 +) + +// TryIncrease tries to increase the reference count. +// There is a small chance that TryIncrease returns true while TryFreeze and +// UnFreeze are invoked successfully during the execution of TryIncrease. +func (rf *ReferenceCount) TryIncrease() bool { + refCnt := atomic.LoadInt32((*int32)(rf)) + for ; refCnt != ReferenceCountIsFrozen && !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt+1); refCnt = atomic.LoadInt32((*int32)(rf)) { + } + return refCnt != ReferenceCountIsFrozen +} + +// Decrease decreases the reference count. +func (rf *ReferenceCount) Decrease() { + for refCnt := atomic.LoadInt32((*int32)(rf)); !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt-1); refCnt = atomic.LoadInt32((*int32)(rf)) { + } +} + +// TryFreeze tries to freeze the StmtCtx to frozen before resetting the old StmtCtx. +func (rf *ReferenceCount) TryFreeze() bool { + return atomic.LoadInt32((*int32)(rf)) == ReferenceCountNoReference && atomic.CompareAndSwapInt32((*int32)(rf), ReferenceCountNoReference, ReferenceCountIsFrozen) +} + +// UnFreeze unfreeze the frozen StmtCtx thus the other session can access this StmtCtx. +func (rf *ReferenceCount) UnFreeze() { + atomic.StoreInt32((*int32)(rf), ReferenceCountNoReference) +} + // StatementContext contains variables for a statement. // It should be reset before executing a statement. type StatementContext struct { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index cb8caf59819f5..5115c0a834ad6 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -742,6 +742,11 @@ type SessionVars struct { // StmtCtx holds variables for current executing statement. StmtCtx *stmtctx.StatementContext + // RefCountOfStmtCtx indicates the reference count of StmtCtx. When the + // StmtCtx is accessed by other sessions, e.g. oom-alarm-handler/expensive-query-handler, add one first. + // Note: this variable should be accessed and updated by atomic operations. + RefCountOfStmtCtx stmtctx.ReferenceCount + // AllowAggPushDown can be set to false to forbid aggregation push down. AllowAggPushDown bool @@ -1389,7 +1394,12 @@ func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext { if sc == s.StmtCtx { sc = &s.cachedStmtCtx[1] } - *sc = stmtctx.StatementContext{} + if s.RefCountOfStmtCtx.TryFreeze() { + *sc = stmtctx.StatementContext{} + s.RefCountOfStmtCtx.UnFreeze() + } else { + sc = &stmtctx.StatementContext{} + } return sc } diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go index e761d98ab05a2..aff2f3be7829d 100644 --- a/util/expensivequery/expensivequery.go +++ b/util/expensivequery/expensivequery.go @@ -111,5 +111,9 @@ func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) { // logExpensiveQuery logs the queries which exceed the time threshold or memory threshold. func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo, msg string) { - logutil.BgLogger().Warn(msg, util.GenLogFields(costTime, info, true)...) + fields := util.GenLogFields(costTime, info, true) + if fields == nil { + return + } + logutil.BgLogger().Warn(msg, fields...) } diff --git a/util/memoryusagealarm/memoryusagealarm.go b/util/memoryusagealarm/memoryusagealarm.go index 882d8462ef60b..c8a6fd0eaecda 100644 --- a/util/memoryusagealarm/memoryusagealarm.go +++ b/util/memoryusagealarm/memoryusagealarm.go @@ -266,15 +266,16 @@ func (record *memoryUsageAlarm) printTop10SqlInfo(pinfo []*util.ProcessInfo, f * func (record *memoryUsageAlarm) getTop10SqlInfo(cmp func(i, j *util.ProcessInfo) bool, pinfo []*util.ProcessInfo) strings.Builder { slices.SortFunc(pinfo, cmp) list := pinfo - if len(list) > 10 { - list = list[:10] - } var buf strings.Builder oomAction := variable.OOMAction.Load() serverMemoryLimit := memory.ServerMemoryLimit.Load() - for i, info := range list { + for i, totalCnt := 0, 10; i < len(list) && totalCnt > 0; i++ { + info := list[i] buf.WriteString(fmt.Sprintf("SQL %v: \n", i)) fields := util.GenLogFields(record.lastCheckTime.Sub(info.Time), info, false) + if fields == nil { + continue + } fields = append(fields, zap.String("tidb_mem_oom_action", oomAction)) fields = append(fields, zap.Uint64("tidb_server_memory_limit", serverMemoryLimit)) fields = append(fields, zap.Int64("tidb_mem_quota_query", info.OOMAlarmVariablesInfo.SessionMemQuotaQuery)) @@ -294,6 +295,7 @@ func (record *memoryUsageAlarm) getTop10SqlInfo(cmp func(i, j *util.ProcessInfo) } buf.WriteString("\n") } + totalCnt-- } buf.WriteString("\n") return buf diff --git a/util/memoryusagealarm/memoryusagealarm_test.go b/util/memoryusagealarm/memoryusagealarm_test.go index f0b0af6bd99ec..6e5147805676f 100644 --- a/util/memoryusagealarm/memoryusagealarm_test.go +++ b/util/memoryusagealarm/memoryusagealarm_test.go @@ -104,12 +104,14 @@ func genMockProcessInfoList(memConsumeList []int64, startTimeList []time.Time, s for i := 0; i < size; i++ { tracker := memory.NewTracker(0, 0) tracker.Consume(memConsumeList[i]) + var stmtCtxRefCount stmtctx.ReferenceCount = 0 processInfo := util.ProcessInfo{Time: startTimeList[i], StmtCtx: &stmtctx.StatementContext{}, MemTracker: tracker, StatsInfo: func(interface{}) map[string]uint64 { return map[string]uint64{} }, + RefCountOfStmtCtx: &stmtCtxRefCount, } processInfoList = append(processInfoList, &processInfo) } diff --git a/util/processinfo.go b/util/processinfo.go index 6e379e49a8fe7..f330098138b35 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -43,6 +43,7 @@ type ProcessInfo struct { Time time.Time Plan interface{} StmtCtx *stmtctx.StatementContext + RefCountOfStmtCtx *stmtctx.ReferenceCount MemTracker *memory.Tracker DiskTracker *disk.Tracker StatsInfo func(interface{}) map[string]uint64 diff --git a/util/util.go b/util/util.go index 8af2876240486..249db91f98d06 100644 --- a/util/util.go +++ b/util/util.go @@ -106,6 +106,11 @@ func Str2Int64Map(str string) map[int64]struct{} { // GenLogFields generate log fields. func GenLogFields(costTime time.Duration, info *ProcessInfo, needTruncateSQL bool) []zap.Field { + if info.RefCountOfStmtCtx != nil && !info.RefCountOfStmtCtx.TryIncrease() { + return nil + } + defer info.RefCountOfStmtCtx.Decrease() + logFields := make([]zap.Field, 0, 20) logFields = append(logFields, zap.String("cost_time", strconv.FormatFloat(costTime.Seconds(), 'f', -1, 64)+"s")) execDetail := info.StmtCtx.GetExecDetails() diff --git a/util/util_test.go b/util/util_test.go index 7eb06e1071073..ca68a55cd8ba6 100644 --- a/util/util_test.go +++ b/util/util_test.go @@ -28,6 +28,7 @@ func TestLogFormat(t *testing.T) { mem.Consume(1<<30 + 1<<29 + 1<<28 + 1<<27) mockTooLongQuery := make([]byte, 1024*9) + var refCount stmtctx.ReferenceCount = 0 info := &ProcessInfo{ ID: 233, User: "PingCAP", @@ -38,9 +39,10 @@ func TestLogFormat(t *testing.T) { StatsInfo: func(interface{}) map[string]uint64 { return nil }, - StmtCtx: &stmtctx.StatementContext{}, - MemTracker: mem, - RedactSQL: false, + StmtCtx: &stmtctx.StatementContext{}, + RefCountOfStmtCtx: &refCount, + MemTracker: mem, + RedactSQL: false, } costTime := time.Second * 233 logSQLTruncateLen := 1024 * 8