Skip to content

Commit

Permalink
server: prepare startSampleEnvironment for reuse
Browse files Browse the repository at this point in the history
We also need it for the standalone SQL server.

Release justification: low-risk change that is needed for free tier.
Release note: None
  • Loading branch information
tbg committed Sep 3, 2020
1 parent f4816c3 commit d0163ed
Showing 1 changed file with 38 additions and 24 deletions.
62 changes: 38 additions & 24 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,14 @@ func (s *Server) Start(ctx context.Context) error {
)

// Begin recording runtime statistics.
if err := s.startSampleEnvironment(ctx, base.DefaultMetricsSampleInterval); err != nil {
if err := startSampleEnvironment(s.AnnotateCtx(ctx), sampleEnvironmentCfg{
st: s.ClusterSettings(),
stopper: s.stopper,
minSampleInterval: base.DefaultMetricsSampleInterval,
goroutineDumpDirName: s.cfg.GoroutineDumpDirName,
heapProfileDirName: s.cfg.HeapProfileDirName,
runtime: s.runtime,
}); err != nil {
return err
}

Expand Down Expand Up @@ -1971,19 +1978,26 @@ func (s *Server) Decommission(
return nil
}

// startSampleEnvironment begins the heap profiler worker.
func (s *Server) startSampleEnvironment(
ctx context.Context, minSampleInterval time.Duration,
) error {
type sampleEnvironmentCfg struct {
st *cluster.Settings
stopper *stop.Stopper
minSampleInterval time.Duration
goroutineDumpDirName string
heapProfileDirName string
runtime *status.RuntimeStatSampler
}

// startSampleEnvironment starts a periodic loop that samples the environment and,
// when appropriate, creates goroutine and/or heap dumps.
func startSampleEnvironment(ctx context.Context, cfg sampleEnvironmentCfg) error {
// Immediately record summaries once on server startup.
ctx = s.AnnotateCtx(ctx)

// Initialize a goroutine dumper if we have an output directory
// specified.
var goroutineDumper *goroutinedumper.GoroutineDumper
if s.cfg.GoroutineDumpDirName != "" {
if cfg.goroutineDumpDirName != "" {
hasValidDumpDir := true
if err := os.MkdirAll(s.cfg.GoroutineDumpDirName, 0755); err != nil {
if err := os.MkdirAll(cfg.goroutineDumpDirName, 0755); err != nil {
// This is possible when running with only in-memory stores;
// in that case the start-up code sets the output directory
// to the current directory (.). If wrunning the process
Expand All @@ -1994,7 +2008,7 @@ func (s *Server) startSampleEnvironment(
}
if hasValidDumpDir {
var err error
goroutineDumper, err = goroutinedumper.NewGoroutineDumper(ctx, s.cfg.GoroutineDumpDirName, s.ClusterSettings())
goroutineDumper, err = goroutinedumper.NewGoroutineDumper(ctx, cfg.goroutineDumpDirName, cfg.st)
if err != nil {
return errors.Wrap(err, "starting goroutine dumper worker")
}
Expand All @@ -2006,9 +2020,9 @@ func (s *Server) startSampleEnvironment(
var heapProfiler *heapprofiler.HeapProfiler
var nonGoAllocProfiler *heapprofiler.NonGoAllocProfiler
var statsProfiler *heapprofiler.StatsProfiler
if s.cfg.HeapProfileDirName != "" {
if cfg.heapProfileDirName != "" {
hasValidDumpDir := true
if err := os.MkdirAll(s.cfg.HeapProfileDirName, 0755); err != nil {
if err := os.MkdirAll(cfg.heapProfileDirName, 0755); err != nil {
// This is possible when running with only in-memory stores;
// in that case the start-up code sets the output directory
// to the current directory (.). If wrunning the process
Expand All @@ -2020,37 +2034,37 @@ func (s *Server) startSampleEnvironment(

if hasValidDumpDir {
var err error
heapProfiler, err = heapprofiler.NewHeapProfiler(ctx, s.cfg.HeapProfileDirName, s.ClusterSettings())
heapProfiler, err = heapprofiler.NewHeapProfiler(ctx, cfg.heapProfileDirName, cfg.st)
if err != nil {
return errors.Wrap(err, "starting heap profiler worker")
}
nonGoAllocProfiler, err = heapprofiler.NewNonGoAllocProfiler(ctx, s.cfg.HeapProfileDirName, s.ClusterSettings())
nonGoAllocProfiler, err = heapprofiler.NewNonGoAllocProfiler(ctx, cfg.heapProfileDirName, cfg.st)
if err != nil {
return errors.Wrap(err, "starting non-go alloc profiler worker")
}
statsProfiler, err = heapprofiler.NewStatsProfiler(ctx, s.cfg.HeapProfileDirName, s.ClusterSettings())
statsProfiler, err = heapprofiler.NewStatsProfiler(ctx, cfg.heapProfileDirName, cfg.st)
if err != nil {
return errors.Wrap(err, "starting memory stats collector worker")
}
}
}

s.stopper.RunWorker(ctx, func(ctx context.Context) {
cfg.stopper.RunWorker(ctx, func(ctx context.Context) {
var goMemStats atomic.Value // *status.GoMemStats
goMemStats.Store(&status.GoMemStats{})
var collectingMemStats int32 // atomic, 1 when stats call is ongoing

timer := timeutil.NewTimer()
defer timer.Stop()
timer.Reset(minSampleInterval)
timer.Reset(cfg.minSampleInterval)

for {
select {
case <-s.stopper.ShouldStop():
case <-cfg.stopper.ShouldStop():
return
case <-timer.C:
timer.Read = true
timer.Reset(minSampleInterval)
timer.Reset(cfg.minSampleInterval)

// We read the heap stats on another goroutine and give up after 1s.
// This is necessary because as of Go 1.12, runtime.ReadMemStats()
Expand All @@ -2061,7 +2075,7 @@ func (s *Server) startSampleEnvironment(
// this hasn't been observed to be a problem.
statsCollected := make(chan struct{})
if atomic.CompareAndSwapInt32(&collectingMemStats, 0, 1) {
if err := s.stopper.RunAsyncTask(ctx, "get-mem-stats", func(ctx context.Context) {
if err := cfg.stopper.RunAsyncTask(ctx, "get-mem-stats", func(ctx context.Context) {
var ms status.GoMemStats
runtime.ReadMemStats(&ms.MemStats)
ms.Collected = timeutil.Now()
Expand All @@ -2083,14 +2097,14 @@ func (s *Server) startSampleEnvironment(

curStats := goMemStats.Load().(*status.GoMemStats)
cgoStats := status.GetCGoMemStats(ctx)
s.runtime.SampleEnvironment(ctx, curStats, cgoStats)
cfg.runtime.SampleEnvironment(ctx, curStats, cgoStats)
if goroutineDumper != nil {
goroutineDumper.MaybeDump(ctx, s.ClusterSettings(), s.runtime.Goroutines.Value())
goroutineDumper.MaybeDump(ctx, cfg.st, cfg.runtime.Goroutines.Value())
}
if heapProfiler != nil {
heapProfiler.MaybeTakeProfile(ctx, s.runtime.GoAllocBytes.Value())
nonGoAllocProfiler.MaybeTakeProfile(ctx, s.runtime.CgoTotalBytes.Value())
statsProfiler.MaybeTakeProfile(ctx, s.runtime.RSSBytes.Value(), curStats, cgoStats)
heapProfiler.MaybeTakeProfile(ctx, cfg.runtime.GoAllocBytes.Value())
nonGoAllocProfiler.MaybeTakeProfile(ctx, cfg.runtime.CgoTotalBytes.Value())
statsProfiler.MaybeTakeProfile(ctx, cfg.runtime.RSSBytes.Value(), curStats, cgoStats)
}
}
}
Expand Down

0 comments on commit d0163ed

Please sign in to comment.