From 2276949691e675739578e3b4a766bdef4d31facb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Wed, 11 Dec 2024 15:05:34 +0100 Subject: [PATCH 01/11] contrib/log/slog: test with slogtest (#3018) --- contrib/log/slog/slog_test.go | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/contrib/log/slog/slog_test.go b/contrib/log/slog/slog_test.go index 0d25702cb2..7546009b0f 100644 --- a/contrib/log/slog/slog_test.go +++ b/contrib/log/slog/slog_test.go @@ -14,6 +14,7 @@ import ( "strconv" "strings" "testing" + "testing/slogtest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -84,13 +85,33 @@ func TestNewJSONHandler(t *testing.T) { } func TestWrapHandler(t *testing.T) { - testLogger( - t, - func(w io.Writer) *slog.Logger { - return slog.New(WrapHandler(slog.NewJSONHandler(w, nil))) - }, - nil, - ) + t.Run("testLogger", func(t *testing.T) { + testLogger( + t, + func(w io.Writer) *slog.Logger { + return slog.New(WrapHandler(slog.NewJSONHandler(w, nil))) + }, + nil, + ) + }) + + t.Run("slogtest", func(t *testing.T) { + var buf bytes.Buffer + h := WrapHandler(slog.NewJSONHandler(&buf, nil)) + results := func() []map[string]any { + var ms []map[string]any + for _, line := range bytes.Split(buf.Bytes(), []byte{'\n'}) { + if len(line) == 0 { + continue + } + var m map[string]any + require.NoError(t, json.Unmarshal(line, &m)) + ms = append(ms, m) + } + return ms + } + require.NoError(t, slogtest.TestHandler(h, results)) + }) } func TestHandlerWithAttrs(t *testing.T) { From 14e38da45f0231fc4f1d9da1efbcc55c57f7a201 Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Wed, 11 Dec 2024 15:15:08 +0100 Subject: [PATCH 02/11] chore: update main version to 1.72.0-dev (#3022) --- internal/version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/version/version.go b/internal/version/version.go index 6918f58dce..c91fed8dcc 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -13,7 +13,7 @@ import ( // Tag specifies the current release tag. It needs to be manually // updated. A test checks that the value of Tag never points to a // git tag that is older than HEAD. -const Tag = "v1.71.0-dev" +const Tag = "v1.72.0-dev" // Dissected version number. Filled during init() var ( From b62c4a84789ea7594dbc552e2def527c088cfe20 Mon Sep 17 00:00:00 2001 From: Tony Redondo Date: Thu, 12 Dec 2024 11:29:08 +0100 Subject: [PATCH 03/11] fix(ddtrace/tracer): Disable agent features and remote configuration in CI Visibility agentless mode (#3026) --- ddtrace/tracer/option.go | 26 +++++++++++++------------- ddtrace/tracer/tracer.go | 13 +++++++++++++ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index 5868086037..ae2594aa50 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -532,8 +532,19 @@ func newConfig(opts ...StartOption) *config { if c.debug { log.SetLevel(log.LevelDebug) } - // if using stdout or traces are disabled, agent is disabled - agentDisabled := c.logToStdout || !c.enabled.current + + // Check if CI Visibility mode is enabled + if internal.BoolEnv(constants.CIVisibilityEnabledEnvironmentVariable, false) { + c.ciVisibilityEnabled = true // Enable CI Visibility mode + c.httpClientTimeout = time.Second * 45 // Increase timeout up to 45 seconds (same as other tracers in CIVis mode) + c.logStartup = false // If we are in CI Visibility mode we don't want to log the startup to stdout to avoid polluting the output + ciTransport := newCiVisibilityTransport(c) // Create a default CI Visibility Transport + c.transport = ciTransport // Replace the default transport with the CI Visibility transport + c.ciVisibilityAgentless = ciTransport.agentless + } + + // if using stdout or traces are disabled or we are in ci visibility agentless mode, agent is disabled + agentDisabled := c.logToStdout || !c.enabled.current || c.ciVisibilityAgentless c.agent = loadAgentFeatures(agentDisabled, c.agentURL, c.httpClient) info, ok := debug.ReadBuildInfo() if !ok { @@ -551,17 +562,6 @@ func newConfig(opts ...StartOption) *config { // This allows persisting the initial value of globalTags for future resets and updates. globalTagsOrigin := c.globalTags.cfgOrigin c.initGlobalTags(c.globalTags.get(), globalTagsOrigin) - - // Check if CI Visibility mode is enabled - if internal.BoolEnv(constants.CIVisibilityEnabledEnvironmentVariable, false) { - c.ciVisibilityEnabled = true // Enable CI Visibility mode - c.httpClientTimeout = time.Second * 45 // Increase timeout up to 45 seconds (same as other tracers in CIVis mode) - c.logStartup = false // If we are in CI Visibility mode we don't want to log the startup to stdout to avoid polluting the output - ciTransport := newCiVisibilityTransport(c) // Create a default CI Visibility Transport - c.transport = ciTransport // Replace the default transport with the CI Visibility transport - c.ciVisibilityAgentless = ciTransport.agentless - } - return c } diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index a0de522d24..e6bfcc7b49 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -165,6 +165,19 @@ func Start(opts ...StartOption) { if t.dataStreams != nil { t.dataStreams.Start() } + if t.config.ciVisibilityAgentless { + // CI Visibility agentless mode doesn't require remote configuration. + + // start instrumentation telemetry unless it is disabled through the + // DD_INSTRUMENTATION_TELEMETRY_ENABLED env var + startTelemetry(t.config) + + // start appsec + appsec.Start(t.config.appsecStartOptions...) + _ = t.hostname() // Prime the hostname cache + return + } + // Start AppSec with remote configuration cfg := remoteconfig.DefaultClientConfig() cfg.AgentURL = t.config.agentURL.String() From 7a7b85f97ca9b533ec087744e07e9a964c909f9a Mon Sep 17 00:00:00 2001 From: Tony Redondo Date: Thu, 12 Dec 2024 12:53:31 +0100 Subject: [PATCH 04/11] internal/civisibility: Add auto test retries telemetry metric (#3028) --- internal/civisibility/utils/net/settings_api.go | 3 +++ internal/civisibility/utils/telemetry/telemetry.go | 7 ++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/civisibility/utils/net/settings_api.go b/internal/civisibility/utils/net/settings_api.go index aa2f88dc66..a27a37cf51 100644 --- a/internal/civisibility/utils/net/settings_api.go +++ b/internal/civisibility/utils/net/settings_api.go @@ -120,6 +120,9 @@ func (c *client) GetSettings() (*SettingsResponseData, error) { if responseObject.Data.Attributes.EarlyFlakeDetection.Enabled { settingsResponseType = append(settingsResponseType, telemetry.EfdEnabledSettingsResponseType...) } + if responseObject.Data.Attributes.FlakyTestRetriesEnabled { + settingsResponseType = append(settingsResponseType, telemetry.FlakyTestRetriesEnabledSettingsResponseType...) + } telemetry.GitRequestsSettingsResponse(settingsResponseType) return &responseObject.Data.Attributes, nil } diff --git a/internal/civisibility/utils/telemetry/telemetry.go b/internal/civisibility/utils/telemetry/telemetry.go index 3b700150a7..69d0b039c9 100644 --- a/internal/civisibility/utils/telemetry/telemetry.go +++ b/internal/civisibility/utils/telemetry/telemetry.go @@ -113,9 +113,10 @@ const ( type SettingsResponseType []string var ( - CoverageEnabledSettingsResponseType SettingsResponseType = []string{"coverage_enabled"} - ItrSkipEnabledSettingsResponseType SettingsResponseType = []string{"itrskip_enabled"} - EfdEnabledSettingsResponseType SettingsResponseType = []string{"early_flake_detection_enabled:true"} + CoverageEnabledSettingsResponseType SettingsResponseType = []string{"coverage_enabled"} + ItrSkipEnabledSettingsResponseType SettingsResponseType = []string{"itrskip_enabled"} + EfdEnabledSettingsResponseType SettingsResponseType = []string{"early_flake_detection_enabled:true"} + FlakyTestRetriesEnabledSettingsResponseType SettingsResponseType = []string{"flaky_test_retries_enabled:true"} ) // removeEmptyStrings removes empty string values inside an array or use the same if not empty string is found. From 6004a6c030021d1d5061ab87ea3679641f996885 Mon Sep 17 00:00:00 2001 From: Anatole Beuzon <8351433+anatolebeuzon@users.noreply.github.com> Date: Thu, 12 Dec 2024 14:05:06 -0500 Subject: [PATCH 05/11] ddtrace/tracer: initialize runtimeMetricsV2 with statsd "direct" client (#3006) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Felix Geisendörfer Co-authored-by: Nayef Ghattas --- contrib/database/sql/option_test.go | 2 +- ddtrace/mocktracer/mocktracer.go | 2 +- ddtrace/tracer/option.go | 2 +- ddtrace/tracer/stats.go | 2 +- ddtrace/tracer/stats_test.go | 8 +++--- ddtrace/tracer/textmap_test.go | 34 +++++++++++++------------- ddtrace/tracer/tracer_test.go | 2 +- go.mod | 2 +- go.sum | 4 +-- internal/apps/go.mod | 2 +- internal/apps/go.sum | 4 +-- internal/datastreams/processor_test.go | 2 +- internal/exectracetest/go.mod | 2 +- internal/exectracetest/go.sum | 4 +-- internal/statsd.go | 5 ++-- internal/statsdtest/statsdtest.go | 7 ++++++ 16 files changed, 46 insertions(+), 38 deletions(-) diff --git a/contrib/database/sql/option_test.go b/contrib/database/sql/option_test.go index e04c5ef452..469bc48736 100644 --- a/contrib/database/sql/option_test.go +++ b/contrib/database/sql/option_test.go @@ -69,7 +69,7 @@ func TestCheckStatsdRequired(t *testing.T) { cfg := new(config) cfg.dbStats = true cfg.checkStatsdRequired() - _, ok := cfg.statsdClient.(*statsd.Client) + _, ok := cfg.statsdClient.(*statsd.ClientDirect) assert.True(t, ok) }) t.Run("invalid address", func(t *testing.T) { diff --git a/ddtrace/mocktracer/mocktracer.go b/ddtrace/mocktracer/mocktracer.go index 2a210e07bb..396d99bf20 100644 --- a/ddtrace/mocktracer/mocktracer.go +++ b/ddtrace/mocktracer/mocktracer.go @@ -80,7 +80,7 @@ func newMockTracer() *mocktracer { client := &http.Client{ Transport: t.dsmTransport, } - t.dsmProcessor = datastreams.NewProcessor(&statsd.NoOpClient{}, "env", "service", "v1", &url.URL{Scheme: "http", Host: "agent-address"}, client) + t.dsmProcessor = datastreams.NewProcessor(&statsd.NoOpClientDirect{}, "env", "service", "v1", &url.URL{Scheme: "http", Host: "agent-address"}, client) t.dsmProcessor.Start() t.dsmProcessor.Flush() return &t diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index ae2594aa50..fd42e3c4e4 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -802,7 +802,7 @@ func statsTags(c *config) []string { // withNoopStats is used for testing to disable statsd client func withNoopStats() StartOption { return func(c *config) { - c.statsdClient = &statsd.NoOpClient{} + c.statsdClient = &statsd.NoOpClientDirect{} } } diff --git a/ddtrace/tracer/stats.go b/ddtrace/tracer/stats.go index a3bdfc9d32..bf8530a8d6 100644 --- a/ddtrace/tracer/stats.go +++ b/ddtrace/tracer/stats.go @@ -132,7 +132,7 @@ func (c *concentrator) runFlusher(tick <-chan time.Time) { // statsd returns any tracer configured statsd client, or a no-op. func (c *concentrator) statsd() internal.StatsdClient { if c.statsdClient == nil { - return &statsd.NoOpClient{} + return &statsd.NoOpClientDirect{} } return c.statsdClient } diff --git a/ddtrace/tracer/stats_test.go b/ddtrace/tracer/stats_test.go index e9c56d8732..42f0bc17b0 100644 --- a/ddtrace/tracer/stats_test.go +++ b/ddtrace/tracer/stats_test.go @@ -41,7 +41,7 @@ func TestConcentrator(t *testing.T) { } t.Run("start-stop", func(t *testing.T) { assert := assert.New(t) - c := newConcentrator(&config{}, bucketSize, &statsd.NoOpClient{}) + c := newConcentrator(&config{}, bucketSize, &statsd.NoOpClientDirect{}) assert.EqualValues(atomic.LoadUint32(&c.stopped), 1) c.Start() assert.EqualValues(atomic.LoadUint32(&c.stopped), 0) @@ -60,7 +60,7 @@ func TestConcentrator(t *testing.T) { t.Run("flusher", func(t *testing.T) { t.Run("old", func(t *testing.T) { transport := newDummyTransport() - c := newConcentrator(&config{transport: transport, env: "someEnv"}, 500_000, &statsd.NoOpClient{}) + c := newConcentrator(&config{transport: transport, env: "someEnv"}, 500_000, &statsd.NoOpClientDirect{}) assert.Len(t, transport.Stats(), 0) ss1, ok := c.newTracerStatSpan(&s1, nil) assert.True(t, ok) @@ -105,7 +105,7 @@ func TestConcentrator(t *testing.T) { t.Run("ciGitSha", func(t *testing.T) { utils.AddCITags(constants.GitCommitSHA, "DEADBEEF") transport := newDummyTransport() - c := newConcentrator(&config{transport: transport, env: "someEnv"}, (10 * time.Second).Nanoseconds(), &statsd.NoOpClient{}) + c := newConcentrator(&config{transport: transport, env: "someEnv"}, (10 * time.Second).Nanoseconds(), &statsd.NoOpClientDirect{}) assert.Len(t, transport.Stats(), 0) ss1, ok := c.newTracerStatSpan(&s1, nil) assert.True(t, ok) @@ -119,7 +119,7 @@ func TestConcentrator(t *testing.T) { // stats should be sent if the concentrator is stopped t.Run("stop", func(t *testing.T) { transport := newDummyTransport() - c := newConcentrator(&config{transport: transport}, 500000, &statsd.NoOpClient{}) + c := newConcentrator(&config{transport: transport}, 500000, &statsd.NoOpClientDirect{}) assert.Len(t, transport.Stats(), 0) ss1, ok := c.newTracerStatSpan(&s1, nil) assert.True(t, ok) diff --git a/ddtrace/tracer/textmap_test.go b/ddtrace/tracer/textmap_test.go index 35a84523aa..74038fc0c7 100644 --- a/ddtrace/tracer/textmap_test.go +++ b/ddtrace/tracer/textmap_test.go @@ -671,7 +671,7 @@ func TestEnvVars(t *testing.T) { } for _, test := range tests { t.Run(fmt.Sprintf("inject with env=%q", testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) ctx, ok := root.Context().(*spanContext) @@ -743,7 +743,7 @@ func TestEnvVars(t *testing.T) { } for _, test := range tests { t.Run(fmt.Sprintf("extract with env=%q", testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) ctx, err := tracer.Extract(test.in) @@ -782,7 +782,7 @@ func TestEnvVars(t *testing.T) { } for _, tc := range tests { t.Run(fmt.Sprintf("extract with env=%q", testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) _, err := tracer.Extract(tc.in) @@ -839,7 +839,7 @@ func TestEnvVars(t *testing.T) { } for _, tc := range tests { t.Run(fmt.Sprintf("extract with env=%q", testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) ctx, err := tracer.Extract(tc.in) @@ -876,7 +876,7 @@ func TestEnvVars(t *testing.T) { } for i, tc := range tests { t.Run(fmt.Sprintf("b3 single header inject #%d", i), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() root := tracer.StartSpan("myrequest").(*span) ctx, ok := root.Context().(*spanContext) @@ -934,7 +934,7 @@ func TestEnvVars(t *testing.T) { } for _, tc := range tests { t.Run(fmt.Sprintf("inject with env=%q", testEnv), func(t *testing.T) { - tracer := newTracer(WithPropagator(NewPropagator(&PropagatorConfig{B3: true})), WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithPropagator(NewPropagator(&PropagatorConfig{B3: true})), WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) ctx, ok := root.Context().(*spanContext) @@ -1009,7 +1009,7 @@ func TestEnvVars(t *testing.T) { } for _, tc := range tests { t.Run(fmt.Sprintf("extract with env=%q", testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) @@ -1069,7 +1069,7 @@ func TestEnvVars(t *testing.T) { } for _, tc := range tests { t.Run(fmt.Sprintf("inject and extract with env=%q", testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) root.SetTag(ext.SamplingPriority, -1) @@ -1281,7 +1281,7 @@ func TestEnvVars(t *testing.T) { } for i, tc := range tests { t.Run(fmt.Sprintf("#%v extract/valid with env=%q", i, testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) ctx, err := tracer.Extract(tc.in) @@ -1337,7 +1337,7 @@ func TestEnvVars(t *testing.T) { for i, tc := range tests { t.Run(fmt.Sprintf("#%v extract/invalid with env=%q", i, testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) ctx, err := tracer.Extract(tc) @@ -1389,7 +1389,7 @@ func TestEnvVars(t *testing.T) { } for i, tc := range tests { t.Run(fmt.Sprintf("#%v extract/valid with env=%q", i, testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) ctx, err := tracer.Extract(tc.inHeaders) @@ -1591,7 +1591,7 @@ func TestEnvVars(t *testing.T) { } for i, tc := range tests { t.Run(fmt.Sprintf("#%d w3c inject with env=%q", i, testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) root := tracer.StartSpan("web.request").(*span) @@ -1621,7 +1621,7 @@ func TestEnvVars(t *testing.T) { }) t.Run(fmt.Sprintf("w3c inject with env=%q / testing tag list-member limit", testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) root := tracer.StartSpan("web.request").(*span) @@ -1689,7 +1689,7 @@ func TestEnvVars(t *testing.T) { } for i, tc := range tests { t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) ctx, err := tracer.Extract(tc.inHeaders) @@ -1764,7 +1764,7 @@ func TestEnvVars(t *testing.T) { } for i, tc := range tests { t.Run(fmt.Sprintf("#%d w3c inject/extract with env=%q", i, testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) ctx, err := tracer.Extract(tc.in) @@ -1828,7 +1828,7 @@ func TestEnvVars(t *testing.T) { } for i, tc := range tests { t.Run(fmt.Sprintf("#%d w3c inject/extract with env=%q", i, testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) pCtx, err := tracer.Extract(tc.in) @@ -1911,7 +1911,7 @@ func TestEnvVars(t *testing.T) { } for i, tc := range tests { t.Run(fmt.Sprintf("#%v extract with env=%q", i, testEnv), func(t *testing.T) { - tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClient{})) + tracer := newTracer(WithHTTPClient(c), withStatsdClient(&statsd.NoOpClientDirect{})) defer tracer.Stop() assert := assert.New(t) ctx, err := tracer.Extract(tc.in) diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index 555c71080d..dd1820621e 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -2347,7 +2347,7 @@ func TestFlush(t *testing.T) { tr.statsd = ts transport := newDummyTransport() - c := newConcentrator(&config{transport: transport, env: "someEnv"}, defaultStatsBucketSize, &statsd.NoOpClient{}) + c := newConcentrator(&config{transport: transport, env: "someEnv"}, defaultStatsBucketSize, &statsd.NoOpClientDirect{}) tr.stats = c c.Start() defer c.Stop() diff --git a/go.mod b/go.mod index 6696dcc4cc..f3a841b96d 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/DataDog/datadog-agent/pkg/trace v0.58.0 github.com/DataDog/datadog-go/v5 v5.5.0 github.com/DataDog/go-libddwaf/v3 v3.5.1 - github.com/DataDog/go-runtime-metrics-internal v0.0.3 + github.com/DataDog/go-runtime-metrics-internal v0.0.4-0.20241206090539-a14610dc22b6 github.com/DataDog/gostackparse v0.7.0 github.com/DataDog/sketches-go v1.4.5 github.com/IBM/sarama v1.40.0 diff --git a/go.sum b/go.sum index b05429f344..6471b96ff5 100644 --- a/go.sum +++ b/go.sum @@ -644,8 +644,8 @@ github.com/DataDog/datadog-go/v5 v5.5.0 h1:G5KHeB8pWBNXT4Jtw0zAkhdxEAWSpWH00geHI github.com/DataDog/datadog-go/v5 v5.5.0/go.mod h1:K9kcYBlxkcPP8tvvjZZKs/m1edNAUFzBbdpTUKfCsuw= github.com/DataDog/go-libddwaf/v3 v3.5.1 h1:GWA4ln4DlLxiXm+X7HA/oj0ZLcdCwOS81KQitegRTyY= github.com/DataDog/go-libddwaf/v3 v3.5.1/go.mod h1:n98d9nZ1gzenRSk53wz8l6d34ikxS+hs62A31Fqmyi4= -github.com/DataDog/go-runtime-metrics-internal v0.0.3 h1:AqzLCS4rqojBoCNwEAmbpLFAkKT43e+ze/a6aFbfDpU= -github.com/DataDog/go-runtime-metrics-internal v0.0.3/go.mod h1:quaQJ+wPN41xEC458FCpTwyROZm3MzmTZ8q8XOXQiPs= +github.com/DataDog/go-runtime-metrics-internal v0.0.4-0.20241206090539-a14610dc22b6 h1:bpitH5JbjBhfcTG+H2RkkiUXpYa8xSuIPnyNtTaSPog= +github.com/DataDog/go-runtime-metrics-internal v0.0.4-0.20241206090539-a14610dc22b6/go.mod h1:quaQJ+wPN41xEC458FCpTwyROZm3MzmTZ8q8XOXQiPs= github.com/DataDog/go-sqllexer v0.0.14 h1:xUQh2tLr/95LGxDzLmttLgTo/1gzFeOyuwrQa/Iig4Q= github.com/DataDog/go-sqllexer v0.0.14/go.mod h1:KwkYhpFEVIq+BfobkTC1vfqm4gTi65skV/DpDBXtexc= github.com/DataDog/go-tuf v1.1.0-0.5.2 h1:4CagiIekonLSfL8GMHRHcHudo1fQnxELS9g4tiAupQ4= diff --git a/internal/apps/go.mod b/internal/apps/go.mod index 9ce5485a85..d90e69002e 100644 --- a/internal/apps/go.mod +++ b/internal/apps/go.mod @@ -14,7 +14,7 @@ require ( github.com/DataDog/datadog-agent/pkg/util/log v0.58.0 // indirect github.com/DataDog/datadog-agent/pkg/util/scrubber v0.58.0 // indirect github.com/DataDog/go-libddwaf/v3 v3.5.1 // indirect - github.com/DataDog/go-runtime-metrics-internal v0.0.3 // indirect + github.com/DataDog/go-runtime-metrics-internal v0.0.4-0.20241206090539-a14610dc22b6 // indirect github.com/DataDog/go-sqllexer v0.0.14 // indirect github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.20.0 // indirect github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect diff --git a/internal/apps/go.sum b/internal/apps/go.sum index a591990f7b..1ca195d002 100644 --- a/internal/apps/go.sum +++ b/internal/apps/go.sum @@ -16,8 +16,8 @@ github.com/DataDog/datadog-go/v5 v5.5.0 h1:G5KHeB8pWBNXT4Jtw0zAkhdxEAWSpWH00geHI github.com/DataDog/datadog-go/v5 v5.5.0/go.mod h1:K9kcYBlxkcPP8tvvjZZKs/m1edNAUFzBbdpTUKfCsuw= github.com/DataDog/go-libddwaf/v3 v3.5.1 h1:GWA4ln4DlLxiXm+X7HA/oj0ZLcdCwOS81KQitegRTyY= github.com/DataDog/go-libddwaf/v3 v3.5.1/go.mod h1:n98d9nZ1gzenRSk53wz8l6d34ikxS+hs62A31Fqmyi4= -github.com/DataDog/go-runtime-metrics-internal v0.0.3 h1:AqzLCS4rqojBoCNwEAmbpLFAkKT43e+ze/a6aFbfDpU= -github.com/DataDog/go-runtime-metrics-internal v0.0.3/go.mod h1:quaQJ+wPN41xEC458FCpTwyROZm3MzmTZ8q8XOXQiPs= +github.com/DataDog/go-runtime-metrics-internal v0.0.4-0.20241206090539-a14610dc22b6 h1:bpitH5JbjBhfcTG+H2RkkiUXpYa8xSuIPnyNtTaSPog= +github.com/DataDog/go-runtime-metrics-internal v0.0.4-0.20241206090539-a14610dc22b6/go.mod h1:quaQJ+wPN41xEC458FCpTwyROZm3MzmTZ8q8XOXQiPs= github.com/DataDog/go-sqllexer v0.0.14 h1:xUQh2tLr/95LGxDzLmttLgTo/1gzFeOyuwrQa/Iig4Q= github.com/DataDog/go-sqllexer v0.0.14/go.mod h1:KwkYhpFEVIq+BfobkTC1vfqm4gTi65skV/DpDBXtexc= github.com/DataDog/go-tuf v1.1.0-0.5.2 h1:4CagiIekonLSfL8GMHRHcHudo1fQnxELS9g4tiAupQ4= diff --git a/internal/datastreams/processor_test.go b/internal/datastreams/processor_test.go index 1e0418a2d8..d60a01c4ef 100644 --- a/internal/datastreams/processor_test.go +++ b/internal/datastreams/processor_test.go @@ -264,7 +264,7 @@ func BenchmarkSetCheckpoint(b *testing.B) { client := &http.Client{ Transport: &noOpTransport{}, } - p := NewProcessor(&statsd.NoOpClient{}, "env", "service", "v1", &url.URL{Scheme: "http", Host: "agent-address"}, client) + p := NewProcessor(&statsd.NoOpClientDirect{}, "env", "service", "v1", &url.URL{Scheme: "http", Host: "agent-address"}, client) p.Start() for i := 0; i < b.N; i++ { p.SetCheckpointWithParams(context.Background(), options.CheckpointParams{PayloadSize: 1000}, "type:edge-1", "direction:in", "type:kafka", "topic:topic1", "group:group1") diff --git a/internal/exectracetest/go.mod b/internal/exectracetest/go.mod index 0bc1db3393..33a74a94ba 100644 --- a/internal/exectracetest/go.mod +++ b/internal/exectracetest/go.mod @@ -19,7 +19,7 @@ require ( github.com/DataDog/datadog-agent/pkg/util/scrubber v0.58.0 // indirect github.com/DataDog/datadog-go/v5 v5.5.0 // indirect github.com/DataDog/go-libddwaf/v3 v3.5.1 // indirect - github.com/DataDog/go-runtime-metrics-internal v0.0.3 // indirect + github.com/DataDog/go-runtime-metrics-internal v0.0.4-0.20241206090539-a14610dc22b6 // indirect github.com/DataDog/go-sqllexer v0.0.14 // indirect github.com/DataDog/go-tuf v1.1.0-0.5.2 // indirect github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.20.0 // indirect diff --git a/internal/exectracetest/go.sum b/internal/exectracetest/go.sum index 0d8bd345c4..7321d5519b 100644 --- a/internal/exectracetest/go.sum +++ b/internal/exectracetest/go.sum @@ -16,8 +16,8 @@ github.com/DataDog/datadog-go/v5 v5.5.0 h1:G5KHeB8pWBNXT4Jtw0zAkhdxEAWSpWH00geHI github.com/DataDog/datadog-go/v5 v5.5.0/go.mod h1:K9kcYBlxkcPP8tvvjZZKs/m1edNAUFzBbdpTUKfCsuw= github.com/DataDog/go-libddwaf/v3 v3.5.1 h1:GWA4ln4DlLxiXm+X7HA/oj0ZLcdCwOS81KQitegRTyY= github.com/DataDog/go-libddwaf/v3 v3.5.1/go.mod h1:n98d9nZ1gzenRSk53wz8l6d34ikxS+hs62A31Fqmyi4= -github.com/DataDog/go-runtime-metrics-internal v0.0.3 h1:AqzLCS4rqojBoCNwEAmbpLFAkKT43e+ze/a6aFbfDpU= -github.com/DataDog/go-runtime-metrics-internal v0.0.3/go.mod h1:quaQJ+wPN41xEC458FCpTwyROZm3MzmTZ8q8XOXQiPs= +github.com/DataDog/go-runtime-metrics-internal v0.0.4-0.20241206090539-a14610dc22b6 h1:bpitH5JbjBhfcTG+H2RkkiUXpYa8xSuIPnyNtTaSPog= +github.com/DataDog/go-runtime-metrics-internal v0.0.4-0.20241206090539-a14610dc22b6/go.mod h1:quaQJ+wPN41xEC458FCpTwyROZm3MzmTZ8q8XOXQiPs= github.com/DataDog/go-sqllexer v0.0.14 h1:xUQh2tLr/95LGxDzLmttLgTo/1gzFeOyuwrQa/Iig4Q= github.com/DataDog/go-sqllexer v0.0.14/go.mod h1:KwkYhpFEVIq+BfobkTC1vfqm4gTi65skV/DpDBXtexc= github.com/DataDog/go-tuf v1.1.0-0.5.2 h1:4CagiIekonLSfL8GMHRHcHudo1fQnxELS9g4tiAupQ4= diff --git a/internal/statsd.go b/internal/statsd.go index df1d18a307..60f3d4431c 100644 --- a/internal/statsd.go +++ b/internal/statsd.go @@ -19,6 +19,7 @@ type StatsdClient interface { CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time) error Gauge(name string, value float64, tags []string, rate float64) error GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time) error + DistributionSamples(name string, values []float64, tags []string, rate float64) error Timing(name string, value time.Duration, tags []string, rate float64) error Flush() error Close() error @@ -29,9 +30,9 @@ func NewStatsdClient(addr string, globalTags []string) (StatsdClient, error) { if addr == "" { addr = DefaultDogstatsdAddr } - client, err := statsd.New(addr, statsd.WithMaxMessagesPerPayload(40), statsd.WithTags(globalTags)) + client, err := statsd.NewDirect(addr, statsd.WithMaxMessagesPerPayload(40), statsd.WithTags(globalTags)) if err != nil { - return &statsd.NoOpClient{}, err + return &statsd.NoOpClientDirect{}, err } return client, nil } diff --git a/internal/statsdtest/statsdtest.go b/internal/statsdtest/statsdtest.go index 8845e6465c..ffcd700cd1 100644 --- a/internal/statsdtest/statsdtest.go +++ b/internal/statsdtest/statsdtest.go @@ -11,6 +11,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "gopkg.in/DataDog/dd-trace-go.v1/internal" ) type callType int64 @@ -24,6 +25,8 @@ const ( callTypeTiming ) +var _ internal.StatsdClient = &TestStatsdClient{} + type TestStatsdClient struct { mu sync.RWMutex gaugeCalls []TestStatsdCall @@ -104,6 +107,10 @@ func (tg *TestStatsdClient) CountWithTimestamp(name string, value int64, tags [] }) } +func (tg *TestStatsdClient) DistributionSamples(_ string, _ []float64, _ []string, _ float64) error { + panic("not implemented") +} + func (tg *TestStatsdClient) Timing(name string, value time.Duration, tags []string, rate float64) error { return tg.addMetric(callTypeTiming, tags, TestStatsdCall{ name: name, From a8665eb0b88c50b6e30477fd464f1e7e985d6a51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dario=20Casta=C3=B1=C3=A9?= Date: Mon, 16 Dec 2024 10:00:05 +0100 Subject: [PATCH 06/11] fix(.github/workflows): add tags-ignore to avoid running CI on pushing tags for contribs and other nested modules (#3005) Co-authored-by: Hannah Kim --- .github/workflows/appsec.yml | 3 +++ .github/workflows/codeql-analysis.yml | 3 +++ .github/workflows/govulncheck.yml | 5 ++++- .github/workflows/main-branch-tests.yml | 5 +++-- .github/workflows/orchestrion.yml | 3 +++ .github/workflows/parametric-tests.yml | 5 +++-- .github/workflows/pull-request.yml | 3 +++ .github/workflows/smoke-tests.yml | 5 +++-- .github/workflows/system-tests.yml | 5 +++-- 9 files changed, 28 insertions(+), 9 deletions(-) diff --git a/.github/workflows/appsec.yml b/.github/workflows/appsec.yml index 80c1fce6cf..d238b55ba3 100644 --- a/.github/workflows/appsec.yml +++ b/.github/workflows/appsec.yml @@ -19,6 +19,9 @@ on: merge_group: push: branches: release-v* + tags-ignore: + - 'contrib/**' + - 'instrumentation/**' env: DD_APPSEC_WAF_TIMEOUT: 1m diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 8d85bac9df..53fb2d4cc2 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -9,6 +9,9 @@ on: type: string push: branches: [ main, master ] + tags-ignore: + - 'contrib/**' + - 'instrumentation/**' pull_request: # The branches below must be a subset of the branches above branches: [ main ] diff --git a/.github/workflows/govulncheck.yml b/.github/workflows/govulncheck.yml index eaf5ed78d9..e7549a71fc 100644 --- a/.github/workflows/govulncheck.yml +++ b/.github/workflows/govulncheck.yml @@ -9,7 +9,10 @@ on: push: branches: - main - - release-v* + - release-v* + tags-ignore: + - 'contrib/**' + - 'instrumentation/**' schedule: - cron: '00 00 * * *' workflow_dispatch: diff --git a/.github/workflows/main-branch-tests.yml b/.github/workflows/main-branch-tests.yml index 7d738ccd9a..136805cebd 100644 --- a/.github/workflows/main-branch-tests.yml +++ b/.github/workflows/main-branch-tests.yml @@ -11,8 +11,9 @@ on: branches: - main - release-v* - tags: - - "**" + tags-ignore: + - 'contrib/**' + - 'instrumentation/**' concurrency: group: ${{ github.ref }} diff --git a/.github/workflows/orchestrion.yml b/.github/workflows/orchestrion.yml index 0603c8b139..ddd0e7435b 100644 --- a/.github/workflows/orchestrion.yml +++ b/.github/workflows/orchestrion.yml @@ -6,6 +6,9 @@ on: push: branches: - release-v* + tags-ignore: + - 'contrib/**' + - 'instrumentation/**' permissions: read-all diff --git a/.github/workflows/parametric-tests.yml b/.github/workflows/parametric-tests.yml index a25c01a2c3..4027451800 100644 --- a/.github/workflows/parametric-tests.yml +++ b/.github/workflows/parametric-tests.yml @@ -11,8 +11,9 @@ on: branches: - main - release-v* - tags: - - "**" + tags-ignore: + - 'contrib/**' + - 'instrumentation/**' pull_request: branches: - "**" diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 9513b2e328..8318181820 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -8,6 +8,9 @@ on: push: branches: - 'mq-working-branch-**' + tags-ignore: + - 'contrib/**' + - 'instrumentation/**' concurrency: group: ${{ github.ref }} diff --git a/.github/workflows/smoke-tests.yml b/.github/workflows/smoke-tests.yml index cee00b0d34..9a05bdc8ba 100644 --- a/.github/workflows/smoke-tests.yml +++ b/.github/workflows/smoke-tests.yml @@ -15,8 +15,9 @@ on: branches: - main - release-v* - tags: - - '**' + tags-ignore: + - 'contrib/**' + - 'instrumentation/**' schedule: # nightly - cron: "0 0 * * *" workflow_dispatch: {} # manually diff --git a/.github/workflows/system-tests.yml b/.github/workflows/system-tests.yml index 90933ec1ad..f744ce1303 100644 --- a/.github/workflows/system-tests.yml +++ b/.github/workflows/system-tests.yml @@ -11,8 +11,9 @@ on: branches: - main - release-v* - tags: - - "**" + tags-ignore: + - 'contrib/**' + - 'instrumentation/**' pull_request: branches: - "**" From c0d5d5a9756443c34579c20b6116fc2e625e8152 Mon Sep 17 00:00:00 2001 From: Flavien Darche <11708575+e-n-0@users.noreply.github.com> Date: Mon, 16 Dec 2024 13:57:02 +0000 Subject: [PATCH 07/11] contrib/envoyproxy: envoy external processing support (#2895) This PR adds a new gRPC Interceptor (StreamServerInterceptor) to support the interception of ext_proc v3 calls to gRPC server. When the interceptor is applied, all messages of the external processing protocol are instrumented without returning an handle to the original server code Co-authored-by: Eliott Bouhana Co-authored-by: Flavien Darche --- contrib/envoyproxy/go-control-plane/envoy.go | 350 +++++++++++ .../envoyproxy/go-control-plane/envoy_test.go | 576 ++++++++++++++++++ .../go-control-plane/example_test.go | 44 ++ .../envoyproxy/go-control-plane/fakehttp.go | 189 ++++++ contrib/internal/httptrace/response_writer.go | 7 + go.mod | 3 + go.sum | 6 + internal/appsec/testdata/user_rules.json | 27 + 8 files changed, 1202 insertions(+) create mode 100644 contrib/envoyproxy/go-control-plane/envoy.go create mode 100644 contrib/envoyproxy/go-control-plane/envoy_test.go create mode 100644 contrib/envoyproxy/go-control-plane/example_test.go create mode 100644 contrib/envoyproxy/go-control-plane/fakehttp.go diff --git a/contrib/envoyproxy/go-control-plane/envoy.go b/contrib/envoyproxy/go-control-plane/envoy.go new file mode 100644 index 0000000000..52279e0138 --- /dev/null +++ b/contrib/envoyproxy/go-control-plane/envoy.go @@ -0,0 +1,350 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package go_control_plane + +import ( + "context" + "errors" + "io" + "math" + "net/http" + "strings" + + "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/httptrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/waf/actions" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + envoycore "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoyextproc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoytypes "github.com/envoyproxy/go-control-plane/envoy/type/v3" +) + +const componentName = "envoyproxy/go-control-plane" + +func init() { + telemetry.LoadIntegration(componentName) + tracer.MarkIntegrationImported("github.com/envoyproxy/go-control-plane") +} + +// appsecEnvoyExternalProcessorServer is a server that implements the Envoy ExternalProcessorServer interface. +type appsecEnvoyExternalProcessorServer struct { + envoyextproc.ExternalProcessorServer +} + +// AppsecEnvoyExternalProcessorServer creates and returns a new instance of appsecEnvoyExternalProcessorServer. +func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.ExternalProcessorServer) envoyextproc.ExternalProcessorServer { + return &appsecEnvoyExternalProcessorServer{userImplementation} +} + +type currentRequest struct { + span tracer.Span + afterHandle func() + ctx context.Context + fakeResponseWriter *fakeResponseWriter + wrappedResponseWriter http.ResponseWriter +} + +// Process handles the bidirectional stream that Envoy uses to give the server control +// over what the filter does. It processes incoming requests and sends appropriate responses +// based on the type of request received. +// +// The method receive incoming requests, processes them, and sends responses back to the client. +// It handles different types of requests such as request headers, response headers, request body, +// response body, request trailers, and response trailers. +// +// If the request is blocked, it sends an immediate response and ends the stream. If an error occurs +// during processing, it logs the error and returns an appropriate gRPC status error. +func (s *appsecEnvoyExternalProcessorServer) Process(processServer envoyextproc.ExternalProcessor_ProcessServer) error { + var ( + ctx = processServer.Context() + blocked bool + currentRequest *currentRequest + processingRequest envoyextproc.ProcessingRequest + processingResponse *envoyextproc.ProcessingResponse + ) + + // Close the span when the request is done processing + defer func() { + if currentRequest == nil { + return + } + + log.Warn("external_processing: stream stopped during a request, making sure the current span is closed\n") + currentRequest.span.Finish() + currentRequest = nil + }() + + for { + select { + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + return nil + } + + return ctx.Err() + default: + // no op + } + + err := processServer.RecvMsg(&processingRequest) + if err != nil { + // Note: Envoy is inconsistent with the "end_of_stream" value of its headers responses, + // so we can't fully rely on it to determine when it will close (cancel) the stream. + if s, ok := status.FromError(err); (ok && s.Code() == codes.Canceled) || err == io.EOF { + return nil + } + + log.Warn("external_processing: error receiving request/response: %v\n", err) + return status.Errorf(codes.Unknown, "Error receiving request/response: %v", err) + } + + processingResponse, err = envoyExternalProcessingRequestTypeAssert(&processingRequest) + if err != nil { + log.Error("external_processing: error asserting request type: %v\n", err) + return status.Errorf(codes.Unknown, "Error asserting request type: %v", err) + } + + switch v := processingRequest.Request.(type) { + case *envoyextproc.ProcessingRequest_RequestHeaders: + processingResponse, currentRequest, blocked, err = processRequestHeaders(ctx, v) + case *envoyextproc.ProcessingRequest_ResponseHeaders: + processingResponse, err = processResponseHeaders(v, currentRequest) + currentRequest = nil // Request is done, reset the current request + } + + if err != nil { + log.Error("external_processing: error processing request: %v\n", err) + return err + } + + // End of stream reached, no more data to process + if processingResponse == nil { + log.Debug("external_processing: end of stream reached") + return nil + } + + if err := processServer.SendMsg(processingResponse); err != nil { + log.Warn("external_processing: error sending response (probably because of an Envoy timeout): %v", err) + return status.Errorf(codes.Unknown, "Error sending response (probably because of an Envoy timeout): %v", err) + } + + if !blocked { + continue + } + + log.Debug("external_processing: request blocked, end the stream") + currentRequest = nil + return nil + } +} + +func envoyExternalProcessingRequestTypeAssert(req *envoyextproc.ProcessingRequest) (*envoyextproc.ProcessingResponse, error) { + switch r := req.Request.(type) { + case *envoyextproc.ProcessingRequest_RequestHeaders, *envoyextproc.ProcessingRequest_ResponseHeaders: + return nil, nil + + case *envoyextproc.ProcessingRequest_RequestBody: + // TODO: Handle request raw body in the WAF + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_RequestBody{ + RequestBody: &envoyextproc.BodyResponse{ + Response: &envoyextproc.CommonResponse{ + Status: envoyextproc.CommonResponse_CONTINUE, + }, + }, + }, + }, nil + + case *envoyextproc.ProcessingRequest_RequestTrailers: + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_RequestTrailers{}, + }, nil + + case *envoyextproc.ProcessingRequest_ResponseBody: + // Note: The end of stream bool value is not reliable + // Sometimes it's not set to true even if there is no more data to process + if r.ResponseBody.GetEndOfStream() { + return nil, nil + } + + // TODO: Handle response raw body in the WAF + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_ResponseBody{}, + }, nil + + case *envoyextproc.ProcessingRequest_ResponseTrailers: + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_RequestTrailers{}, + }, nil + + default: + return nil, status.Errorf(codes.Unknown, "Unknown request type: %T", r) + } +} + +func processRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *currentRequest, bool, error) { + log.Debug("external_processing: received request headers: %v\n", req.RequestHeaders) + + request, err := newRequest(ctx, req) + if err != nil { + return nil, nil, false, status.Errorf(codes.InvalidArgument, "Error processing request headers from ext_proc: %v", err) + } + + var blocked bool + fakeResponseWriter := newFakeResponseWriter() + wrappedResponseWriter, request, afterHandle, blocked := httptrace.BeforeHandle(&httptrace.ServeConfig{ + SpanOpts: []ddtrace.StartSpanOption{ + tracer.Tag(ext.SpanKind, ext.SpanKindServer), + tracer.Tag(ext.Component, componentName), + }, + }, fakeResponseWriter, request) + + // Block handling: If triggered, we need to block the request, return an immediate response + if blocked { + afterHandle() + return doBlockResponse(fakeResponseWriter), nil, true, nil + } + + span, ok := tracer.SpanFromContext(request.Context()) + if !ok { + return nil, nil, false, status.Errorf(codes.Unknown, "Error getting span from context") + } + + processingResponse, err := propagationRequestHeaderMutation(span) + if err != nil { + return nil, nil, false, err + } + + return processingResponse, ¤tRequest{ + span: span, + ctx: request.Context(), + fakeResponseWriter: fakeResponseWriter, + wrappedResponseWriter: wrappedResponseWriter, + afterHandle: afterHandle, + }, false, nil +} + +func propagationRequestHeaderMutation(span ddtrace.Span) (*envoyextproc.ProcessingResponse, error) { + newHeaders := make(http.Header) + if err := tracer.Inject(span.Context(), tracer.HTTPHeadersCarrier(newHeaders)); err != nil { + return nil, status.Errorf(codes.Unknown, "Error injecting headers: %v", err) + } + + if len(newHeaders) > 0 { + log.Debug("external_processing: injecting propagation headers: %v\n", newHeaders) + } + + headerValueOptions := make([]*envoycore.HeaderValueOption, 0, len(newHeaders)) + for k, v := range newHeaders { + headerValueOptions = append(headerValueOptions, &envoycore.HeaderValueOption{ + Header: &envoycore.HeaderValue{ + Key: k, + RawValue: []byte(strings.Join(v, ",")), + }, + }) + } + + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_RequestHeaders{ + RequestHeaders: &envoyextproc.HeadersResponse{ + Response: &envoyextproc.CommonResponse{ + Status: envoyextproc.CommonResponse_CONTINUE, + HeaderMutation: &envoyextproc.HeaderMutation{ + SetHeaders: headerValueOptions, + }, + }, + }, + }, + }, nil +} + +func processResponseHeaders(res *envoyextproc.ProcessingRequest_ResponseHeaders, currentRequest *currentRequest) (*envoyextproc.ProcessingResponse, error) { + log.Debug("external_processing: received response headers: %v\n", res.ResponseHeaders) + + if err := createFakeResponseWriter(currentRequest.wrappedResponseWriter, res); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "Error processing response headers from ext_proc: %v", err) + } + + var blocked bool + + // Now we need to know if the request has been blocked, but we don't have any other way than to look for the operation and bind a blocking data listener to it + op, ok := dyngo.FromContext(currentRequest.ctx) + if ok { + dyngo.OnData(op, func(_ *actions.BlockHTTP) { + // We already wrote over the response writer, we need to reset it so the blocking handler can write to it + httptrace.ResetStatusCode(currentRequest.wrappedResponseWriter) + currentRequest.fakeResponseWriter.Reset() + blocked = true + }) + } + + currentRequest.afterHandle() + + if blocked { + response := doBlockResponse(currentRequest.fakeResponseWriter) + return response, nil + } + + log.Debug("external_processing: finishing request with status code: %v\n", currentRequest.fakeResponseWriter.status) + + // Note: (cf. comment in the stream error handling) + // The end of stream bool value is not reliable + if res.ResponseHeaders.GetEndOfStream() { + return nil, nil + } + + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_ResponseHeaders{ + ResponseHeaders: &envoyextproc.HeadersResponse{ + Response: &envoyextproc.CommonResponse{ + Status: envoyextproc.CommonResponse_CONTINUE, + }, + }, + }, + }, nil +} + +func doBlockResponse(writer *fakeResponseWriter) *envoyextproc.ProcessingResponse { + var headersMutation []*envoycore.HeaderValueOption + for k, v := range writer.headers { + headersMutation = append(headersMutation, &envoycore.HeaderValueOption{ + Header: &envoycore.HeaderValue{ + Key: k, + RawValue: []byte(strings.Join(v, ",")), + }, + }) + } + + var int32StatusCode int32 = 0 + if writer.status > 0 && writer.status <= math.MaxInt32 { + int32StatusCode = int32(writer.status) + } + + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &envoyextproc.ImmediateResponse{ + Status: &envoytypes.HttpStatus{ + Code: envoytypes.StatusCode(int32StatusCode), + }, + Headers: &envoyextproc.HeaderMutation{ + SetHeaders: headersMutation, + }, + Body: string(writer.body), + GrpcStatus: &envoyextproc.GrpcStatus{ + Status: 0, + }, + }, + }, + } +} diff --git a/contrib/envoyproxy/go-control-plane/envoy_test.go b/contrib/envoyproxy/go-control-plane/envoy_test.go new file mode 100644 index 0000000000..8af05eaab3 --- /dev/null +++ b/contrib/envoyproxy/go-control-plane/envoy_test.go @@ -0,0 +1,576 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package go_control_plane + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "testing" + + envoyextproc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoytypes "github.com/envoyproxy/go-control-plane/envoy/type/v3" + + ddgrpc "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec" + + v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +func TestAppSec(t *testing.T) { + appsec.Start() + defer appsec.Stop() + if !appsec.Enabled() { + t.Skip("appsec disabled") + } + + setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { + rig, err := newEnvoyAppsecRig(t, false) + require.NoError(t, err) + + mt := mocktracer.Start() + + return rig.client, mt, func() { + rig.Close() + mt.Stop() + } + } + + t.Run("monitoring-event-on-request", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "GET", map[string]string{"User-Agent": "dd-test-scanner-log"}, map[string]string{}, false) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"ua0-600-55x": 1}) + }) + + t.Run("blocking-event-on-request", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(t, map[string]string{"User-Agent": "dd-test-scanner-log-block"}, "GET", "/"), + }, + }, + }) + require.NoError(t, err) + + res, err := stream.Recv() + require.Equal(t, uint32(0), res.GetImmediateResponse().GetGrpcStatus().Status) + require.Equal(t, envoytypes.StatusCode(403), res.GetImmediateResponse().GetStatus().Code) + require.Equal(t, "Content-Type", res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().Key) + require.Equal(t, "application/json", string(res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().RawValue)) + require.NoError(t, err) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"ua0-600-56x": 1}) + + // Check for tags + span := finished[0] + require.Equal(t, true, span.Tag("appsec.event")) + require.Equal(t, true, span.Tag("appsec.blocked")) + }) +} + +func TestBlockingWithUserRulesFile(t *testing.T) { + t.Setenv("DD_APPSEC_RULES", "../../../internal/appsec/testdata/user_rules.json") + appsec.Start() + defer appsec.Stop() + if !appsec.Enabled() { + t.Skip("appsec disabled") + } + + setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { + rig, err := newEnvoyAppsecRig(t, false) + require.NoError(t, err) + + mt := mocktracer.Start() + + return rig.client, mt, func() { + rig.Close() + mt.Stop() + } + } + + t.Run("blocking-event-on-response", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "OPTION", map[string]string{"User-Agent": "dd-test-scanner-log-block"}, map[string]string{"User-Agent": "match-response-headers"}, true) + + // Handle the immediate response + res, err := stream.Recv() + require.Equal(t, uint32(0), res.GetImmediateResponse().GetGrpcStatus().Status) + require.Equal(t, envoytypes.StatusCode(418), res.GetImmediateResponse().GetStatus().Code) // 418 because of the rule file + require.Len(t, res.GetImmediateResponse().GetHeaders().SetHeaders, 1) + require.Equal(t, "Content-Type", res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().Key) + require.Equal(t, "application/json", string(res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().RawValue)) + require.NoError(t, err) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"headers-003": 1}) + + // Check for tags + span := finished[0] + require.Equal(t, 1, span.Tag("_dd.appsec.enabled")) + require.Equal(t, true, span.Tag("appsec.event")) + require.Equal(t, true, span.Tag("appsec.blocked")) + }) + + t.Run("blocking-event-on-request-on-query", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(t, map[string]string{"User-Agent": "Mistake Not..."}, "GET", "/hello?match=match-request-query"), + }, + }, + }) + require.NoError(t, err) + + res, err := stream.Recv() + require.Equal(t, uint32(0), res.GetImmediateResponse().GetGrpcStatus().Status) + require.Equal(t, envoytypes.StatusCode(418), res.GetImmediateResponse().GetStatus().Code) + require.Equal(t, "Content-Type", res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().Key) + require.Equal(t, "application/json", string(res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().RawValue)) + require.NoError(t, err) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"query-002": 1}) + + // Check for tags + span := finished[0] + require.Equal(t, true, span.Tag("appsec.event")) + require.Equal(t, true, span.Tag("appsec.blocked")) + }) + + t.Run("blocking-event-on-request-on-cookies", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(t, map[string]string{"Cookie": "foo=jdfoSDGFkivRG_234"}, "OPTIONS", "/"), + }, + }, + }) + require.NoError(t, err) + + res, err := stream.Recv() + require.Equal(t, uint32(0), res.GetImmediateResponse().GetGrpcStatus().Status) + require.Equal(t, envoytypes.StatusCode(418), res.GetImmediateResponse().GetStatus().Code) + require.Equal(t, "Content-Type", res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().Key) + require.Equal(t, "application/json", string(res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().RawValue)) + require.NoError(t, err) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"tst-037-008": 1}) + + // Check for tags + span := finished[0] + require.Equal(t, true, span.Tag("appsec.event")) + require.Equal(t, true, span.Tag("appsec.blocked")) + }) +} + +func TestGeneratedSpan(t *testing.T) { + setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { + rig, err := newEnvoyAppsecRig(t, false) + require.NoError(t, err) + + mt := mocktracer.Start() + + return rig.client, mt, func() { + rig.Close() + mt.Stop() + } + } + + t.Run("request-span", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/resource-span", "GET", map[string]string{"user-agent": "Mistake Not...", "test-key": "test-value"}, map[string]string{"response-test-key": "response-test-value"}, false) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // Check for tags + span := finished[0] + require.Equal(t, "http.request", span.OperationName()) + require.Equal(t, "https://datadoghq.com/resource-span", span.Tag("http.url")) + require.Equal(t, "GET", span.Tag("http.method")) + require.Equal(t, "datadoghq.com", span.Tag("http.host")) + // require.Equal(t, "GET /resource-span", span.Tag("resource.name")) + require.Equal(t, "server", span.Tag("span.kind")) + require.Equal(t, "Mistake Not...", span.Tag("http.useragent")) + }) +} + +func TestXForwardedForHeaderClientIp(t *testing.T) { + t.Setenv("DD_APPSEC_RULES", "../../../internal/appsec/testdata/blocking.json") + appsec.Start() + defer appsec.Stop() + if !appsec.Enabled() { + t.Skip("appsec disabled") + } + + setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { + rig, err := newEnvoyAppsecRig(t, false) + require.NoError(t, err) + + mt := mocktracer.Start() + + return rig.client, mt, func() { + rig.Close() + mt.Stop() + } + } + + t.Run("client-ip", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "OPTION", + map[string]string{"User-Agent": "Mistake not...", "X-Forwarded-For": "18.18.18.18"}, + map[string]string{"User-Agent": "match-response-headers"}, + true) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // Check for tags + span := finished[0] + require.Equal(t, "18.18.18.18", span.Tag("http.client_ip")) + + // Appsec + require.Equal(t, 1, span.Tag("_dd.appsec.enabled")) + }) + + t.Run("blocking-client-ip", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(t, map[string]string{"User-Agent": "Mistake not...", "X-Forwarded-For": "1.2.3.4"}, "GET", "/"), + }, + }, + }) + require.NoError(t, err) + + // Handle the immediate response + res, err := stream.Recv() + require.Equal(t, uint32(0), res.GetImmediateResponse().GetGrpcStatus().Status) + require.Equal(t, envoytypes.StatusCode(403), res.GetImmediateResponse().GetStatus().Code) + require.Equal(t, "Content-Type", res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().Key) + require.Equal(t, "application/json", string(res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().RawValue)) + require.NoError(t, err) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"blk-001-001": 1}) + + // Check for tags + span := finished[0] + require.Equal(t, "1.2.3.4", span.Tag("http.client_ip")) + require.Equal(t, 1, span.Tag("_dd.appsec.enabled")) + require.Equal(t, true, span.Tag("appsec.event")) + require.Equal(t, true, span.Tag("appsec.blocked")) + }) +} + +func newEnvoyAppsecRig(t *testing.T, traceClient bool, interceptorOpts ...ddgrpc.Option) (*envoyAppsecRig, error) { + t.Helper() + + interceptorOpts = append([]ddgrpc.InterceptorOption{ddgrpc.WithServiceName("grpc")}, interceptorOpts...) + + server := grpc.NewServer() + + fixtureServer := new(envoyFixtureServer) + appsecSrv := AppsecEnvoyExternalProcessorServer(fixtureServer) + envoyextproc.RegisterExternalProcessorServer(server, appsecSrv) + + li, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, err + } + _, port, _ := net.SplitHostPort(li.Addr().String()) + // start our test fixtureServer. + go server.Serve(li) + + opts := []grpc.DialOption{grpc.WithInsecure()} + if traceClient { + opts = append(opts, + grpc.WithStreamInterceptor(ddgrpc.StreamClientInterceptor(interceptorOpts...)), + ) + } + conn, err := grpc.Dial(li.Addr().String(), opts...) + if err != nil { + return nil, fmt.Errorf("error dialing: %s", err) + } + return &envoyAppsecRig{ + fixtureServer: fixtureServer, + listener: li, + port: port, + server: server, + conn: conn, + client: envoyextproc.NewExternalProcessorClient(conn), + }, err +} + +// rig contains all servers and connections we'd need for a grpc integration test +type envoyAppsecRig struct { + fixtureServer *envoyFixtureServer + server *grpc.Server + port string + listener net.Listener + conn *grpc.ClientConn + client envoyextproc.ExternalProcessorClient +} + +func (r *envoyAppsecRig) Close() { + r.server.Stop() + r.conn.Close() +} + +type envoyFixtureServer struct { + envoyextproc.ExternalProcessorServer +} + +// Helper functions + +func end2EndStreamRequest(t *testing.T, stream envoyextproc.ExternalProcessor_ProcessClient, path string, method string, requestHeaders map[string]string, responseHeaders map[string]string, blockOnResponse bool) { + t.Helper() + + // First part: request + // 1- Send the headers + err := stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(t, requestHeaders, method, path), + }, + }, + }) + require.NoError(t, err) + + res, err := stream.Recv() + require.NoError(t, err) + require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestHeaders().GetResponse().GetStatus()) + + // 2- Send the body + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestBody{ + RequestBody: &envoyextproc.HttpBody{ + Body: []byte("body"), + }, + }, + }) + require.NoError(t, err) + + res, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestBody().GetResponse().GetStatus()) + + // 3- Send the trailers + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestTrailers{ + RequestTrailers: &envoyextproc.HttpTrailers{ + Trailers: &v3.HeaderMap{ + Headers: []*v3.HeaderValue{ + {Key: "key", Value: "value"}, + }, + }, + }, + }, + }) + require.NoError(t, err) + + res, err = stream.Recv() + require.NoError(t, err) + require.NotNil(t, res.GetRequestTrailers()) + + // Second part: response + // 1- Send the response headers + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_ResponseHeaders{ + ResponseHeaders: &envoyextproc.HttpHeaders{ + Headers: makeResponseHeaders(t, responseHeaders, "200"), + }, + }, + }) + require.NoError(t, err) + + if blockOnResponse { + // Should have received an immediate response for blocking + // Let the test handle the response + return + } + + res, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetResponseHeaders().GetResponse().GetStatus()) + + // 2- Send the response body + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_ResponseBody{ + ResponseBody: &envoyextproc.HttpBody{ + Body: []byte("body"), + EndOfStream: true, + }, + }, + }) + require.NoError(t, err) + + // The stream should now be closed + _, err = stream.Recv() + require.Equal(t, io.EOF, err) +} + +func checkForAppsecEvent(t *testing.T, finished []mocktracer.Span, expectedRuleIDs map[string]int) { + t.Helper() + + // The request should have the attack attempts + event := finished[len(finished)-1].Tag("_dd.appsec.json") + require.NotNil(t, event, "the _dd.appsec.json tag was not found") + + jsonText := event.(string) + type trigger struct { + Rule struct { + ID string `json:"id"` + } `json:"rule"` + } + var parsed struct { + Triggers []trigger `json:"triggers"` + } + err := json.Unmarshal([]byte(jsonText), &parsed) + require.NoError(t, err) + + histogram := map[string]uint8{} + for _, tr := range parsed.Triggers { + histogram[tr.Rule.ID]++ + } + + for ruleID, count := range expectedRuleIDs { + require.Equal(t, count, int(histogram[ruleID]), "rule %s has been triggered %d times but expected %d") + } + + require.Len(t, parsed.Triggers, len(expectedRuleIDs), "unexpected number of rules triggered") +} + +// Construct request headers +func makeRequestHeaders(t *testing.T, headers map[string]string, method string, path string) *v3.HeaderMap { + t.Helper() + + h := &v3.HeaderMap{} + for k, v := range headers { + h.Headers = append(h.Headers, &v3.HeaderValue{Key: k, RawValue: []byte(v)}) + } + + h.Headers = append(h.Headers, + &v3.HeaderValue{Key: ":method", RawValue: []byte(method)}, + &v3.HeaderValue{Key: ":path", RawValue: []byte(path)}, + &v3.HeaderValue{Key: ":scheme", RawValue: []byte("https")}, + &v3.HeaderValue{Key: ":authority", RawValue: []byte("datadoghq.com")}, + ) + + return h +} + +func makeResponseHeaders(t *testing.T, headers map[string]string, status string) *v3.HeaderMap { + t.Helper() + + h := &v3.HeaderMap{} + for k, v := range headers { + h.Headers = append(h.Headers, &v3.HeaderValue{Key: k, RawValue: []byte(v)}) + } + + h.Headers = append(h.Headers, &v3.HeaderValue{Key: ":status", RawValue: []byte(status)}) + + return h +} diff --git a/contrib/envoyproxy/go-control-plane/example_test.go b/contrib/envoyproxy/go-control-plane/example_test.go new file mode 100644 index 0000000000..f1e255dcaf --- /dev/null +++ b/contrib/envoyproxy/go-control-plane/example_test.go @@ -0,0 +1,44 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package go_control_plane_test + +import ( + "log" + "net" + + "google.golang.org/grpc" + + extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + gocontrolplane "gopkg.in/DataDog/dd-trace-go.v1/contrib/envoyproxy/go-control-plane" +) + +// interface fpr external processing server +type envoyExtProcServer struct { + extprocv3.ExternalProcessorServer +} + +func Example_server() { + // Create a listener for the server. + ln, err := net.Listen("tcp", ":50051") + if err != nil { + log.Fatal(err) + } + + // Initialize the grpc server as normal, using the envoy server interceptor. + s := grpc.NewServer() + srv := &envoyExtProcServer{} + + // Register the appsec envoy external processor service + appsecSrv := gocontrolplane.AppsecEnvoyExternalProcessorServer(srv) + extprocv3.RegisterExternalProcessorServer(s, appsecSrv) + + // ... register your services + + // Start serving incoming connections. + if err := s.Serve(ln); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} diff --git a/contrib/envoyproxy/go-control-plane/fakehttp.go b/contrib/envoyproxy/go-control-plane/fakehttp.go new file mode 100644 index 0000000000..3f20725e1b --- /dev/null +++ b/contrib/envoyproxy/go-control-plane/fakehttp.go @@ -0,0 +1,189 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package go_control_plane + +import ( + "context" + "crypto/tls" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + + corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + extproc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "google.golang.org/grpc/metadata" +) + +// checkPseudoRequestHeaders Verify the required HTTP2 headers are present +// Some mandatory headers need to be set. It can happen when it wasn't a real HTTP2 request sent by Envoy, +func checkPseudoRequestHeaders(headers map[string]string) error { + for _, header := range []string{":authority", ":scheme", ":path", ":method"} { + if _, ok := headers[header]; !ok { + return fmt.Errorf("missing required headers: %q", header) + } + } + + return nil +} + +// checkPseudoResponseHeaders verifies the required HTTP2 headers are present +// Some mandatory headers need to be set. It can happen when it wasn't a real HTTP2 request sent by Envoy, +func checkPseudoResponseHeaders(headers map[string]string) error { + if _, ok := headers[":status"]; !ok { + return fmt.Errorf("missing required ':status' headers") + } + + return nil +} + +func getRemoteAddr(md metadata.MD) string { + xfwd := md.Get("x-forwarded-for") + length := len(xfwd) + if length == 0 { + return "" + } + + // Get the first right value of x-forwarded-for headers + // The rightmost IP address is the one that will be used as the remote client IP + // https://datadoghq.atlassian.net/wiki/spaces/TS/pages/2766733526/Sensitive+IP+information#Where-does-the-value-of-the-http.client_ip-tag-come-from%3F + return xfwd[length-1] +} + +// splitPseudoHeaders splits normal headers of the initial request made by the client and the pseudo headers of HTTP/2 +// - Format the headers to be used by the tracer as a map[string][]string +// - Set headers keys to be canonical +func splitPseudoHeaders(receivedHeaders []*corev3.HeaderValue) (headers map[string][]string, pseudoHeaders map[string]string) { + headers = make(map[string][]string, len(receivedHeaders)-4) + pseudoHeaders = make(map[string]string, 4) + for _, v := range receivedHeaders { + key := v.GetKey() + if key == "" { + continue + } + if key[0] == ':' { + pseudoHeaders[key] = string(v.GetRawValue()) + continue + } + + headers[http.CanonicalHeaderKey(key)] = []string{string(v.GetRawValue())} + } + return headers, pseudoHeaders +} + +func createFakeResponseWriter(w http.ResponseWriter, res *extproc.ProcessingRequest_ResponseHeaders) error { + headers, pseudoHeaders := splitPseudoHeaders(res.ResponseHeaders.GetHeaders().GetHeaders()) + + if err := checkPseudoResponseHeaders(pseudoHeaders); err != nil { + return err + } + + status, err := strconv.Atoi(pseudoHeaders[":status"]) + if err != nil { + return fmt.Errorf("error parsing status code %q: %w", pseudoHeaders[":status"], err) + } + + for k, v := range headers { + w.Header().Set(k, strings.Join(v, ",")) + } + + w.WriteHeader(status) + return nil +} + +// newRequest creates a new http.Request from an ext_proc RequestHeaders message +func newRequest(ctx context.Context, req *extproc.ProcessingRequest_RequestHeaders) (*http.Request, error) { + headers, pseudoHeaders := splitPseudoHeaders(req.RequestHeaders.GetHeaders().GetHeaders()) + if err := checkPseudoRequestHeaders(pseudoHeaders); err != nil { + return nil, err + } + + parsedURL, err := url.Parse(fmt.Sprintf("%s://%s%s", pseudoHeaders[":scheme"], pseudoHeaders[":authority"], pseudoHeaders[":path"])) + if err != nil { + return nil, fmt.Errorf( + "error building envoy URI from scheme %q, from host %q and from path %q: %w", + pseudoHeaders[":scheme"], + pseudoHeaders[":host"], + pseudoHeaders[":path"], + err) + } + + var remoteAddr string + md, ok := metadata.FromIncomingContext(ctx) + if ok { + remoteAddr = getRemoteAddr(md) + } + + var tlsState *tls.ConnectionState + if pseudoHeaders[":scheme"] == "https" { + tlsState = &tls.ConnectionState{} + } + + headers["Host"] = append(headers["Host"], pseudoHeaders[":authority"]) + + return (&http.Request{ + Method: pseudoHeaders[":method"], + Host: pseudoHeaders[":authority"], + RequestURI: pseudoHeaders[":path"], + URL: parsedURL, + Header: headers, + RemoteAddr: remoteAddr, + TLS: tlsState, + }).WithContext(ctx), nil +} + +type fakeResponseWriter struct { + mu sync.Mutex + status int + body []byte + headers http.Header +} + +// Reset resets the fakeResponseWriter to its initial state +func (w *fakeResponseWriter) Reset() { + w.mu.Lock() + defer w.mu.Unlock() + w.status = 0 + w.body = nil + w.headers = make(http.Header) +} + +// Status is not in the [http.ResponseWriter] interface, but it is cast into it by the tracing code +func (w *fakeResponseWriter) Status() int { + w.mu.Lock() + defer w.mu.Unlock() + return w.status +} + +func (w *fakeResponseWriter) WriteHeader(status int) { + w.mu.Lock() + defer w.mu.Unlock() + w.status = status +} + +func (w *fakeResponseWriter) Header() http.Header { + w.mu.Lock() + defer w.mu.Unlock() + return w.headers +} + +func (w *fakeResponseWriter) Write(b []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + w.body = append(w.body, b...) + return len(b), nil +} + +var _ http.ResponseWriter = &fakeResponseWriter{} + +// newFakeResponseWriter creates a new fakeResponseWriter that can be used to store the response a [http.Handler] made +func newFakeResponseWriter() *fakeResponseWriter { + return &fakeResponseWriter{ + headers: make(http.Header), + } +} diff --git a/contrib/internal/httptrace/response_writer.go b/contrib/internal/httptrace/response_writer.go index 2bbc31bad7..f44fff762f 100644 --- a/contrib/internal/httptrace/response_writer.go +++ b/contrib/internal/httptrace/response_writer.go @@ -16,6 +16,13 @@ type responseWriter struct { status int } +// ResetStatusCode resets the status code of the response writer. +func ResetStatusCode(w http.ResponseWriter) { + if rw, ok := w.(*responseWriter); ok { + rw.status = 0 + } +} + func newResponseWriter(w http.ResponseWriter) *responseWriter { return &responseWriter{w, 0} } diff --git a/go.mod b/go.mod index f3a841b96d..62452c745f 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/elastic/go-elasticsearch/v8 v8.4.0 github.com/emicklei/go-restful v2.16.0+incompatible github.com/emicklei/go-restful/v3 v3.11.0 + github.com/envoyproxy/go-control-plane v0.12.0 github.com/garyburd/redigo v1.6.4 github.com/gin-gonic/gin v1.9.1 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 @@ -154,6 +155,7 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.0 // indirect github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect + github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect @@ -162,6 +164,7 @@ require ( github.com/eapache/queue v1.1.0 // indirect github.com/ebitengine/purego v0.6.0-alpha.5 // indirect github.com/elastic/elastic-transport-go/v8 v8.1.0 // indirect + github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect github.com/fatih/color v1.16.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect diff --git a/go.sum b/go.sum index 6471b96ff5..8388b569ef 100644 --- a/go.sum +++ b/go.sum @@ -890,6 +890,8 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 h1:DBmgJDC9dTfkVyGgipamEh2BpGYxScCH1TOF1LL1cXc= +github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50/go.mod h1:5e1+Vvlzido69INQaVO6d87Qn543Xr6nooe9Kz7oBFM= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo= github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoCr5oaCLELYA= @@ -1118,9 +1120,13 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/go-control-plane v0.10.3/go.mod h1:fJJn/j26vwOu972OllsvAgJJM//w9BV6Fxbg2LuVd34= +github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= +github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 h1:Yzb9+7DPaBjB8zlTR87/ElzFsnQfuHnVUVqpZZIcV5Y= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= diff --git a/internal/appsec/testdata/user_rules.json b/internal/appsec/testdata/user_rules.json index 6acb14089e..a13a0d67ed 100644 --- a/internal/appsec/testdata/user_rules.json +++ b/internal/appsec/testdata/user_rules.json @@ -53,6 +53,33 @@ "block" ] }, + { + "id": "tst-037-008", + "name": "Test block on cookies", + "tags": { + "type": "lfi", + "crs_id": "000008", + "category": "attack_attempt" + }, + "conditions": [ + { + "parameters": { + "inputs": [ + { + "address": "server.request.cookies" + } + ], + "regex": "jdfoSDGFkivRG_234" + }, + "operator": "match_regex" + } + ], + "transformers": [], + "on_match": [ + "block" + ] + }, + { "id": "headers-003", "name": "query match", From 5a8a82ce66fd77fcefa4f983968ab8f0ea89556a Mon Sep 17 00:00:00 2001 From: Quinna Halim Date: Mon, 16 Dec 2024 10:35:33 -0500 Subject: [PATCH 08/11] add go mod to workflows/apps (#3036) --- .github/workflows/apps/go.mod | 16 ++++++++++++++++ .github/workflows/apps/go.sum | 16 ++++++++++++++++ .github/workflows/apps/latest_major_version.go | 2 +- 3 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/apps/go.mod create mode 100644 .github/workflows/apps/go.sum diff --git a/.github/workflows/apps/go.mod b/.github/workflows/apps/go.mod new file mode 100644 index 0000000000..19c2d9dfde --- /dev/null +++ b/.github/workflows/apps/go.mod @@ -0,0 +1,16 @@ +module github.com/DataDog/dd-trace-go/.github/workflows/apps + +go 1.23.3 + +require ( + github.com/Masterminds/semver/v3 v3.3.1 + github.com/stretchr/testify v1.10.0 + golang.org/x/mod v0.22.0 + gopkg.in/DataDog/dd-trace-go.v1 v1.70.1 +) + +require ( + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/.github/workflows/apps/go.sum b/.github/workflows/apps/go.sum new file mode 100644 index 0000000000..4958ac6c0a --- /dev/null +++ b/.github/workflows/apps/go.sum @@ -0,0 +1,16 @@ +github.com/Masterminds/semver/v3 v3.3.1 h1:QtNSWtVZ3nBfk8mAOu/B6v7FMJ+NHTIgUPi7rj+4nv4= +github.com/Masterminds/semver/v3 v3.3.1/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= +golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +gopkg.in/DataDog/dd-trace-go.v1 v1.70.1 h1:ZIRxAKlr3xr6xbMUDs3IDa6xq+ISv9zxyjaDCfwDjMY= +gopkg.in/DataDog/dd-trace-go.v1 v1.70.1/go.mod h1:PMOSkeY4VfXiuPvGodeNLCZCFYU2VfOvjVI6cX5bGrc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/.github/workflows/apps/latest_major_version.go b/.github/workflows/apps/latest_major_version.go index d1a7a5ccba..f471afe580 100644 --- a/.github/workflows/apps/latest_major_version.go +++ b/.github/workflows/apps/latest_major_version.go @@ -1,7 +1,7 @@ // Unless explicitly stated otherwise all files in this repository are licensed // under the Apache License Version 2.0. // This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016 Datadog, Inc. +// Copyright 2024 Datadog, Inc. package main From cd21985fed1106b3039d65b9c3be3281a7d2ee79 Mon Sep 17 00:00:00 2001 From: Eliott Bouhana <47679741+eliottness@users.noreply.github.com> Date: Tue, 17 Dec 2024 14:10:02 +0100 Subject: [PATCH 09/11] go.mod: module go.opentelemetry.io/collector/pdata@latest found (v1.21.0), but does not contain package go.opentelemetry.io/collector/pdata/internal/data/protogen/profiles/v1experimental (#3042) Signed-off-by: Eliott Bouhana --- go.mod | 1 + go.sum | 2 ++ internal/agent.go | 8 ++++++++ internal/apps/go.mod | 1 + internal/apps/go.sum | 2 ++ internal/exectracetest/go.mod | 1 + internal/exectracetest/go.sum | 2 ++ 7 files changed, 17 insertions(+) diff --git a/go.mod b/go.mod index 62452c745f..e301a9da7a 100644 --- a/go.mod +++ b/go.mod @@ -94,6 +94,7 @@ require ( github.com/vektah/gqlparser/v2 v2.5.16 github.com/zenazn/goji v1.0.1 go.mongodb.org/mongo-driver v1.12.1 + go.opentelemetry.io/collector/pdata/pprofile v0.104.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 diff --git a/go.sum b/go.sum index 8388b569ef..ba7cea7f7c 100644 --- a/go.sum +++ b/go.sum @@ -2213,6 +2213,8 @@ go.opentelemetry.io/collector/config/configtelemetry v0.104.0 h1:eHv98XIhapZA8Mg go.opentelemetry.io/collector/config/configtelemetry v0.104.0/go.mod h1:WxWKNVAQJg/Io1nA3xLgn/DWLE/W1QOB2+/Js3ACi40= go.opentelemetry.io/collector/pdata v1.11.0 h1:rzYyV1zfTQQz1DI9hCiaKyyaczqawN75XO9mdXmR/hE= go.opentelemetry.io/collector/pdata v1.11.0/go.mod h1:IHxHsp+Jq/xfjORQMDJjSH6jvedOSTOyu3nbxqhWSYE= +go.opentelemetry.io/collector/pdata/pprofile v0.104.0 h1:MYOIHvPlKEJbWLiBKFQWGD0xd2u22xGVLt4jPbdxP4Y= +go.opentelemetry.io/collector/pdata/pprofile v0.104.0/go.mod h1:7WpyHk2wJZRx70CGkBio8klrYTTXASbyIhf+rH4FKnA= go.opentelemetry.io/collector/semconv v0.104.0 h1:dUvajnh+AYJLEW/XOPk0T0BlwltSdi3vrjO7nSOos3k= go.opentelemetry.io/collector/semconv v0.104.0/go.mod h1:yMVUCNoQPZVq/IPfrHrnntZTWsLf5YGZ7qwKulIl5hw= go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc= diff --git a/internal/agent.go b/internal/agent.go index f4bcdce8b0..8b0023274a 100644 --- a/internal/agent.go +++ b/internal/agent.go @@ -11,6 +11,14 @@ import ( "os" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + + // OTel did a breaking change to the module go.opentelemetry.io/collector/pdata which is imported by the agent + // and go.opentelemetry.io/collector/pdata/pprofile depends on it and is breaking because of it + // For some reason the dependency closure won't let use upgrade this module past the point where it does not break anymore + // So we are forced to add a blank import of this module to give us back the control over its version + // + // TODO: remove this once github.com/datadog-agent/pkg/trace has upgraded both modules past the breaking change + _ "go.opentelemetry.io/collector/pdata/pprofile" ) const ( diff --git a/internal/apps/go.mod b/internal/apps/go.mod index d90e69002e..76197025d9 100644 --- a/internal/apps/go.mod +++ b/internal/apps/go.mod @@ -44,6 +44,7 @@ require ( go.opentelemetry.io/collector/component v0.104.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.104.0 // indirect go.opentelemetry.io/collector/pdata v1.11.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.104.0 // indirect go.opentelemetry.io/collector/semconv v0.104.0 // indirect go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect diff --git a/internal/apps/go.sum b/internal/apps/go.sum index 1ca195d002..a0e2472c51 100644 --- a/internal/apps/go.sum +++ b/internal/apps/go.sum @@ -195,6 +195,8 @@ go.opentelemetry.io/collector/config/configtelemetry v0.104.0 h1:eHv98XIhapZA8Mg go.opentelemetry.io/collector/config/configtelemetry v0.104.0/go.mod h1:WxWKNVAQJg/Io1nA3xLgn/DWLE/W1QOB2+/Js3ACi40= go.opentelemetry.io/collector/pdata v1.11.0 h1:rzYyV1zfTQQz1DI9hCiaKyyaczqawN75XO9mdXmR/hE= go.opentelemetry.io/collector/pdata v1.11.0/go.mod h1:IHxHsp+Jq/xfjORQMDJjSH6jvedOSTOyu3nbxqhWSYE= +go.opentelemetry.io/collector/pdata/pprofile v0.104.0 h1:MYOIHvPlKEJbWLiBKFQWGD0xd2u22xGVLt4jPbdxP4Y= +go.opentelemetry.io/collector/pdata/pprofile v0.104.0/go.mod h1:7WpyHk2wJZRx70CGkBio8klrYTTXASbyIhf+rH4FKnA= go.opentelemetry.io/collector/semconv v0.104.0 h1:dUvajnh+AYJLEW/XOPk0T0BlwltSdi3vrjO7nSOos3k= go.opentelemetry.io/collector/semconv v0.104.0/go.mod h1:yMVUCNoQPZVq/IPfrHrnntZTWsLf5YGZ7qwKulIl5hw= go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= diff --git a/internal/exectracetest/go.mod b/internal/exectracetest/go.mod index 33a74a94ba..9292ebda48 100644 --- a/internal/exectracetest/go.mod +++ b/internal/exectracetest/go.mod @@ -57,6 +57,7 @@ require ( go.opentelemetry.io/collector/component v0.104.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.104.0 // indirect go.opentelemetry.io/collector/pdata v1.11.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.104.0 // indirect go.opentelemetry.io/collector/semconv v0.104.0 // indirect go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect diff --git a/internal/exectracetest/go.sum b/internal/exectracetest/go.sum index 7321d5519b..00a653dc7e 100644 --- a/internal/exectracetest/go.sum +++ b/internal/exectracetest/go.sum @@ -201,6 +201,8 @@ go.opentelemetry.io/collector/config/configtelemetry v0.104.0 h1:eHv98XIhapZA8Mg go.opentelemetry.io/collector/config/configtelemetry v0.104.0/go.mod h1:WxWKNVAQJg/Io1nA3xLgn/DWLE/W1QOB2+/Js3ACi40= go.opentelemetry.io/collector/pdata v1.11.0 h1:rzYyV1zfTQQz1DI9hCiaKyyaczqawN75XO9mdXmR/hE= go.opentelemetry.io/collector/pdata v1.11.0/go.mod h1:IHxHsp+Jq/xfjORQMDJjSH6jvedOSTOyu3nbxqhWSYE= +go.opentelemetry.io/collector/pdata/pprofile v0.104.0 h1:MYOIHvPlKEJbWLiBKFQWGD0xd2u22xGVLt4jPbdxP4Y= +go.opentelemetry.io/collector/pdata/pprofile v0.104.0/go.mod h1:7WpyHk2wJZRx70CGkBio8klrYTTXASbyIhf+rH4FKnA= go.opentelemetry.io/collector/semconv v0.104.0 h1:dUvajnh+AYJLEW/XOPk0T0BlwltSdi3vrjO7nSOos3k= go.opentelemetry.io/collector/semconv v0.104.0/go.mod h1:yMVUCNoQPZVq/IPfrHrnntZTWsLf5YGZ7qwKulIl5hw= go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= From e8b2b8c7316b415c2657c407baee163b835b60b5 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 17 Dec 2024 11:44:44 -0500 Subject: [PATCH 10/11] chore: update latest majors (#2993) --- latests.txt | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 latests.txt diff --git a/latests.txt b/latests.txt new file mode 100644 index 0000000000..49ed641861 --- /dev/null +++ b/latests.txt @@ -0,0 +1,12 @@ +Latest DD major version of confluentinc/confluent-kafka-go: 2 +Latest DD major version of dimfeld/httptreemux: 5 +Latest DD major version of elastic/go-elasticsearch: 8 +Latest DD major version of emicklei/go-restful: 3 +Latest DD major version of go-chi/chi: 5 +Latest DD major version of go-pg/pg: 10 +Latest DD major version of go-redis/redis: 9 +Latest DD major version of gofiber/fiber: 2 +Latest DD major version of jackc/pgx: 5 +Latest DD major version of labstack/echo: 4 +Latest DD major version of redis/go-redis: 9 +Latest DD major version of vektah/gqlparser: 2 From d15e61ad1b9c08739c94b87ea8ec4b957a64cad4 Mon Sep 17 00:00:00 2001 From: Eliott Bouhana <47679741+eliottness@users.noreply.github.com> Date: Wed, 18 Dec 2024 10:42:18 +0100 Subject: [PATCH 11/11] appsec: stop storing span tags, directly call span.SetTag (#3044) Signed-off-by: Eliott Bouhana --- contrib/99designs/gqlgen/tracer.go | 4 +-- contrib/google.golang.org/grpc/appsec.go | 8 +++--- contrib/graph-gophers/graphql-go/graphql.go | 4 +-- contrib/graphql-go/graphql/graphql.go | 4 +-- internal/appsec/emitter/graphqlsec/request.go | 8 +++--- internal/appsec/emitter/grpcsec/grpc.go | 8 +++--- internal/appsec/emitter/httpsec/http.go | 13 +++++---- .../emitter/trace/service_entry_span.go | 28 +++++++++---------- internal/appsec/emitter/waf/context.go | 8 +++--- 9 files changed, 42 insertions(+), 43 deletions(-) diff --git a/contrib/99designs/gqlgen/tracer.go b/contrib/99designs/gqlgen/tracer.go index ffdcb0c550..d4e1e0f52a 100644 --- a/contrib/99designs/gqlgen/tracer.go +++ b/contrib/99designs/gqlgen/tracer.go @@ -103,7 +103,7 @@ func (t *gqlTracer) Validate(_ graphql.ExecutableSchema) error { func (t *gqlTracer) InterceptOperation(ctx context.Context, next graphql.OperationHandler) graphql.ResponseHandler { opCtx := graphql.GetOperationContext(ctx) span, ctx := t.createRootSpan(ctx, opCtx) - ctx, req := graphqlsec.StartRequestOperation(ctx, graphqlsec.RequestOperationArgs{ + ctx, req := graphqlsec.StartRequestOperation(ctx, span, graphqlsec.RequestOperationArgs{ RawQuery: opCtx.RawQuery, OperationName: opCtx.OperationName, Variables: opCtx.Variables, @@ -137,7 +137,7 @@ func (t *gqlTracer) InterceptOperation(ctx context.Context, next graphql.Operati } query.Finish(executionOperationRes) - req.Finish(span, requestOperationRes) + req.Finish(requestOperationRes) return response } } diff --git a/contrib/google.golang.org/grpc/appsec.go b/contrib/google.golang.org/grpc/appsec.go index f7b4aecf53..4fb2f47a28 100644 --- a/contrib/google.golang.org/grpc/appsec.go +++ b/contrib/google.golang.org/grpc/appsec.go @@ -44,7 +44,7 @@ func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc remoteAddr = p.Addr.String() } - ctx, op, blockAtomic := grpcsec.StartHandlerOperation(ctx, grpcsec.HandlerOperationArgs{ + ctx, op, blockAtomic := grpcsec.StartHandlerOperation(ctx, span, grpcsec.HandlerOperationArgs{ Method: method, Metadata: md, RemoteAddr: remoteAddr, @@ -55,7 +55,7 @@ func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc if statusErr, ok := rpcErr.(interface{ GRPCStatus() *status.Status }); ok && !applyAction(blockAtomic, &rpcErr) { statusCode = int(statusErr.GRPCStatus().Code()) } - op.Finish(span, grpcsec.HandlerOperationRes{StatusCode: statusCode}) + op.Finish(grpcsec.HandlerOperationRes{StatusCode: statusCode}) applyAction(blockAtomic, &rpcErr) }() @@ -90,7 +90,7 @@ func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grp } // Create the handler operation and listen to blocking gRPC actions to detect a blocking condition - ctx, op, blockAtomic := grpcsec.StartHandlerOperation(ctx, grpcsec.HandlerOperationArgs{ + ctx, op, blockAtomic := grpcsec.StartHandlerOperation(ctx, span, grpcsec.HandlerOperationArgs{ Method: method, Metadata: md, RemoteAddr: remoteAddr, @@ -104,7 +104,7 @@ func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grp statusCode = int(res.Status()) } - op.Finish(span, grpcsec.HandlerOperationRes{StatusCode: statusCode}) + op.Finish(grpcsec.HandlerOperationRes{StatusCode: statusCode}) applyAction(blockAtomic, &rpcErr) }() diff --git a/contrib/graph-gophers/graphql-go/graphql.go b/contrib/graph-gophers/graphql-go/graphql.go index 040e7dff04..40f026a842 100644 --- a/contrib/graph-gophers/graphql-go/graphql.go +++ b/contrib/graph-gophers/graphql-go/graphql.go @@ -70,7 +70,7 @@ func (t *Tracer) TraceQuery(ctx context.Context, queryString, operationName stri } span, ctx := ddtracer.StartSpanFromContext(ctx, t.cfg.querySpanName, opts...) - ctx, request := graphqlsec.StartRequestOperation(ctx, graphqlsec.RequestOperationArgs{ + ctx, request := graphqlsec.StartRequestOperation(ctx, span, graphqlsec.RequestOperationArgs{ RawQuery: queryString, OperationName: operationName, Variables: variables, @@ -92,7 +92,7 @@ func (t *Tracer) TraceQuery(ctx context.Context, queryString, operationName stri err = fmt.Errorf("%s (and %d more errors)", errs[0], n-1) } defer span.Finish(ddtracer.WithError(err)) - defer request.Finish(span, graphqlsec.RequestOperationRes{Error: err}) + defer request.Finish(graphqlsec.RequestOperationRes{Error: err}) query.Finish(graphqlsec.ExecutionOperationRes{Error: err}) } } diff --git a/contrib/graphql-go/graphql/graphql.go b/contrib/graphql-go/graphql/graphql.go index 70c131d314..5d646a7f73 100644 --- a/contrib/graphql-go/graphql/graphql.go +++ b/contrib/graphql-go/graphql/graphql.go @@ -72,7 +72,7 @@ type contextData struct { // finish closes the top-level request operation, as well as the server span. func (c *contextData) finish(data any, err error) { defer c.serverSpan.Finish(tracer.WithError(err)) - c.requestOp.Finish(c.serverSpan, graphqlsec.RequestOperationRes{Data: data, Error: err}) + c.requestOp.Finish(graphqlsec.RequestOperationRes{Data: data, Error: err}) } var extensionName = reflect.TypeOf((*datadogExtension)(nil)).Elem().Name() @@ -97,7 +97,7 @@ func (i datadogExtension) Init(ctx context.Context, params *graphql.Params) cont tracer.Tag(ext.Component, componentName), tracer.Measured(), ) - ctx, request := graphqlsec.StartRequestOperation(ctx, graphqlsec.RequestOperationArgs{ + ctx, request := graphqlsec.StartRequestOperation(ctx, span, graphqlsec.RequestOperationArgs{ RawQuery: params.RequestString, Variables: params.VariableValues, OperationName: params.OperationName, diff --git a/internal/appsec/emitter/graphqlsec/request.go b/internal/appsec/emitter/graphqlsec/request.go index 20ae575660..6153c8b4c6 100644 --- a/internal/appsec/emitter/graphqlsec/request.go +++ b/internal/appsec/emitter/graphqlsec/request.go @@ -44,10 +44,10 @@ type ( // Finish the GraphQL query operation, along with the given results, and emit a finish event up in // the operation stack. -func (op *RequestOperation) Finish(span trace.TagSetter, res RequestOperationRes) { +func (op *RequestOperation) Finish(res RequestOperationRes) { dyngo.FinishOperation(op, res) if op.wafContextOwner { - op.ContextOperation.Finish(span) + op.ContextOperation.Finish() } } @@ -58,10 +58,10 @@ func (RequestOperationRes) IsResultOf(*RequestOperation) {} // emits a start event up in the operation stack. The operation is usually linked to tge global root // operation. The operation is tracked on the returned context, and can be extracted later on using // FromContext. -func StartRequestOperation(ctx context.Context, args RequestOperationArgs) (context.Context, *RequestOperation) { +func StartRequestOperation(ctx context.Context, span trace.TagSetter, args RequestOperationArgs) (context.Context, *RequestOperation) { wafOp, found := dyngo.FindOperation[waf.ContextOperation](ctx) if !found { // Usually we can find the HTTP Handler Operation as the parent, but it's technically optional - wafOp, ctx = waf.StartContextOperation(ctx) + wafOp, ctx = waf.StartContextOperation(ctx, span) } op := &RequestOperation{ diff --git a/internal/appsec/emitter/grpcsec/grpc.go b/internal/appsec/emitter/grpcsec/grpc.go index 2495e8bbd5..1d0e305bd9 100644 --- a/internal/appsec/emitter/grpcsec/grpc.go +++ b/internal/appsec/emitter/grpcsec/grpc.go @@ -77,10 +77,10 @@ func (HandlerOperationRes) IsResultOf(*HandlerOperation) {} // given arguments and parent operation, and emits a start event up in the // operation stack. When parent is nil, the operation is linked to the global // root operation. -func StartHandlerOperation(ctx context.Context, args HandlerOperationArgs) (context.Context, *HandlerOperation, *atomic.Pointer[actions.BlockGRPC]) { +func StartHandlerOperation(ctx context.Context, span trace.TagSetter, args HandlerOperationArgs) (context.Context, *HandlerOperation, *atomic.Pointer[actions.BlockGRPC]) { wafOp, found := dyngo.FindOperation[waf.ContextOperation](ctx) if !found { - wafOp, ctx = waf.StartContextOperation(ctx) + wafOp, ctx = waf.StartContextOperation(ctx, span) } op := &HandlerOperation{ Operation: dyngo.NewOperation(wafOp), @@ -117,9 +117,9 @@ func MonitorResponseMessage(ctx context.Context, msg any) error { // Finish the gRPC handler operation, along with the given results, and emit a // finish event up in the operation stack. -func (op *HandlerOperation) Finish(span trace.TagSetter, res HandlerOperationRes) { +func (op *HandlerOperation) Finish(res HandlerOperationRes) { dyngo.FinishOperation(op, res) if op.wafContextOwner { - op.ContextOperation.Finish(span) + op.ContextOperation.Finish() } } diff --git a/internal/appsec/emitter/httpsec/http.go b/internal/appsec/emitter/httpsec/http.go index 41ebfa7e23..16cfa2ba3e 100644 --- a/internal/appsec/emitter/httpsec/http.go +++ b/internal/appsec/emitter/httpsec/http.go @@ -19,6 +19,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/trace" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/waf" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/waf/actions" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/waf/addresses" @@ -57,10 +58,10 @@ type ( func (HandlerOperationArgs) IsArgOf(*HandlerOperation) {} func (HandlerOperationRes) IsResultOf(*HandlerOperation) {} -func StartOperation(ctx context.Context, args HandlerOperationArgs) (*HandlerOperation, *atomic.Pointer[actions.BlockHTTP], context.Context) { +func StartOperation(ctx context.Context, args HandlerOperationArgs, span trace.TagSetter) (*HandlerOperation, *atomic.Pointer[actions.BlockHTTP], context.Context) { wafOp, found := dyngo.FindOperation[waf.ContextOperation](ctx) if !found { - wafOp, ctx = waf.StartContextOperation(ctx) + wafOp, ctx = waf.StartContextOperation(ctx, span) } op := &HandlerOperation{ @@ -79,10 +80,10 @@ func StartOperation(ctx context.Context, args HandlerOperationArgs) (*HandlerOpe } // Finish the HTTP handler operation and its children operations and write everything to the service entry span. -func (op *HandlerOperation) Finish(res HandlerOperationRes, span ddtrace.Span) { +func (op *HandlerOperation) Finish(res HandlerOperationRes) { dyngo.FinishOperation(op, res) if op.wafContextOwner { - op.ContextOperation.Finish(span) + op.ContextOperation.Finish() } } @@ -142,7 +143,7 @@ func BeforeHandle( Cookies: makeCookies(r.Cookies()), QueryParams: r.URL.Query(), PathParams: pathParams, - }) + }, span) tr := r.WithContext(ctx) var blocked atomic.Bool @@ -154,7 +155,7 @@ func BeforeHandle( op.Finish(HandlerOperationRes{ Headers: opts.ResponseHeaderCopier(w), StatusCode: statusCode, - }, span) + }) if blockPtr := blockAtomic.Swap(nil); blockPtr != nil { blockPtr.Handler.ServeHTTP(w, tr) diff --git a/internal/appsec/emitter/trace/service_entry_span.go b/internal/appsec/emitter/trace/service_entry_span.go index 98e14b092f..802a82e7e8 100644 --- a/internal/appsec/emitter/trace/service_entry_span.go +++ b/internal/appsec/emitter/trace/service_entry_span.go @@ -18,9 +18,9 @@ type ( // ServiceEntrySpanOperation is a dyngo.Operation that holds a the first span of a service. Usually a http or grpc span. ServiceEntrySpanOperation struct { dyngo.Operation - tags map[string]any - jsonTags map[string]any - mu sync.Mutex + jsonTags map[string]any + tagSetter TagSetter + mu sync.Mutex } // ServiceEntrySpanArgs is the arguments for a ServiceEntrySpanOperation @@ -52,7 +52,7 @@ func (ServiceEntrySpanArgs) IsArgOf(*ServiceEntrySpanOperation) {} func (op *ServiceEntrySpanOperation) SetTag(key string, value any) { op.mu.Lock() defer op.mu.Unlock() - op.tags[key] = value + op.tagSetter.SetTag(key, value) } // SetSerializableTag adds the key/value pair to the tags to add to the service entry span. @@ -76,7 +76,7 @@ func (op *ServiceEntrySpanOperation) SetSerializableTags(tags map[string]any) { func (op *ServiceEntrySpanOperation) setSerializableTag(key string, value any) { switch value.(type) { case string, int8, int16, int32, int64, uint8, uint16, uint32, uint64, float32, float64, bool: - op.tags[key] = value + op.tagSetter.SetTag(key, value) default: op.jsonTags[key] = value } @@ -87,7 +87,7 @@ func (op *ServiceEntrySpanOperation) SetTags(tags map[string]any) { op.mu.Lock() defer op.mu.Unlock() for k, v := range tags { - op.tags[k] = v + op.tagSetter.SetTag(k, v) } } @@ -96,7 +96,7 @@ func (op *ServiceEntrySpanOperation) SetStringTags(tags map[string]string) { op.mu.Lock() defer op.mu.Unlock() for k, v := range tags { - op.tags[k] = v + op.tagSetter.SetTag(k, v) } } @@ -126,17 +126,18 @@ func (op *ServiceEntrySpanOperation) OnSpanTagEvent(tag SpanTag) { op.SetTag(tag.Key, tag.Value) } -func StartServiceEntrySpanOperation(ctx context.Context) (*ServiceEntrySpanOperation, context.Context) { +func StartServiceEntrySpanOperation(ctx context.Context, span TagSetter) (*ServiceEntrySpanOperation, context.Context) { parent, _ := dyngo.FromContext(ctx) op := &ServiceEntrySpanOperation{ Operation: dyngo.NewOperation(parent), - tags: make(map[string]any), - jsonTags: make(map[string]any), + jsonTags: make(map[string]any, 2), + tagSetter: span, } return op, dyngo.StartAndRegisterOperation(ctx, op, ServiceEntrySpanArgs{}) } -func (op *ServiceEntrySpanOperation) Finish(span TagSetter) { +func (op *ServiceEntrySpanOperation) Finish() { + span := op.tagSetter if _, ok := span.(*NoopTagSetter); ok { // If the span is a NoopTagSetter or is nil, we don't need to set any tags return } @@ -144,14 +145,11 @@ func (op *ServiceEntrySpanOperation) Finish(span TagSetter) { op.mu.Lock() defer op.mu.Unlock() - for k, v := range op.tags { - span.SetTag(k, v) - } - for k, v := range op.jsonTags { strValue, err := json.Marshal(v) if err != nil { log.Debug("appsec: failed to marshal tag %s: %v", k, err) + continue } span.SetTag(k, string(strValue)) } diff --git a/internal/appsec/emitter/waf/context.go b/internal/appsec/emitter/waf/context.go index 698e721880..e88e03b6b8 100644 --- a/internal/appsec/emitter/waf/context.go +++ b/internal/appsec/emitter/waf/context.go @@ -61,8 +61,8 @@ type ( func (ContextArgs) IsArgOf(*ContextOperation) {} func (ContextRes) IsResultOf(*ContextOperation) {} -func StartContextOperation(ctx context.Context) (*ContextOperation, context.Context) { - entrySpanOp, ctx := trace.StartServiceEntrySpanOperation(ctx) +func StartContextOperation(ctx context.Context, span trace.TagSetter) (*ContextOperation, context.Context) { + entrySpanOp, ctx := trace.StartServiceEntrySpanOperation(ctx, span) op := &ContextOperation{ Operation: dyngo.NewOperation(entrySpanOp), ServiceEntrySpanOperation: entrySpanOp, @@ -70,9 +70,9 @@ func StartContextOperation(ctx context.Context) (*ContextOperation, context.Cont return op, dyngo.StartAndRegisterOperation(ctx, op, ContextArgs{}) } -func (op *ContextOperation) Finish(span trace.TagSetter) { +func (op *ContextOperation) Finish() { dyngo.FinishOperation(op, ContextRes{}) - op.ServiceEntrySpanOperation.Finish(span) + op.ServiceEntrySpanOperation.Finish() } func (op *ContextOperation) SwapContext(ctx *waf.Context) *waf.Context {