
Commit

Merge branch 'main' into mtoff/fix-chi-errors
mtoffl01 authored Nov 5, 2024
2 parents 2f4b410 + c9fc691 commit 2cdd6a9
Showing 10 changed files with 89 additions and 17 deletions.
6 changes: 3 additions & 3 deletions contrib/valyala/fasthttp.v1/fasthttp.go
@@ -33,14 +33,14 @@ func WrapHandler(h fasthttp.RequestHandler, opts ...Option) fasthttp.RequestHandler
 		fn(cfg)
 	}
 	log.Debug("contrib/valyala/fasthttp.v1: Configuring Middleware: cfg: %#v", cfg)
-	spanOpts := []tracer.StartSpanOption{
-		tracer.ServiceName(cfg.serviceName),
-	}
 	return func(fctx *fasthttp.RequestCtx) {
 		if cfg.ignoreRequest(fctx) {
 			h(fctx)
 			return
 		}
+		spanOpts := []tracer.StartSpanOption{
+			tracer.ServiceName(cfg.serviceName),
+		}
 		spanOpts = append(spanOpts, defaultSpanOptions(fctx)...)
 		fcc := &fasthttptrace.HTTPHeadersCarrier{
 			ReqHeader: &fctx.Request.Header,
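
Note on this hunk: the handler returned by WrapHandler runs concurrently for every request, and the closure captured the outer spanOpts variable by reference, so the append further down reassigned one shared slice from multiple goroutines — a data race that also grew the slice across requests. Declaring spanOpts inside the handler gives each request its own slice. A minimal, self-contained sketch of the before/after shape; the service and resource names are illustrative, not the contrib package's actual values:

package main

import (
	"github.com/valyala/fasthttp"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	// Buggy shape (before this hunk): the closure captures spanOpts by
	// reference, so every concurrent request reassigns one shared slice
	// via append -- a data race that also grows the slice unboundedly.
	spanOpts := []tracer.StartSpanOption{tracer.ServiceName("fasthttp-demo")}
	buggy := func(fctx *fasthttp.RequestCtx) {
		spanOpts = append(spanOpts, tracer.ResourceName(string(fctx.Path())))
	}

	// Fixed shape (after this hunk): each invocation builds its own
	// slice, so requests share no mutable state.
	fixed := func(fctx *fasthttp.RequestCtx) {
		opts := []tracer.StartSpanOption{tracer.ServiceName("fasthttp-demo")}
		opts = append(opts, tracer.ResourceName(string(fctx.Path())))
		_ = opts
	}
	_, _ = buggy, fixed
}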
4 changes: 2 additions & 2 deletions ddtrace/tracer/log_test.go
@@ -146,7 +146,7 @@ func TestLogSamplingRules(t *testing.T) {
 	tp := new(log.RecordLogger)
 	tp.Ignore("appsec: ", telemetry.LogPrefix)
 	t.Setenv("DD_TRACE_SAMPLING_RULES", `[{"service": "some.service", "sample_rate": 0.234}, {"service": "other.service"}, {"service": "last.service", "sample_rate": 0.56}, {"odd": "pairs"}, {"sample_rate": 9.10}]`)
-	_, _, _, stop := startTestTracer(t, WithLogger(tp))
+	_, _, _, stop := startTestTracer(t, WithLogger(tp), WithEnv("test"))
 	defer stop()

 	assert.Len(tp.Logs(), 1)
@@ -159,7 +159,7 @@ func TestLogDefaultSampleRate(t *testing.T) {
 	tp.Ignore("appsec: ", telemetry.LogPrefix)
 	log.UseLogger(tp)
 	t.Setenv("DD_TRACE_SAMPLE_RATE", ``)
-	_, _, _, stop := startTestTracer(t, WithLogger(tp))
+	_, _, _, stop := startTestTracer(t, WithLogger(tp), WithEnv("test"))
 	defer stop()

 	assert.Len(tp.Logs(), 0)
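
Both tests assert an exact count of recorded log lines, so they now pin an env with WithEnv("test") — presumably so the stats concentrator finds an env and never emits its "No DD Env found" fallback line (downgraded to debug in ddtrace/tracer/stats.go below), which would otherwise perturb the counts. The same option is threaded through textmap_test.go and tracer_test.go further down. For reference, a minimal sketch of the equivalent public API:

package main

import "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

func main() {
	// WithEnv pins the "env" tag the tracer reports, so the stats
	// concentrator never has to fall back to "unknown-env".
	tracer.Start(tracer.WithEnv("test"))
	defer tracer.Stop()
}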
4 changes: 1 addition & 3 deletions ddtrace/tracer/stats.go
@@ -3,8 +3,6 @@
 // This product includes software developed at Datadog (https://www.datadoghq.com/).
 // Copyright 2016 Datadog, Inc.

-//go:generate msgp -unexported -marshal=false -o=stats_msgp.go -tests=false
-
 package tracer

 import (
@@ -68,7 +66,7 @@ func newConcentrator(c *config, bucketSize int64) *concentrator {
 		// This should never actually happen as the agent MUST have an env configured to start-up
 		// That panic will be removed in a future release at which point we can remove this
 		env = "unknown-env"
-		log.Error("No DD Env found, normally the agent MUST have one")
+		log.Debug("No DD Env found, normally the agent should have one")
 	}
 	aggKey := stats.PayloadAggregationKey{
 		Hostname: c.hostname,
5 changes: 3 additions & 2 deletions ddtrace/tracer/textmap_test.go
@@ -24,9 +24,10 @@ import (
 	"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
 	"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"

-	"github.com/DataDog/datadog-go/v5/statsd"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+
+	"github.com/DataDog/datadog-go/v5/statsd"
 )

 const otelHeaderPropagationStyle = "OTEL_PROPAGATORS"
@@ -2001,7 +2002,7 @@ func TestNonePropagator(t *testing.T) {
 	t.Setenv(headerPropagationStyleInject, "none,b3")
 	tp := new(log.RecordLogger)
 	tp.Ignore("appsec: ", telemetry.LogPrefix)
-	tracer := newTracer(WithLogger(tp))
+	tracer := newTracer(WithLogger(tp), WithEnv("test"))
 	defer tracer.Stop()
 	// reinitializing to capture log output, since propagators are parsed before logger is set
 	tracer.config.propagator = NewPropagator(&PropagatorConfig{})
4 changes: 2 additions & 2 deletions ddtrace/tracer/tracer_test.go
@@ -712,7 +712,7 @@ func TestTracerRuntimeMetrics(t *testing.T) {
 	t.Run("on", func(t *testing.T) {
 		tp := new(log.RecordLogger)
 		tp.Ignore("appsec: ", telemetry.LogPrefix)
-		tracer := newTracer(WithRuntimeMetrics(), WithLogger(tp), WithDebugMode(true))
+		tracer := newTracer(WithRuntimeMetrics(), WithLogger(tp), WithDebugMode(true), WithEnv("test"))
 		defer tracer.Stop()
 		assert.Contains(t, tp.Logs()[0], "DEBUG: Runtime metrics enabled")
 	})
@@ -721,7 +721,7 @@
 		t.Setenv("DD_RUNTIME_METRICS_ENABLED", "true")
 		tp := new(log.RecordLogger)
 		tp.Ignore("appsec: ", telemetry.LogPrefix)
-		tracer := newTracer(WithLogger(tp), WithDebugMode(true))
+		tracer := newTracer(WithLogger(tp), WithDebugMode(true), WithEnv("test"))
 		defer tracer.Stop()
 		assert.Contains(t, tp.Logs()[0], "DEBUG: Runtime metrics enabled")
 	})
3 changes: 3 additions & 0 deletions profiler/options.go
@@ -112,6 +112,7 @@ type config struct {
 	traceConfig          executionTraceConfig
 	endpointCountEnabled bool
 	enabled              bool
+	flushOnExit          bool
 }

 // logStartup records the configuration to the configured logger in JSON format
@@ -148,6 +149,7 @@ func logStartup(c *config) {
 		"endpoint_count_enabled":     c.endpointCountEnabled,
 		"custom_profiler_label_keys": c.customProfilerLabels,
 		"enabled":                    c.enabled,
+		"flush_on_exit":              c.flushOnExit,
 	}
 	b, err := json.Marshal(info)
 	if err != nil {
@@ -242,6 +244,7 @@ func defaultConfig() (*config, error) {
 	if v := os.Getenv("DD_VERSION"); v != "" {
 		WithVersion(v)(&c)
 	}
+	c.flushOnExit = internal.BoolEnv("DD_PROFILING_FLUSH_ON_EXIT", false)

 	tags := make(map[string]string)
 	if v := os.Getenv("DD_TAGS"); v != "" {
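
Taken together with the profiler.go changes below, flush-on-exit is driven purely by the environment: defaultConfig reads DD_PROFILING_FLUSH_ON_EXIT via internal.BoolEnv, and the diff adds no WithFlushOnExit-style Option. A minimal sketch of enabling it against the public API — the service name is arbitrary, and setting the variable in-process is only for illustration (it would normally come from the deployment environment):

package main

import (
	"log"
	"os"

	"gopkg.in/DataDog/dd-trace-go.v1/profiler"
)

func main() {
	// Read by defaultConfig at Start (the tests below use "1").
	os.Setenv("DD_PROFILING_FLUSH_ON_EXIT", "true")

	if err := profiler.Start(profiler.WithService("flush-demo")); err != nil {
		log.Fatal(err)
	}
	// With the flag set, Stop enqueues and uploads the in-progress
	// batch instead of discarding it.
	defer profiler.Stop()
}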
18 changes: 14 additions & 4 deletions profiler/profiler.go
@@ -295,7 +295,8 @@ func (p *profiler) collect(ticker <-chan time.Time) {
 		endpointCounter.GetAndReset()
 	}()

-	for {
+	exit := false
+	for !exit {
 		bat := batch{
 			seq:  p.seq,
 			host: p.cfg.hostname,
@@ -384,7 +385,11 @@ func (p *profiler) collect(ticker <-chan time.Time) {
 			// is less than the configured profiling period, the ticker will block
 			// until the end of the profiling period.
 		case <-p.exit:
-			return
+			if !p.cfg.flushOnExit {
+				return
+			}
+			// If we're flushing, we enqueue the batch before exiting the loop.
+			exit = true
 		}

 		// Include endpoint hits from tracer in profile `event.json`.
@@ -457,8 +462,13 @@ func (p *profiler) send() {
 	for {
 		select {
 		case <-p.exit:
-			return
-		case bat := <-p.out:
+			if !p.cfg.flushOnExit {
+				return
+			}
+		case bat, ok := <-p.out:
+			if !ok {
+				return
+			}
 			if err := p.outputDir(bat); err != nil {
 				log.Error("Failed to output profile to dir: %v", err)
 			}
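
The collect and send hunks together turn a hard stop into a drain: on exit, collect finishes its current iteration and enqueues the final batch (exit = true) instead of returning, and send keeps reading until the batch channel is closed rather than bailing out the moment the exit signal fires — the bat, ok := <-p.out form is what distinguishes "drained and closed" from "another batch to upload". A self-contained sketch of that shape with generic channels, not the profiler's actual types:

package main

import "fmt"

func main() {
	exit := make(chan struct{})
	out := make(chan int, 1)

	// Producer, mirroring collect: on exit it completes the current
	// iteration and still enqueues the final batch, then closes out.
	go func() {
		seq := 0
		flushed := false
		for !flushed {
			seq++
			select {
			case <-exit:
				flushed = true // final iteration: still enqueue below
			default:
			}
			out <- seq
		}
		close(out)
	}()

	close(exit) // simulate Stop()

	// Consumer, mirroring send: drain until the producer closes out,
	// instead of returning as soon as exit fires.
	for bat := range out {
		fmt.Println("uploading batch", bat)
	}
}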
54 changes: 54 additions & 0 deletions profiler/profiler_test.go
@@ -22,6 +22,7 @@ import (
 	"runtime/trace"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"

@@ -233,6 +234,59 @@ func TestStopLatency(t *testing.T) {
 	}
 }

+func TestFlushAndStop(t *testing.T) {
+	t.Setenv("DD_PROFILING_FLUSH_ON_EXIT", "1")
+	received := startTestProfiler(t, 1,
+		WithProfileTypes(CPUProfile, HeapProfile),
+		WithPeriod(time.Hour),
+		WithUploadTimeout(time.Hour))
+
+	Stop()
+
+	select {
+	case prof := <-received:
+		if len(prof.attachments["cpu.pprof"]) == 0 {
+			t.Errorf("expected CPU profile, got none")
+		}
+		if len(prof.attachments["delta-heap.pprof"]) == 0 {
+			t.Errorf("expected heap profile, got none")
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatalf("profiler did not flush")
+	}
+}
+
+func TestFlushAndStopTimeout(t *testing.T) {
+	uploadTimeout := 1 * time.Second
+	var requests atomic.Int32
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if h := r.Header.Get("DD-Telemetry-Request-Type"); len(h) > 0 {
+			return
+		}
+		requests.Add(1)
+		time.Sleep(2 * uploadTimeout)
+	}))
+	defer server.Close()
+
+	t.Setenv("DD_PROFILING_FLUSH_ON_EXIT", "1")
+	Start(
+		WithAgentAddr(server.Listener.Addr().String()),
+		WithPeriod(time.Hour),
+		WithUploadTimeout(uploadTimeout),
+	)
+
+	start := time.Now()
+	Stop()
+
+	elapsed := time.Since(start)
+	if elapsed > (maxRetries*uploadTimeout)+1*time.Second {
+		t.Errorf("profiler took %v to stop", elapsed)
+	}
+	if requests.Load() != maxRetries {
+		t.Errorf("expected %d requests, got %d", maxRetries, requests.Load())
+	}
+}
+
 func TestSetProfileFraction(t *testing.T) {
 	t.Run("on", func(t *testing.T) {
 		start := runtime.SetMutexProfileFraction(0)
1 change: 1 addition & 0 deletions profiler/telemetry.go
@@ -55,6 +55,7 @@ func startTelemetry(c *config) {
 			{Name: "endpoint_count_enabled", Value: c.endpointCountEnabled},
 			{Name: "num_custom_profiler_label_keys", Value: len(c.customProfilerLabels)},
 			{Name: "enabled", Value: c.enabled},
+			{Name: "flush_on_exit", Value: c.flushOnExit},
 		},
 	)
 }
7 changes: 6 additions & 1 deletion profiler/upload.go
@@ -37,7 +37,9 @@ func (p *profiler) upload(bat batch) error {
 	for i := 0; i < maxRetries; i++ {
 		select {
 		case <-p.exit:
-			return nil
+			if !p.cfg.flushOnExit {
+				return nil
+			}
 		default:
 		}

@@ -98,6 +100,9 @@ func (p *profiler) doRequest(bat batch) error {
 	go func() {
 		select {
 		case <-p.exit:
+			if p.cfg.flushOnExit {
+				return
+			}
 		case <-funcExit:
 		}
 		cancel()
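
Two behaviors change here when flush-on-exit is set: the retry loop in upload no longer returns early on exit (so the final upload still gets its maxRetries attempts — exactly what TestFlushAndStopTimeout asserts), and the watcher goroutine in doRequest no longer cancels the in-flight request context, leaving the upload bounded only by its timeout. A self-contained sketch of the doRequest cancellation shape; the function name and durations are invented for illustration:

package main

import (
	"context"
	"fmt"
	"time"
)

// cancelOnExit mirrors the doRequest hunk: a watcher goroutine cancels the
// in-flight request when the profiler exits, unless flush-on-exit is set,
// in which case the request is left to run to its upload deadline.
func cancelOnExit(exit <-chan struct{}, flushOnExit bool) error {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	funcExit := make(chan struct{})
	defer close(funcExit)
	go func() {
		select {
		case <-exit:
			if flushOnExit {
				return // leave ctx alone; the deadline still bounds the upload
			}
		case <-funcExit:
		}
		cancel()
	}()

	<-ctx.Done() // stand-in for the HTTP round trip observing ctx
	return ctx.Err()
}

func main() {
	exit := make(chan struct{})
	close(exit)                            // simulate Stop having been called
	fmt.Println(cancelOnExit(exit, true))  // context deadline exceeded
	fmt.Println(cancelOnExit(exit, false)) // context canceled
}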
