diff --git a/pkg/server/server.go b/pkg/server/server.go index 1ada30e96705..378b730e4b59 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1521,7 +1521,14 @@ func (s *Server) Start(ctx context.Context) error { ) // Begin recording runtime statistics. - if err := s.startSampleEnvironment(ctx, base.DefaultMetricsSampleInterval); err != nil { + if err := startSampleEnvironment(s.AnnotateCtx(ctx), sampleEnvironmentCfg{ + st: s.ClusterSettings(), + stopper: s.stopper, + minSampleInterval: base.DefaultMetricsSampleInterval, + goroutineDumpDirName: s.cfg.GoroutineDumpDirName, + heapProfileDirName: s.cfg.HeapProfileDirName, + runtime: s.runtime, + }); err != nil { return err } @@ -1971,19 +1978,26 @@ func (s *Server) Decommission( return nil } -// startSampleEnvironment begins the heap profiler worker. -func (s *Server) startSampleEnvironment( - ctx context.Context, minSampleInterval time.Duration, -) error { +type sampleEnvironmentCfg struct { + st *cluster.Settings + stopper *stop.Stopper + minSampleInterval time.Duration + goroutineDumpDirName string + heapProfileDirName string + runtime *status.RuntimeStatSampler +} + +// startSampleEnvironment starts a periodic loop that samples the environment and, +// when appropriate, creates goroutine and/or heap dumps. +func startSampleEnvironment(ctx context.Context, cfg sampleEnvironmentCfg) error { // Immediately record summaries once on server startup. - ctx = s.AnnotateCtx(ctx) // Initialize a goroutine dumper if we have an output directory // specified. var goroutineDumper *goroutinedumper.GoroutineDumper - if s.cfg.GoroutineDumpDirName != "" { + if cfg.goroutineDumpDirName != "" { hasValidDumpDir := true - if err := os.MkdirAll(s.cfg.GoroutineDumpDirName, 0755); err != nil { + if err := os.MkdirAll(cfg.goroutineDumpDirName, 0755); err != nil { // This is possible when running with only in-memory stores; // in that case the start-up code sets the output directory // to the current directory (.). If wrunning the process @@ -1994,7 +2008,7 @@ func (s *Server) startSampleEnvironment( } if hasValidDumpDir { var err error - goroutineDumper, err = goroutinedumper.NewGoroutineDumper(ctx, s.cfg.GoroutineDumpDirName, s.ClusterSettings()) + goroutineDumper, err = goroutinedumper.NewGoroutineDumper(ctx, cfg.goroutineDumpDirName, cfg.st) if err != nil { return errors.Wrap(err, "starting goroutine dumper worker") } @@ -2006,9 +2020,9 @@ func (s *Server) startSampleEnvironment( var heapProfiler *heapprofiler.HeapProfiler var nonGoAllocProfiler *heapprofiler.NonGoAllocProfiler var statsProfiler *heapprofiler.StatsProfiler - if s.cfg.HeapProfileDirName != "" { + if cfg.heapProfileDirName != "" { hasValidDumpDir := true - if err := os.MkdirAll(s.cfg.HeapProfileDirName, 0755); err != nil { + if err := os.MkdirAll(cfg.heapProfileDirName, 0755); err != nil { // This is possible when running with only in-memory stores; // in that case the start-up code sets the output directory // to the current directory (.). If wrunning the process @@ -2020,37 +2034,37 @@ func (s *Server) startSampleEnvironment( if hasValidDumpDir { var err error - heapProfiler, err = heapprofiler.NewHeapProfiler(ctx, s.cfg.HeapProfileDirName, s.ClusterSettings()) + heapProfiler, err = heapprofiler.NewHeapProfiler(ctx, cfg.heapProfileDirName, cfg.st) if err != nil { return errors.Wrap(err, "starting heap profiler worker") } - nonGoAllocProfiler, err = heapprofiler.NewNonGoAllocProfiler(ctx, s.cfg.HeapProfileDirName, s.ClusterSettings()) + nonGoAllocProfiler, err = heapprofiler.NewNonGoAllocProfiler(ctx, cfg.heapProfileDirName, cfg.st) if err != nil { return errors.Wrap(err, "starting non-go alloc profiler worker") } - statsProfiler, err = heapprofiler.NewStatsProfiler(ctx, s.cfg.HeapProfileDirName, s.ClusterSettings()) + statsProfiler, err = heapprofiler.NewStatsProfiler(ctx, cfg.heapProfileDirName, cfg.st) if err != nil { return errors.Wrap(err, "starting memory stats collector worker") } } } - s.stopper.RunWorker(ctx, func(ctx context.Context) { + cfg.stopper.RunWorker(ctx, func(ctx context.Context) { var goMemStats atomic.Value // *status.GoMemStats goMemStats.Store(&status.GoMemStats{}) var collectingMemStats int32 // atomic, 1 when stats call is ongoing timer := timeutil.NewTimer() defer timer.Stop() - timer.Reset(minSampleInterval) + timer.Reset(cfg.minSampleInterval) for { select { - case <-s.stopper.ShouldStop(): + case <-cfg.stopper.ShouldStop(): return case <-timer.C: timer.Read = true - timer.Reset(minSampleInterval) + timer.Reset(cfg.minSampleInterval) // We read the heap stats on another goroutine and give up after 1s. // This is necessary because as of Go 1.12, runtime.ReadMemStats() @@ -2061,7 +2075,7 @@ func (s *Server) startSampleEnvironment( // this hasn't been observed to be a problem. statsCollected := make(chan struct{}) if atomic.CompareAndSwapInt32(&collectingMemStats, 0, 1) { - if err := s.stopper.RunAsyncTask(ctx, "get-mem-stats", func(ctx context.Context) { + if err := cfg.stopper.RunAsyncTask(ctx, "get-mem-stats", func(ctx context.Context) { var ms status.GoMemStats runtime.ReadMemStats(&ms.MemStats) ms.Collected = timeutil.Now() @@ -2083,14 +2097,14 @@ func (s *Server) startSampleEnvironment( curStats := goMemStats.Load().(*status.GoMemStats) cgoStats := status.GetCGoMemStats(ctx) - s.runtime.SampleEnvironment(ctx, curStats, cgoStats) + cfg.runtime.SampleEnvironment(ctx, curStats, cgoStats) if goroutineDumper != nil { - goroutineDumper.MaybeDump(ctx, s.ClusterSettings(), s.runtime.Goroutines.Value()) + goroutineDumper.MaybeDump(ctx, cfg.st, cfg.runtime.Goroutines.Value()) } if heapProfiler != nil { - heapProfiler.MaybeTakeProfile(ctx, s.runtime.GoAllocBytes.Value()) - nonGoAllocProfiler.MaybeTakeProfile(ctx, s.runtime.CgoTotalBytes.Value()) - statsProfiler.MaybeTakeProfile(ctx, s.runtime.RSSBytes.Value(), curStats, cgoStats) + heapProfiler.MaybeTakeProfile(ctx, cfg.runtime.GoAllocBytes.Value()) + nonGoAllocProfiler.MaybeTakeProfile(ctx, cfg.runtime.CgoTotalBytes.Value()) + statsProfiler.MaybeTakeProfile(ctx, cfg.runtime.RSSBytes.Value(), curStats, cgoStats) } } }