diff --git a/executor/explain.go b/executor/explain.go index 57d4261cf6745..1fef25865bc59 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -16,11 +16,22 @@ package executor import ( "context" + "os" + "path/filepath" + "runtime" + rpprof "runtime/pprof" + "strconv" + "sync" + "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" + "github.com/pingcap/tidb/util/memory" + "go.uber.org/zap" ) // ExplainExec represents an explain executor. @@ -89,6 +100,24 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) { } } }() + if minHeapInUse, alarmRatio := e.ctx.GetSessionVars().MemoryDebugModeMinHeapInUse, e.ctx.GetSessionVars().MemoryDebugModeAlarmRatio; minHeapInUse != 0 && alarmRatio != 0 { + memoryDebugModeCtx, cancel := context.WithCancel(ctx) + waitGroup := sync.WaitGroup{} + waitGroup.Add(1) + defer func() { + // Notify and wait debug goroutine exit. + cancel() + waitGroup.Wait() + }() + go (&memoryDebugModeHandler{ + ctx: memoryDebugModeCtx, + minHeapInUse: mathutil.Abs(minHeapInUse), + alarmRatio: alarmRatio, + autoGC: minHeapInUse > 0, + memTracker: e.ctx.GetSessionVars().StmtCtx.MemTracker, + wg: &waitGroup, + }).run() + } e.executed = true chk := newFirstChunk(e.analyzeExec) for { @@ -123,3 +152,126 @@ func (e *ExplainExec) getAnalyzeExecToExecutedNoDelay() Executor { } return nil } + +type memoryDebugModeHandler struct { + ctx context.Context + minHeapInUse int64 + alarmRatio int64 + autoGC bool + wg *sync.WaitGroup + memTracker *memory.Tracker + + infoField []zap.Field +} + +func (h *memoryDebugModeHandler) fetchCurrentMemoryUsage(gc bool) (heapInUse, trackedMem uint64) { + if gc { + runtime.GC() + } + instanceStats := &runtime.MemStats{} + runtime.ReadMemStats(instanceStats) + heapInUse = instanceStats.HeapInuse + trackedMem = uint64(h.memTracker.BytesConsumed()) + return +} + +func (h *memoryDebugModeHandler) genInfo(status string, needProfile bool, heapInUse, trackedMem int64) (fields []zap.Field, err error) { + var fileName string + h.infoField = h.infoField[:0] + h.infoField = append(h.infoField, zap.String("sql", status)) + h.infoField = append(h.infoField, zap.String("heap in use", memory.FormatBytes(heapInUse))) + h.infoField = append(h.infoField, zap.String("tracked memory", memory.FormatBytes(trackedMem))) + if needProfile { + fileName, err = getHeapProfile() + h.infoField = append(h.infoField, zap.String("heap profile", fileName)) + } + return h.infoField, err +} + +func (h *memoryDebugModeHandler) run() { + var err error + var fields []zap.Field + defer func() { + heapInUse, trackedMem := h.fetchCurrentMemoryUsage(true) + if err == nil { + fields, err := h.genInfo("finished", true, int64(heapInUse), int64(trackedMem)) + logutil.BgLogger().Info("Memory Debug Mode", fields...) + if err != nil { + logutil.BgLogger().Error("Memory Debug Mode Exit", zap.Error(err)) + } + } else { + fields, err := h.genInfo("debug_mode_error", false, int64(heapInUse), int64(trackedMem)) + logutil.BgLogger().Error("Memory Debug Mode", fields...) + logutil.BgLogger().Error("Memory Debug Mode Exit", zap.Error(err)) + } + h.wg.Done() + }() + + logutil.BgLogger().Info("Memory Debug Mode", + zap.String("sql", "started"), + zap.Bool("autoGC", h.autoGC), + zap.String("minHeapInUse", memory.FormatBytes(h.minHeapInUse)), + zap.Int64("alarmRatio", h.alarmRatio), + ) + ticker, loop := time.NewTicker(5*time.Second), 0 + for { + select { + case <-h.ctx.Done(): + return + case <-ticker.C: + heapInUse, trackedMem := h.fetchCurrentMemoryUsage(h.autoGC) + loop++ + if loop%6 == 0 { + fields, err = h.genInfo("running", false, int64(heapInUse), int64(trackedMem)) + logutil.BgLogger().Info("Memory Debug Mode", fields...) + if err != nil { + return + } + } + + if !h.autoGC { + if heapInUse > uint64(h.minHeapInUse) && trackedMem/100*uint64(100+h.alarmRatio) < heapInUse { + fields, err = h.genInfo("warning", true, int64(heapInUse), int64(trackedMem)) + logutil.BgLogger().Warn("Memory Debug Mode", fields...) + if err != nil { + return + } + } + } else { + if heapInUse > uint64(h.minHeapInUse) && trackedMem/100*uint64(100+h.alarmRatio) < heapInUse { + fields, err = h.genInfo("warning", true, int64(heapInUse), int64(trackedMem)) + logutil.BgLogger().Warn("Memory Debug Mode", fields...) + if err != nil { + return + } + ts := h.memTracker.SearchTrackerConsumedMoreThanNBytes(h.minHeapInUse / 5) + logs := make([]zap.Field, 0, len(ts)) + for _, t := range ts { + logs = append(logs, zap.String("Executor_"+strconv.Itoa(t.Label()), memory.FormatBytes(t.BytesConsumed()))) + } + logutil.BgLogger().Warn("Memory Debug Mode, Log all trackers that consumes more than threshold * 20%", logs...) + } + } + } + } +} + +func getHeapProfile() (fileName string, err error) { + tempDir := filepath.Join(config.GetGlobalConfig().TempStoragePath, "record") + timeString := time.Now().Format(time.RFC3339) + fileName = filepath.Join(tempDir, "heapGC"+timeString) + f, err := os.Create(fileName) + if err != nil { + return "", err + } + p := rpprof.Lookup("heap") + err = p.WriteTo(f, 0) + if err != nil { + return "", err + } + err = f.Close() + if err != nil { + return "", err + } + return fileName, nil +} diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 89873ff086650..e5c04aa6f538b 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -1238,6 +1238,13 @@ func (e *Explain) RenderResult() error { if e.Rows == nil || e.Analyze { flat := FlattenPhysicalPlan(e.TargetPlan, true) e.explainFlatPlanInRowFormat(flat) + if e.Analyze && + e.SCtx().GetSessionVars().MemoryDebugModeMinHeapInUse != 0 && + e.SCtx().GetSessionVars().MemoryDebugModeAlarmRatio > 0 { + row := e.Rows[0] + tracker := e.SCtx().GetSessionVars().StmtCtx.MemTracker + row[7] = row[7] + "(Total: " + tracker.FormatBytes(tracker.MaxConsumed()) + ")" + } } case types.ExplainFormatDOT: if physicalPlan, ok := e.TargetPlan.(PhysicalPlan); ok { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 04e1279e37410..add261a90d449 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1165,6 +1165,12 @@ type SessionVars struct { // RequestSourceType is the type of inner request. RequestSourceType string + + // MemoryDebugModeMinHeapInUse indicated the minimum heapInUse threshold that triggers the memoryDebugMode. + MemoryDebugModeMinHeapInUse int64 + // MemoryDebugModeAlarmRatio indicated the allowable bias ratio of memory tracking accuracy check. + // When `(memory trakced by tidb) * (1+MemoryDebugModeAlarmRatio) < actual heapInUse`, an alarm log will be recorded. + MemoryDebugModeAlarmRatio int64 } // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation. diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 98c217cf3cf5a..dba7781dfbe1f 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1667,6 +1667,14 @@ var defaultSysVars = []*SysVar{ metrics.ToggleSimplifiedMode(TiDBOptOn(s)) return nil }}, + {Scope: ScopeSession, Name: TiDBMemoryDebugModeMinHeapInUse, Value: strconv.Itoa(0), Type: TypeInt, MinValue: math.MinInt64, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error { + s.MemoryDebugModeMinHeapInUse = TidbOptInt64(val, 0) + return nil + }}, + {Scope: ScopeSession, Name: TiDBMemoryDebugModeAlarmRatio, Value: strconv.Itoa(0), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error { + s.MemoryDebugModeAlarmRatio = TidbOptInt64(val, 0) + return nil + }}, } // FeedbackProbability points to the FeedbackProbability in statistics package. diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index 1146732e6030c..ebf0fd2587624 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -1059,3 +1059,13 @@ func TestTiDBCommitterConcurrency(t *testing.T) { require.Equal(t, val, fmt.Sprintf("%d", expected)) require.NoError(t, err) } + +func TestDefaultMemoryDebugModeValue(t *testing.T) { + vars := NewSessionVars() + val, err := GetSessionOrGlobalSystemVar(vars, TiDBMemoryDebugModeMinHeapInUse) + require.NoError(t, err) + require.Equal(t, val, "0") + val, err = GetSessionOrGlobalSystemVar(vars, TiDBMemoryDebugModeAlarmRatio) + require.NoError(t, err) + require.Equal(t, val, "0") +} diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 2e55dfdb2353d..54b49f840f494 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -684,6 +684,16 @@ const ( // TiDBSimplifiedMetrics controls whether to unregister some unused metrics. TiDBSimplifiedMetrics = "tidb_simplified_metrics" + + // TiDBMemoryDebugModeMinHeapInUse is used to set tidb memory debug mode trigger threshold. + // When set to 0, the function is disabled. + // When set to a negative integer, use memory debug mode to detect the issue of frequent allocation and release of memory. + // We do not actively trigger gc, and check whether the `tracker memory * (1+bias ratio) > heap in use` each 5s. + // When set to a positive integer, use memory debug mode to detect the issue of memory tracking inaccurate. + // We trigger runtime.GC() each 5s, and check whether the `tracker memory * (1+bias ratio) > heap in use`. + TiDBMemoryDebugModeMinHeapInUse = "tidb_memory_debug_mode_min_heap_inuse" + // TiDBMemoryDebugModeAlarmRatio is used set tidb memory debug mode bias ratio. Treat memory bias less than this ratio as noise. + TiDBMemoryDebugModeAlarmRatio = "tidb_memory_debug_mode_alarm_ratio" ) // TiDB vars that have only global scope diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 75dba11cea11f..d0b1e17c22bc9 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -418,6 +418,20 @@ func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker { return nil } +// SearchTrackerConsumedMoreThanNBytes searches the specific tracker that consumes more than NBytes. +func (t *Tracker) SearchTrackerConsumedMoreThanNBytes(limit int64) (res []*Tracker) { + t.mu.Lock() + defer t.mu.Unlock() + for _, childSlice := range t.mu.children { + for _, tracker := range childSlice { + if tracker.BytesConsumed() > limit { + res = append(res, tracker) + } + } + } + return +} + // String returns the string representation of this Tracker tree. func (t *Tracker) String() string { buffer := bytes.NewBufferString("\n")