diff --git a/profiler/profile.go b/profiler/profile.go index c58a680aab..bb32323c71 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -118,7 +118,7 @@ func (p *profiler) runProfile(t ProfileType) (*profile, error) { case HeapProfile: return heapProfile(p.cfg) case CPUProfile: - return cpuProfile(p.cfg) + return p.cpuProfile() case MutexProfile: return mutexProfile(p.cfg) case BlockProfile: @@ -159,17 +159,17 @@ var ( stopCPUProfile = pprof.StopCPUProfile ) -func cpuProfile(cfg *config) (*profile, error) { +func (p *profiler) cpuProfile() (*profile, error) { var buf bytes.Buffer start := now() if err := startCPUProfile(&buf); err != nil { return nil, err } - time.Sleep(cfg.cpuDuration) + p.interruptibleSleep(p.cfg.cpuDuration) stopCPUProfile() end := now() - tags := append(cfg.tags, CPUProfile.Tag()) - cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) + tags := append(p.cfg.tags, CPUProfile.Tag()) + p.cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) return &profile{ name: CPUProfile.Filename(), data: buf.Bytes(), diff --git a/profiler/profiler.go b/profiler/profiler.go index e0a0d9ce06..7543eb9f0b 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -45,7 +45,8 @@ func Start(opts ...Option) error { return nil } -// Stop stops the profiler. +// Stop cancels any ongoing profiling or upload operations and returns after +// everything has been stopped. func Stop() { mu.Lock() if activeProfiler != nil { @@ -212,12 +213,17 @@ func (p *profiler) enqueueUpload(bat batch) { // send takes profiles from the output queue and uploads them. func (p *profiler) send() { - for bat := range p.out { - if err := p.outputDir(bat); err != nil { - log.Error("Failed to output profile to dir: %v", err) - } - if err := p.uploadFunc(bat); err != nil { - log.Error("Failed to upload profile: %v", err) + for { + select { + case <-p.exit: + return + case bat := <-p.out: + if err := p.outputDir(bat); err != nil { + log.Error("Failed to output profile to dir: %v", err) + } + if err := p.uploadFunc(bat); err != nil { + log.Error("Failed to upload profile: %v", err) + } } } } @@ -244,6 +250,15 @@ func (p *profiler) outputDir(bat batch) error { return nil } +// interruptibleSleep sleeps for the given duration or until interrupted by the +// p.exit channel being closed. +func (p *profiler) interruptibleSleep(d time.Duration) { + select { + case <-p.exit: + case <-time.After(d): + } +} + // stop stops the profiler. func (p *profiler) stop() { p.stopOnce.Do(func() { diff --git a/profiler/profiler_test.go b/profiler/profiler_test.go index 4e38cb54ed..cec69d3fe7 100644 --- a/profiler/profiler_test.go +++ b/profiler/profiler_test.go @@ -152,6 +152,49 @@ func TestStartStopIdempotency(t *testing.T) { }) } +// TestStopLatency tries to make sure that calling Stop() doesn't hang, i.e. +// that ongoing profiling or upload operations are immediately canceled. +func TestStopLatency(t *testing.T) { + p, err := newProfiler( + WithURL("http://invalid.invalid/"), + WithPeriod(1000*time.Millisecond), + CPUDuration(500*time.Millisecond), + ) + require.NoError(t, err) + uploadStart := make(chan struct{}, 1) + uploadFunc := p.uploadFunc + p.uploadFunc = func(b batch) error { + select { + case uploadStart <- struct{}{}: + default: + // uploadFunc may be called more than once, don't leak this goroutine + } + return uploadFunc(b) + } + p.run() + + <-uploadStart + // Wait for uploadFunc(b) to run. A bit racy, but worst case is the test + // passing for the wrong reasons. + time.Sleep(10 * time.Millisecond) + + stopped := make(chan struct{}, 1) + go func() { + p.stop() + stopped <- struct{}{} + }() + + timeout := 20 * time.Millisecond + select { + case <-stopped: + case <-time.After(timeout): + // Capture stacks so we can see which goroutines are hanging and why. + stacks := make([]byte, 64*1024) + stacks = stacks[0:runtime.Stack(stacks, true)] + t.Fatalf("Stop() took longer than %s:\n%s", timeout, stacks) + } +} + func TestProfilerInternal(t *testing.T) { t.Run("collect", func(t *testing.T) { p, err := unstartedProfiler( diff --git a/profiler/upload.go b/profiler/upload.go index 6d641ea0c1..68d1024a8c 100644 --- a/profiler/upload.go +++ b/profiler/upload.go @@ -30,12 +30,18 @@ func (p *profiler) upload(bat batch) error { statsd := p.cfg.statsd var err error for i := 0; i < maxRetries; i++ { + select { + case <-p.exit: + return nil + default: + } + err = p.doRequest(bat) if rerr, ok := err.(*retriableError); ok { statsd.Count("datadog.profiler.go.upload_retry", 1, nil, 1) wait := time.Duration(rand.Int63n(p.cfg.period.Nanoseconds())) log.Error("Uploading profile failed: %v. Trying again in %s...", rerr, wait) - time.Sleep(wait) + p.interruptibleSleep(time.Second) continue } if err != nil { @@ -70,9 +76,17 @@ func (p *profiler) doRequest(bat batch) error { if err != nil { return err } + funcExit := make(chan struct{}) + defer close(funcExit) // uploadTimeout is guaranteed to be >= 0, see newProfiler. ctx, cancel := context.WithTimeout(context.Background(), p.cfg.uploadTimeout) - defer cancel() + go func() { + select { + case <-p.exit: + case <-funcExit: + } + cancel() + }() // TODO(fg) use NewRequestWithContext once go 1.12 support is dropped. req, err := http.NewRequest("POST", p.cfg.targetURL, body) if err != nil {