Skip to content

Commit

Permalink
profiler: fix hanging on Stop() (#961)
Browse files Browse the repository at this point in the history
This PR fixes several issues that can cause the profiler's Stop()
function to hang and delay the graceful shutdown of a client
application. See #960 for a user experiencing this type of problem.
  • Loading branch information
felixge authored Jul 19, 2021
1 parent 5c1ff94 commit f49f770
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 14 deletions.
10 changes: 5 additions & 5 deletions profiler/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (p *profiler) runProfile(t ProfileType) (*profile, error) {
case HeapProfile:
return heapProfile(p.cfg)
case CPUProfile:
return cpuProfile(p.cfg)
return p.cpuProfile()
case MutexProfile:
return mutexProfile(p.cfg)
case BlockProfile:
Expand Down Expand Up @@ -159,17 +159,17 @@ var (
stopCPUProfile = pprof.StopCPUProfile
)

func cpuProfile(cfg *config) (*profile, error) {
func (p *profiler) cpuProfile() (*profile, error) {
var buf bytes.Buffer
start := now()
if err := startCPUProfile(&buf); err != nil {
return nil, err
}
time.Sleep(cfg.cpuDuration)
p.interruptibleSleep(p.cfg.cpuDuration)
stopCPUProfile()
end := now()
tags := append(cfg.tags, CPUProfile.Tag())
cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1)
tags := append(p.cfg.tags, CPUProfile.Tag())
p.cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1)
return &profile{
name: CPUProfile.Filename(),
data: buf.Bytes(),
Expand Down
29 changes: 22 additions & 7 deletions profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func Start(opts ...Option) error {
return nil
}

// Stop stops the profiler.
// Stop cancels any ongoing profiling or upload operations and returns after
// everything has been stopped.
func Stop() {
mu.Lock()
if activeProfiler != nil {
Expand Down Expand Up @@ -212,12 +213,17 @@ func (p *profiler) enqueueUpload(bat batch) {

// send takes profiles from the output queue and uploads them.
func (p *profiler) send() {
for bat := range p.out {
if err := p.outputDir(bat); err != nil {
log.Error("Failed to output profile to dir: %v", err)
}
if err := p.uploadFunc(bat); err != nil {
log.Error("Failed to upload profile: %v", err)
for {
select {
case <-p.exit:
return
case bat := <-p.out:
if err := p.outputDir(bat); err != nil {
log.Error("Failed to output profile to dir: %v", err)
}
if err := p.uploadFunc(bat); err != nil {
log.Error("Failed to upload profile: %v", err)
}
}
}
}
Expand All @@ -244,6 +250,15 @@ func (p *profiler) outputDir(bat batch) error {
return nil
}

// interruptibleSleep sleeps for the given duration or until interrupted by the
// p.exit channel being closed.
func (p *profiler) interruptibleSleep(d time.Duration) {
select {
case <-p.exit:
case <-time.After(d):
}
}

// stop stops the profiler.
func (p *profiler) stop() {
p.stopOnce.Do(func() {
Expand Down
43 changes: 43 additions & 0 deletions profiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,49 @@ func TestStartStopIdempotency(t *testing.T) {
})
}

// TestStopLatency tries to make sure that calling Stop() doesn't hang, i.e.
// that ongoing profiling or upload operations are immediately canceled.
func TestStopLatency(t *testing.T) {
p, err := newProfiler(
WithURL("http://invalid.invalid/"),
WithPeriod(1000*time.Millisecond),
CPUDuration(500*time.Millisecond),
)
require.NoError(t, err)
uploadStart := make(chan struct{}, 1)
uploadFunc := p.uploadFunc
p.uploadFunc = func(b batch) error {
select {
case uploadStart <- struct{}{}:
default:
// uploadFunc may be called more than once, don't leak this goroutine
}
return uploadFunc(b)
}
p.run()

<-uploadStart
// Wait for uploadFunc(b) to run. A bit racy, but worst case is the test
// passing for the wrong reasons.
time.Sleep(10 * time.Millisecond)

stopped := make(chan struct{}, 1)
go func() {
p.stop()
stopped <- struct{}{}
}()

timeout := 20 * time.Millisecond
select {
case <-stopped:
case <-time.After(timeout):
// Capture stacks so we can see which goroutines are hanging and why.
stacks := make([]byte, 64*1024)
stacks = stacks[0:runtime.Stack(stacks, true)]
t.Fatalf("Stop() took longer than %s:\n%s", timeout, stacks)
}
}

func TestProfilerInternal(t *testing.T) {
t.Run("collect", func(t *testing.T) {
p, err := unstartedProfiler(
Expand Down
18 changes: 16 additions & 2 deletions profiler/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@ func (p *profiler) upload(bat batch) error {
statsd := p.cfg.statsd
var err error
for i := 0; i < maxRetries; i++ {
select {
case <-p.exit:
return nil
default:
}

err = p.doRequest(bat)
if rerr, ok := err.(*retriableError); ok {
statsd.Count("datadog.profiler.go.upload_retry", 1, nil, 1)
wait := time.Duration(rand.Int63n(p.cfg.period.Nanoseconds()))
log.Error("Uploading profile failed: %v. Trying again in %s...", rerr, wait)
time.Sleep(wait)
p.interruptibleSleep(time.Second)
continue
}
if err != nil {
Expand Down Expand Up @@ -70,9 +76,17 @@ func (p *profiler) doRequest(bat batch) error {
if err != nil {
return err
}
funcExit := make(chan struct{})
defer close(funcExit)
// uploadTimeout is guaranteed to be >= 0, see newProfiler.
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.uploadTimeout)
defer cancel()
go func() {
select {
case <-p.exit:
case <-funcExit:
}
cancel()
}()
// TODO(fg) use NewRequestWithContext once go 1.12 support is dropped.
req, err := http.NewRequest("POST", p.cfg.targetURL, body)
if err != nil {
Expand Down

0 comments on commit f49f770

Please sign in to comment.