diff --git a/contrib/valyala/fasthttp.v1/fasthttp.go b/contrib/valyala/fasthttp.v1/fasthttp.go
index 80ba87c4e1..7d803b8652 100644
--- a/contrib/valyala/fasthttp.v1/fasthttp.go
+++ b/contrib/valyala/fasthttp.v1/fasthttp.go
@@ -33,14 +33,14 @@ func WrapHandler(h fasthttp.RequestHandler, opts ...Option) fasthttp.RequestHand
 		fn(cfg)
 	}
 	log.Debug("contrib/valyala/fasthttp.v1: Configuring Middleware: cfg: %#v", cfg)
-	spanOpts := []tracer.StartSpanOption{
-		tracer.ServiceName(cfg.serviceName),
-	}
 	return func(fctx *fasthttp.RequestCtx) {
 		if cfg.ignoreRequest(fctx) {
 			h(fctx)
 			return
 		}
+		spanOpts := []tracer.StartSpanOption{
+			tracer.ServiceName(cfg.serviceName),
+		}
 		spanOpts = append(spanOpts, defaultSpanOptions(fctx)...)
 		fcc := &fasthttptrace.HTTPHeadersCarrier{
 			ReqHeader: &fctx.Request.Header,
diff --git a/ddtrace/tracer/log_test.go b/ddtrace/tracer/log_test.go
index 27d31eb089..64e6aa5826 100644
--- a/ddtrace/tracer/log_test.go
+++ b/ddtrace/tracer/log_test.go
@@ -146,7 +146,7 @@ func TestLogSamplingRules(t *testing.T) {
 	tp := new(log.RecordLogger)
 	tp.Ignore("appsec: ", telemetry.LogPrefix)
 	t.Setenv("DD_TRACE_SAMPLING_RULES", `[{"service": "some.service", "sample_rate": 0.234}, {"service": "other.service"}, {"service": "last.service", "sample_rate": 0.56}, {"odd": "pairs"}, {"sample_rate": 9.10}]`)
-	_, _, _, stop := startTestTracer(t, WithLogger(tp))
+	_, _, _, stop := startTestTracer(t, WithLogger(tp), WithEnv("test"))
 	defer stop()

 	assert.Len(tp.Logs(), 1)
@@ -159,7 +159,7 @@ func TestLogDefaultSampleRate(t *testing.T) {
 	tp.Ignore("appsec: ", telemetry.LogPrefix)
 	log.UseLogger(tp)
 	t.Setenv("DD_TRACE_SAMPLE_RATE", ``)
-	_, _, _, stop := startTestTracer(t, WithLogger(tp))
+	_, _, _, stop := startTestTracer(t, WithLogger(tp), WithEnv("test"))
 	defer stop()

 	assert.Len(tp.Logs(), 0)
diff --git a/ddtrace/tracer/stats.go b/ddtrace/tracer/stats.go
index 8bb9462609..3a85ebece0 100644
--- a/ddtrace/tracer/stats.go
+++ b/ddtrace/tracer/stats.go
@@ -3,8 +3,6 @@
 // This product includes software developed at Datadog (https://www.datadoghq.com/).
 // Copyright 2016 Datadog, Inc.

-//go:generate msgp -unexported -marshal=false -o=stats_msgp.go -tests=false
-
 package tracer

 import (
@@ -68,7 +66,7 @@ func newConcentrator(c *config, bucketSize int64) *concentrator {
 		// This should never actually happen as the agent MUST have an env configured to start-up
 		// That panic will be removed in a future release at which point we can remove this
 		env = "unknown-env"
-		log.Error("No DD Env found, normally the agent MUST have one")
+		log.Debug("No DD Env found, normally the agent should have one")
 	}
 	aggKey := stats.PayloadAggregationKey{
 		Hostname: c.hostname,
diff --git a/ddtrace/tracer/textmap_test.go b/ddtrace/tracer/textmap_test.go
index 9575ac226d..bbbd3b52c9 100644
--- a/ddtrace/tracer/textmap_test.go
+++ b/ddtrace/tracer/textmap_test.go
@@ -24,9 +24,10 @@ import (
 	"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
 	"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"

-	"github.com/DataDog/datadog-go/v5/statsd"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+
+	"github.com/DataDog/datadog-go/v5/statsd"
 )

 const otelHeaderPropagationStyle = "OTEL_PROPAGATORS"
@@ -2001,7 +2002,7 @@ func TestNonePropagator(t *testing.T) {
 	t.Setenv(headerPropagationStyleInject, "none,b3")
 	tp := new(log.RecordLogger)
 	tp.Ignore("appsec: ", telemetry.LogPrefix)
-	tracer := newTracer(WithLogger(tp))
+	tracer := newTracer(WithLogger(tp), WithEnv("test"))
 	defer tracer.Stop()
 	// reinitializing to capture log output, since propagators are parsed before logger is set
 	tracer.config.propagator = NewPropagator(&PropagatorConfig{})
diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go
index 2f9b66f5f3..038107843e 100644
--- a/ddtrace/tracer/tracer_test.go
+++ b/ddtrace/tracer/tracer_test.go
@@ -712,7 +712,7 @@ func TestTracerRuntimeMetrics(t *testing.T) {
 	t.Run("on", func(t *testing.T) {
 		tp := new(log.RecordLogger)
 		tp.Ignore("appsec: ", telemetry.LogPrefix)
-		tracer := newTracer(WithRuntimeMetrics(), WithLogger(tp), WithDebugMode(true))
+		tracer := newTracer(WithRuntimeMetrics(), WithLogger(tp), WithDebugMode(true), WithEnv("test"))
 		defer tracer.Stop()
 		assert.Contains(t, tp.Logs()[0], "DEBUG: Runtime metrics enabled")
 	})
@@ -721,7 +721,7 @@ func TestTracerRuntimeMetrics(t *testing.T) {
 		t.Setenv("DD_RUNTIME_METRICS_ENABLED", "true")
 		tp := new(log.RecordLogger)
 		tp.Ignore("appsec: ", telemetry.LogPrefix)
-		tracer := newTracer(WithLogger(tp), WithDebugMode(true))
+		tracer := newTracer(WithLogger(tp), WithDebugMode(true), WithEnv("test"))
 		defer tracer.Stop()
 		assert.Contains(t, tp.Logs()[0], "DEBUG: Runtime metrics enabled")
 	})
diff --git a/profiler/options.go b/profiler/options.go
index 994ab21ecd..df1b1ab619 100644
--- a/profiler/options.go
+++ b/profiler/options.go
@@ -112,6 +112,7 @@ type config struct {
 	traceConfig          executionTraceConfig
 	endpointCountEnabled bool
 	enabled              bool
+	flushOnExit          bool
 }

 // logStartup records the configuration to the configured logger in JSON format
@@ -148,6 +149,7 @@ func logStartup(c *config) {
 		"endpoint_count_enabled":     c.endpointCountEnabled,
 		"custom_profiler_label_keys": c.customProfilerLabels,
 		"enabled":                    c.enabled,
+		"flush_on_exit":              c.flushOnExit,
 	}
 	b, err := json.Marshal(info)
 	if err != nil {
@@ -242,6 +244,7 @@ func defaultConfig() (*config, error) {
 	if v := os.Getenv("DD_VERSION"); v != "" {
 		WithVersion(v)(&c)
 	}
+	c.flushOnExit = internal.BoolEnv("DD_PROFILING_FLUSH_ON_EXIT", false)

 	tags := make(map[string]string)
 	if v := os.Getenv("DD_TAGS"); v != "" {
diff --git a/profiler/profiler.go b/profiler/profiler.go
index 9c7ae8cf17..e694527aae 100644
--- a/profiler/profiler.go
+++ b/profiler/profiler.go
@@ -295,7 +295,8 @@ func (p *profiler) collect(ticker <-chan time.Time) {
 		endpointCounter.GetAndReset()
 	}()

-	for {
+	exit := false
+	for !exit {
 		bat := batch{
 			seq:  p.seq,
 			host: p.cfg.hostname,
@@ -384,7 +385,11 @@ func (p *profiler) collect(ticker <-chan time.Time) {
 		// is less than the configured profiling period, the ticker will block
 		// until the end of the profiling period.
 		case <-p.exit:
-			return
+			if !p.cfg.flushOnExit {
+				return
+			}
+			// If we're flushing, we enqueue the batch before exiting the loop.
+			exit = true
 		}

 		// Include endpoint hits from tracer in profile `event.json`.
@@ -457,8 +462,13 @@ func (p *profiler) send() {
 	for {
 		select {
 		case <-p.exit:
-			return
-		case bat := <-p.out:
+			if !p.cfg.flushOnExit {
+				return
+			}
+		case bat, ok := <-p.out:
+			if !ok {
+				return
+			}
 			if err := p.outputDir(bat); err != nil {
 				log.Error("Failed to output profile to dir: %v", err)
 			}
diff --git a/profiler/profiler_test.go b/profiler/profiler_test.go
index d8b3b1e182..c073833ff3 100644
--- a/profiler/profiler_test.go
+++ b/profiler/profiler_test.go
@@ -22,6 +22,7 @@ import (
 	"runtime/trace"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"

@@ -233,6 +234,59 @@ func TestStopLatency(t *testing.T) {
 	}
 }

+func TestFlushAndStop(t *testing.T) {
+	t.Setenv("DD_PROFILING_FLUSH_ON_EXIT", "1")
+	received := startTestProfiler(t, 1,
+		WithProfileTypes(CPUProfile, HeapProfile),
+		WithPeriod(time.Hour),
+		WithUploadTimeout(time.Hour))
+
+	Stop()
+
+	select {
+	case prof := <-received:
+		if len(prof.attachments["cpu.pprof"]) == 0 {
+			t.Errorf("expected CPU profile, got none")
+		}
+		if len(prof.attachments["delta-heap.pprof"]) == 0 {
+			t.Errorf("expected heap profile, got none")
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatalf("profiler did not flush")
+	}
+}
+
+func TestFlushAndStopTimeout(t *testing.T) {
+	uploadTimeout := 1 * time.Second
+	var requests atomic.Int32
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if h := r.Header.Get("DD-Telemetry-Request-Type"); len(h) > 0 {
+			return
+		}
+		requests.Add(1)
+		time.Sleep(2 * uploadTimeout)
+	}))
+	defer server.Close()
+
+	t.Setenv("DD_PROFILING_FLUSH_ON_EXIT", "1")
+	Start(
+		WithAgentAddr(server.Listener.Addr().String()),
+		WithPeriod(time.Hour),
+		WithUploadTimeout(uploadTimeout),
+	)
+
+	start := time.Now()
+	Stop()
+
+	elapsed := time.Since(start)
+	if elapsed > (maxRetries*uploadTimeout)+1*time.Second {
+		t.Errorf("profiler took %v to stop", elapsed)
+	}
+	if requests.Load() != maxRetries {
+		t.Errorf("expected %d requests, got %d", maxRetries, requests.Load())
+	}
+}
+
 func TestSetProfileFraction(t *testing.T) {
 	t.Run("on", func(t *testing.T) {
 		start := runtime.SetMutexProfileFraction(0)
diff --git a/profiler/telemetry.go b/profiler/telemetry.go
index 0dc2cf9be8..a53367f33a 100644
--- a/profiler/telemetry.go
+++ b/profiler/telemetry.go
@@ -55,6 +55,7 @@ func startTelemetry(c *config) {
 		{Name: "endpoint_count_enabled", Value: c.endpointCountEnabled},
 		{Name: "num_custom_profiler_label_keys", Value: len(c.customProfilerLabels)},
 		{Name: "enabled", Value: c.enabled},
+		{Name: "flush_on_exit", Value: c.flushOnExit},
 	},
 	)
 }
diff --git a/profiler/upload.go b/profiler/upload.go
index a8b98f6560..6d736fc1be 100644
--- a/profiler/upload.go
+++ b/profiler/upload.go
@@ -37,7 +37,9 @@ func (p *profiler) upload(bat batch) error {
 	for i := 0; i < maxRetries; i++ {
 		select {
 		case <-p.exit:
-			return nil
+			if !p.cfg.flushOnExit {
+				return nil
+			}
 		default:
 		}

@@ -98,6 +100,9 @@ func (p *profiler) doRequest(bat batch) error {
 	go func() {
 		select {
 		case <-p.exit:
+			if p.cfg.flushOnExit {
+				return
+			}
 		case <-funcExit:
 		}
 		cancel()
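
A minimal sketch of how an application might opt in to the flush-on-exit behavior introduced above, assuming the public dd-trace-go v1 profiler API; only the DD_PROFILING_FLUSH_ON_EXIT variable and its default (false, read in profiler/options.go) come from this change, the rest is a standard profiler setup:

	package main

	import (
		"log"
		"os"

		"gopkg.in/DataDog/dd-trace-go.v1/profiler"
	)

	func main() {
		// Must be set before Start: defaultConfig reads the variable once
		// when the profiler is configured.
		os.Setenv("DD_PROFILING_FLUSH_ON_EXIT", "true")

		if err := profiler.Start(
			profiler.WithProfileTypes(profiler.CPUProfile, profiler.HeapProfile),
		); err != nil {
			log.Fatal(err)
		}
		// With the flag set, Stop blocks until the in-flight profile batch is
		// uploaded (or retries are exhausted) instead of discarding it.
		defer profiler.Stop()

		// ... application code ...
	}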