From 2948aa9015c2098bf1d0a972b9d729c0d9da2f86 Mon Sep 17 00:00:00 2001 From: Stanley Liu Date: Wed, 29 May 2024 17:03:31 -0400 Subject: [PATCH] Implement metrics for infraattributesprocessor (#25711) * Initial version * Rename tagenrichmentprocessor to infraattributesprocessor * Fix merge * Rename config_logs_strict * Add tagger component * Fix tests and lint * tidy * Refactor to tag multiple entities * Rename * Temp update mapping go attributes * tidy * Update attributes mapping * Fix resource metrics loop, add more tests * Update attributes to v0.16.1 * Update attributes to v0.16.1 * Add container_image_metadata * generate licenses * Rename * Fix merge conflicts * generate deps --- LICENSE-3rdparty.csv | 1 + cmd/otel-agent/subcommands/run/command.go | 4 + cmd/serverless/dependencies_linux_amd64.txt | 1 + cmd/serverless/dependencies_linux_arm64.txt | 1 + .../collector/impl-pipeline/pipeline.go | 7 +- comp/otelcol/collector/impl/collector.go | 4 +- comp/otelcol/otlp/collector.go | 19 +- comp/otelcol/otlp/collector_test.go | 13 +- .../exporter/datadogexporter/go.mod | 2 +- .../exporter/datadogexporter/go.sum | 4 +- .../exporter/logsagentexporter/go.mod | 2 +- .../exporter/logsagentexporter/go.sum | 4 +- .../exporter/serializerexporter/go.mod | 2 +- .../exporter/serializerexporter/go.sum | 4 +- .../otlp/components/pipeline/provider/go.mod | 2 +- .../infraattributesprocessor/config.go | 4 + .../infraattributesprocessor/config_test.go | 8 +- .../infraattributesprocessor/factory.go | 45 ++-- .../infraattributesprocessor/factory_test.go | 14 +- .../infraattributesprocessor/logs.go | 2 +- .../infraattributesprocessor/logs_test.go | 6 +- .../infraattributesprocessor/metadata.go | 6 +- .../infraattributesprocessor/metrics.go | 110 ++++++++- .../infraattributesprocessor/metrics_test.go | 226 ++++++++++++++++-- .../infraattributesprocessor/package_test.go | 16 -- .../infraattributesprocessor/traces.go | 2 +- .../infraattributesprocessor/traces_test.go | 6 +- .../otlp/map_provider_not_serverless_test.go | 8 +- go.mod | 5 +- go.sum | 4 +- pkg/serverless/otlp/otlp.go | 2 +- 31 files changed, 432 insertions(+), 102 deletions(-) delete mode 100644 comp/otelcol/otlp/components/processor/infraattributesprocessor/package_test.go diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 4bc52d22e032a..cabb2fc4688ec 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -3085,6 +3085,7 @@ core,go.opentelemetry.io/collector/semconv/v1.13.0,Apache-2.0,Copyright The Open core,go.opentelemetry.io/collector/semconv/v1.16.0,Apache-2.0,Copyright The OpenTelemetry Authors core,go.opentelemetry.io/collector/semconv/v1.17.0,Apache-2.0,Copyright The OpenTelemetry Authors core,go.opentelemetry.io/collector/semconv/v1.18.0,Apache-2.0,Copyright The OpenTelemetry Authors +core,go.opentelemetry.io/collector/semconv/v1.21.0,Apache-2.0,Copyright The OpenTelemetry Authors core,go.opentelemetry.io/collector/semconv/v1.22.0,Apache-2.0,Copyright The OpenTelemetry Authors core,go.opentelemetry.io/collector/semconv/v1.6.1,Apache-2.0,Copyright The OpenTelemetry Authors core,go.opentelemetry.io/collector/semconv/v1.8.0,Apache-2.0,Copyright The OpenTelemetry Authors diff --git a/cmd/otel-agent/subcommands/run/command.go b/cmd/otel-agent/subcommands/run/command.go index 7b48903eb4a54..41c61dac6ff4d 100644 --- a/cmd/otel-agent/subcommands/run/command.go +++ b/cmd/otel-agent/subcommands/run/command.go @@ -20,6 +20,8 @@ import ( corelogimpl "github.com/DataDog/datadog-agent/comp/core/log/logimpl" "github.com/DataDog/datadog-agent/comp/core/secrets" "github.com/DataDog/datadog-agent/comp/core/sysprobeconfig" + "github.com/DataDog/datadog-agent/comp/core/tagger" + "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl" "github.com/DataDog/datadog-agent/comp/core/workloadmeta" "github.com/DataDog/datadog-agent/comp/forwarder" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" @@ -81,6 +83,8 @@ func runOTelAgentCommand(_ context.Context, params *subcommands.GlobalParams, op inventoryagentimpl.Module(), workloadmeta.Module(), hostnameimpl.Module(), + fx.Provide(tagger.NewTaggerParams), + taggerimpl.Module(), sysprobeconfig.NoneModule(), fetchonlyimpl.Module(), collectorfx.Module(), diff --git a/cmd/serverless/dependencies_linux_amd64.txt b/cmd/serverless/dependencies_linux_amd64.txt index 2c6e9689dc098..30a8125e3371c 100644 --- a/cmd/serverless/dependencies_linux_amd64.txt +++ b/cmd/serverless/dependencies_linux_amd64.txt @@ -611,6 +611,7 @@ go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace go.opentelemetry.io/collector/receiver/receiverhelper go.opentelemetry.io/collector/semconv/v1.17.0 go.opentelemetry.io/collector/semconv/v1.18.0 +go.opentelemetry.io/collector/semconv/v1.21.0 go.opentelemetry.io/collector/semconv/v1.6.1 go.opentelemetry.io/collector/service go.opentelemetry.io/collector/service/extensions diff --git a/cmd/serverless/dependencies_linux_arm64.txt b/cmd/serverless/dependencies_linux_arm64.txt index 49379cf102325..65442897051cb 100644 --- a/cmd/serverless/dependencies_linux_arm64.txt +++ b/cmd/serverless/dependencies_linux_arm64.txt @@ -610,6 +610,7 @@ go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace go.opentelemetry.io/collector/receiver/receiverhelper go.opentelemetry.io/collector/semconv/v1.17.0 go.opentelemetry.io/collector/semconv/v1.18.0 +go.opentelemetry.io/collector/semconv/v1.21.0 go.opentelemetry.io/collector/semconv/v1.6.1 go.opentelemetry.io/collector/service go.opentelemetry.io/collector/service/extensions diff --git a/comp/otelcol/collector/impl-pipeline/pipeline.go b/comp/otelcol/collector/impl-pipeline/pipeline.go index 79dcb1cf875ec..8420a24b02a38 100644 --- a/comp/otelcol/collector/impl-pipeline/pipeline.go +++ b/comp/otelcol/collector/impl-pipeline/pipeline.go @@ -14,6 +14,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" corelog "github.com/DataDog/datadog-agent/comp/core/log" "github.com/DataDog/datadog-agent/comp/core/status" + "github.com/DataDog/datadog-agent/comp/core/tagger" compdef "github.com/DataDog/datadog-agent/comp/def" "github.com/DataDog/datadog-agent/comp/metadata/inventoryagent" collector "github.com/DataDog/datadog-agent/comp/otelcol/collector/def" @@ -51,6 +52,8 @@ type Requires struct { // InventoryAgent require the inventory metadata payload, allowing otelcol to add data to it. InventoryAgent inventoryagent.Component + + Tagger tagger.Component } // Provides specifics the types returned by the constructor @@ -68,6 +71,7 @@ type collectorImpl struct { serializer serializer.MetricSerializer logsAgent optional.Option[logsagentpipeline.Component] inventoryAgent inventoryagent.Component + tagger tagger.Component } func (c *collectorImpl) start(context.Context) error { @@ -83,7 +87,7 @@ func (c *collectorImpl) start(context.Context) error { } } var err error - col, err := otlp.NewPipelineFromAgentConfig(c.config, c.serializer, logch) + col, err := otlp.NewPipelineFromAgentConfig(c.config, c.serializer, logch, c.tagger) if err != nil { // failure to start the OTLP component shouldn't fail startup c.log.Errorf("Error creating the OTLP ingest pipeline: %v", err) @@ -121,6 +125,7 @@ func NewComponent(reqs Requires) (Provides, error) { serializer: reqs.Serializer, logsAgent: reqs.LogsAgent, inventoryAgent: reqs.InventoryAgent, + tagger: reqs.Tagger, } reqs.Lc.Append(compdef.Hook{ diff --git a/comp/otelcol/collector/impl/collector.go b/comp/otelcol/collector/impl/collector.go index 5cf318eeaf981..d9cf67eb04861 100644 --- a/comp/otelcol/collector/impl/collector.go +++ b/comp/otelcol/collector/impl/collector.go @@ -17,6 +17,7 @@ import ( flarebuilder "github.com/DataDog/datadog-agent/comp/core/flare/builder" "github.com/DataDog/datadog-agent/comp/core/hostname" corelog "github.com/DataDog/datadog-agent/comp/core/log" + "github.com/DataDog/datadog-agent/comp/core/tagger" compdef "github.com/DataDog/datadog-agent/comp/def" collectorcontrib "github.com/DataDog/datadog-agent/comp/otelcol/collector-contrib/def" collector "github.com/DataDog/datadog-agent/comp/otelcol/collector/def" @@ -48,6 +49,7 @@ type Requires struct { Serializer serializer.MetricSerializer LogsAgent optional.Option[logsagentpipeline.Component] HostName hostname.Component + Tagger tagger.Component } // Provides declares the output types from the constructor @@ -76,7 +78,7 @@ func NewComponent(reqs Requires) (Provides, error) { } else { factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.Serializer, nil, reqs.HostName) } - factories.Processors[infraattributesprocessor.Type] = infraattributesprocessor.NewFactory() + factories.Processors[infraattributesprocessor.Type] = infraattributesprocessor.NewFactory(reqs.Tagger) return factories, nil }, ConfigProvider: reqs.Provider, diff --git a/comp/otelcol/otlp/collector.go b/comp/otelcol/otlp/collector.go index 4af8f540d7c4e..e3896be2933e2 100644 --- a/comp/otelcol/otlp/collector.go +++ b/comp/otelcol/otlp/collector.go @@ -82,7 +82,7 @@ func (t *tagEnricher) Enrich(_ context.Context, extraTags []string, dimensions * return enrichedTags } -func getComponents(s serializer.MetricSerializer, logsAgentChannel chan *message.Message) ( +func getComponents(s serializer.MetricSerializer, logsAgentChannel chan *message.Message, tagger tagger.Component) ( otelcol.Factories, error, ) { @@ -115,10 +115,11 @@ func getComponents(s serializer.MetricSerializer, logsAgentChannel chan *message errs = append(errs, err) } - processors, err := processor.MakeFactoryMap( - batchprocessor.NewFactory(), - infraattributesprocessor.NewFactory(), - ) + processorFactories := []processor.Factory{batchprocessor.NewFactory()} + if tagger != nil { + processorFactories = append(processorFactories, infraattributesprocessor.NewFactory(tagger)) + } + processors, err := processor.MakeFactoryMap(processorFactories...) if err != nil { errs = append(errs, err) } @@ -194,7 +195,7 @@ type Pipeline struct { type CollectorStatus = datatype.CollectorStatus // NewPipeline defines a new OTLP pipeline. -func NewPipeline(cfg PipelineConfig, s serializer.MetricSerializer, logsAgentChannel chan *message.Message) (*Pipeline, error) { +func NewPipeline(cfg PipelineConfig, s serializer.MetricSerializer, logsAgentChannel chan *message.Message, tagger tagger.Component) (*Pipeline, error) { buildInfo, err := getBuildInfo() if err != nil { return nil, fmt.Errorf("failed to get build info: %w", err) @@ -214,7 +215,7 @@ func NewPipeline(cfg PipelineConfig, s serializer.MetricSerializer, logsAgentCha col, err := otelcol.NewCollector(otelcol.CollectorSettings{ Factories: func() (otelcol.Factories, error) { - return getComponents(s, logsAgentChannel) + return getComponents(s, logsAgentChannel, tagger) }, BuildInfo: buildInfo, DisableGracefulShutdown: true, @@ -251,7 +252,7 @@ func (p *Pipeline) Stop() { // NewPipelineFromAgentConfig creates a new pipeline from the given agent configuration, metric serializer and logs channel. It returns // any potential failure. -func NewPipelineFromAgentConfig(cfg config.Component, s serializer.MetricSerializer, logsAgentChannel chan *message.Message) (*Pipeline, error) { +func NewPipelineFromAgentConfig(cfg config.Component, s serializer.MetricSerializer, logsAgentChannel chan *message.Message, tagger tagger.Component) (*Pipeline, error) { pcfg, err := FromAgentConfig(cfg) if err != nil { pipelineError.Store(fmt.Errorf("config error: %w", err)) @@ -260,7 +261,7 @@ func NewPipelineFromAgentConfig(cfg config.Component, s serializer.MetricSeriali if err := checkAndUpdateCfg(cfg, pcfg, logsAgentChannel); err != nil { return nil, err } - p, err := NewPipeline(pcfg, s, logsAgentChannel) + p, err := NewPipeline(pcfg, s, logsAgentChannel, tagger) if err != nil { pipelineError.Store(fmt.Errorf("failed to build pipeline: %w", err)) return nil, pipelineError.Load() diff --git a/comp/otelcol/otlp/collector_test.go b/comp/otelcol/otlp/collector_test.go index 3c5c88d872de0..e9e746b664c34 100644 --- a/comp/otelcol/otlp/collector_test.go +++ b/comp/otelcol/otlp/collector_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl" "github.com/DataDog/datadog-agent/comp/otelcol/otlp/testutil" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/logs/message" @@ -23,13 +24,17 @@ import ( ) func TestGetComponents(t *testing.T) { - _, err := getComponents(&serializer.MockSerializer{}, make(chan *message.Message)) + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() + _, err := getComponents(&serializer.MockSerializer{}, make(chan *message.Message), fakeTagger) // No duplicate component require.NoError(t, err) } func AssertSucessfulRun(t *testing.T, pcfg PipelineConfig) { - p, err := NewPipeline(pcfg, &serializer.MockSerializer{}, make(chan *message.Message)) + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() + p, err := NewPipeline(pcfg, &serializer.MockSerializer{}, make(chan *message.Message), fakeTagger) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -54,7 +59,9 @@ func AssertSucessfulRun(t *testing.T, pcfg PipelineConfig) { } func AssertFailedRun(t *testing.T, pcfg PipelineConfig, expected string) { - p, err := NewPipeline(pcfg, &serializer.MockSerializer{}, make(chan *message.Message)) + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() + p, err := NewPipeline(pcfg, &serializer.MockSerializer{}, make(chan *message.Message), fakeTagger) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/comp/otelcol/otlp/components/exporter/datadogexporter/go.mod b/comp/otelcol/otlp/components/exporter/datadogexporter/go.mod index dc68cc4f9e5e1..0619a0aba20ab 100644 --- a/comp/otelcol/otlp/components/exporter/datadogexporter/go.mod +++ b/comp/otelcol/otlp/components/exporter/datadogexporter/go.mod @@ -85,7 +85,7 @@ require ( github.com/DataDog/datadog-agent/pkg/logs/message v0.54.0-rc.2 github.com/DataDog/datadog-agent/pkg/serializer v0.54.0-rc.2 github.com/DataDog/datadog-agent/pkg/util/hostname/validate v0.54.0-rc.2 - github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.0 + github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1 github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.16.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.100.0 diff --git a/comp/otelcol/otlp/components/exporter/datadogexporter/go.sum b/comp/otelcol/otlp/components/exporter/datadogexporter/go.sum index 865ca96d98bd5..31449ad831d59 100644 --- a/comp/otelcol/otlp/components/exporter/datadogexporter/go.sum +++ b/comp/otelcol/otlp/components/exporter/datadogexporter/go.sum @@ -10,8 +10,8 @@ github.com/DataDog/mmh3 v0.0.0-20210722141835-012dc69a9e49 h1:EbzDX8HPk5uE2FsJYx github.com/DataDog/mmh3 v0.0.0-20210722141835-012dc69a9e49/go.mod h1:SvsjzyJlSg0rKsqYgdcFxeEVflx3ZNAyFfkUHP0TxXg= github.com/DataDog/opentelemetry-mapping-go/pkg/internal/sketchtest v0.16.0 h1:VJT1Jjlz/ca999FEqaAS+He7S4eB14a+PJjczgRdgAY= github.com/DataDog/opentelemetry-mapping-go/pkg/internal/sketchtest v0.16.0/go.mod h1:66XlN7QpQKqIvw8e2UbCXV5X8wGnEw851nT9BjJ75dY= -github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.0 h1:g/ztrLYZNfkpW6Bt8kMnLed5DaKRHEtiKE0opHXLHJk= -github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.0/go.mod h1:dvIWN9pA2zWNTw5rhDWZgzZnhcfpH++d+8d1SWW6xkY= +github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1 h1:ZI8u3CgdMXpDplrf9/gIr13+/g/tUzUcBMk2ZhXgzLE= +github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1/go.mod h1:dvIWN9pA2zWNTw5rhDWZgzZnhcfpH++d+8d1SWW6xkY= github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/logs v0.16.0 h1:NbKlfbjR2joF52jEBLs3MEnT6l5zM3MCyhUFkqARZpk= github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/logs v0.16.0/go.mod h1:+LijQ2LdlocAQ4WB+7KsoIGe90bfogkRslubd9swVow= github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.16.0 h1:H5DzD3rwgQCX0VI3A16KgsdmC5grUCyDFflaZDpfgMc= diff --git a/comp/otelcol/otlp/components/exporter/logsagentexporter/go.mod b/comp/otelcol/otlp/components/exporter/logsagentexporter/go.mod index 6738710153680..119f1230fb91f 100644 --- a/comp/otelcol/otlp/components/exporter/logsagentexporter/go.mod +++ b/comp/otelcol/otlp/components/exporter/logsagentexporter/go.mod @@ -42,7 +42,7 @@ require ( github.com/DataDog/datadog-agent/pkg/logs/message v0.54.0-rc.2 github.com/DataDog/datadog-agent/pkg/logs/sources v0.54.0-rc.2 github.com/DataDog/datadog-agent/pkg/util/scrubber v0.54.0-rc.2 - github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.14.0 + github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1 github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/logs v0.14.0 github.com/stormcat24/protodep v0.1.8 github.com/stretchr/testify v1.9.0 diff --git a/comp/otelcol/otlp/components/exporter/logsagentexporter/go.sum b/comp/otelcol/otlp/components/exporter/logsagentexporter/go.sum index fdb527179c99c..e02b4cea1d411 100644 --- a/comp/otelcol/otlp/components/exporter/logsagentexporter/go.sum +++ b/comp/otelcol/otlp/components/exporter/logsagentexporter/go.sum @@ -4,8 +4,8 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8 github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/DataDog/datadog-api-client-go/v2 v2.13.0 h1:2c1dXSyUfum2YIVoYlqnBhV5JOG1cLSW+4jB3RrKjLc= github.com/DataDog/datadog-api-client-go/v2 v2.13.0/go.mod h1:kntOqXEh1SmjwSDzW/eJkr9kS7EqttvEkelglWtJRbg= -github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.14.0 h1:10TPqpTlIkmDPFWVIEZ4ZX3rWrCrx3rEoeoAooZr6LM= -github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.14.0/go.mod h1:dvIWN9pA2zWNTw5rhDWZgzZnhcfpH++d+8d1SWW6xkY= +github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1 h1:ZI8u3CgdMXpDplrf9/gIr13+/g/tUzUcBMk2ZhXgzLE= +github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1/go.mod h1:dvIWN9pA2zWNTw5rhDWZgzZnhcfpH++d+8d1SWW6xkY= github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/logs v0.14.0 h1:nma5ZICTbHZ0YoMu18ziWGSLK1ICzMm6rJTv+IatJ0U= github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/logs v0.14.0/go.mod h1:xUiGj13q5uHPboc0xZ754fyusiF5C2RxNzOFdTbdZFA= github.com/DataDog/viper v1.13.4 h1:0SrZc3zvMAGgVKg96uP4DEJb13lK2Is9a4go7IIcFSE= diff --git a/comp/otelcol/otlp/components/exporter/serializerexporter/go.mod b/comp/otelcol/otlp/components/exporter/serializerexporter/go.mod index 5a6904988d052..455a9545dcfe2 100644 --- a/comp/otelcol/otlp/components/exporter/serializerexporter/go.mod +++ b/comp/otelcol/otlp/components/exporter/serializerexporter/go.mod @@ -62,7 +62,7 @@ require ( github.com/DataDog/datadog-agent/pkg/serializer v0.54.0-rc.2 github.com/DataDog/datadog-agent/pkg/tagset v0.54.0-rc.2 github.com/DataDog/datadog-agent/pkg/util/log v0.54.0-rc.2 - github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.14.0 + github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1 github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.14.0 github.com/DataDog/opentelemetry-mapping-go/pkg/quantile v0.14.0 github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c // indirect diff --git a/comp/otelcol/otlp/components/exporter/serializerexporter/go.sum b/comp/otelcol/otlp/components/exporter/serializerexporter/go.sum index f61fcaa4f266e..2c5a7da1b8a34 100644 --- a/comp/otelcol/otlp/components/exporter/serializerexporter/go.sum +++ b/comp/otelcol/otlp/components/exporter/serializerexporter/go.sum @@ -6,8 +6,8 @@ github.com/DataDog/mmh3 v0.0.0-20210722141835-012dc69a9e49 h1:EbzDX8HPk5uE2FsJYx github.com/DataDog/mmh3 v0.0.0-20210722141835-012dc69a9e49/go.mod h1:SvsjzyJlSg0rKsqYgdcFxeEVflx3ZNAyFfkUHP0TxXg= github.com/DataDog/opentelemetry-mapping-go/pkg/internal/sketchtest v0.14.0 h1:J0IEqkrB8BjtuDHofR8Q3J+Z8829Ja1Mlix9cyG8wJI= github.com/DataDog/opentelemetry-mapping-go/pkg/internal/sketchtest v0.14.0/go.mod h1:66XlN7QpQKqIvw8e2UbCXV5X8wGnEw851nT9BjJ75dY= -github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.14.0 h1:10TPqpTlIkmDPFWVIEZ4ZX3rWrCrx3rEoeoAooZr6LM= -github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.14.0/go.mod h1:dvIWN9pA2zWNTw5rhDWZgzZnhcfpH++d+8d1SWW6xkY= +github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1 h1:ZI8u3CgdMXpDplrf9/gIr13+/g/tUzUcBMk2ZhXgzLE= +github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1/go.mod h1:dvIWN9pA2zWNTw5rhDWZgzZnhcfpH++d+8d1SWW6xkY= github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.14.0 h1:2Ou9n/048KM9E70G+LpJ4svBH2kOb8NkxwEqEGAamHo= github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.14.0/go.mod h1:CAj5xIuF45abugAck7OzjMLH5y20ylaEj1qbQEjU1gk= github.com/DataDog/opentelemetry-mapping-go/pkg/quantile v0.14.0 h1:QHx6B/VUx3rZQqrQNZI5BfypbhhGSRzCz05viyJEQmM= diff --git a/comp/otelcol/otlp/components/pipeline/provider/go.mod b/comp/otelcol/otlp/components/pipeline/provider/go.mod index b8e737ffcc0ff..dd36344c5d0e2 100644 --- a/comp/otelcol/otlp/components/pipeline/provider/go.mod +++ b/comp/otelcol/otlp/components/pipeline/provider/go.mod @@ -82,7 +82,7 @@ require ( github.com/DataDog/go-tuf v1.1.0-0.5.2 // indirect github.com/DataDog/gohai v0.0.0-20230524154621-4316413895ee // indirect github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata v0.16.0 // indirect - github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.0 // indirect + github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1 // indirect github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/logs v0.16.0 // indirect github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.16.0 // indirect github.com/DataDog/opentelemetry-mapping-go/pkg/quantile v0.16.0 // indirect diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/config.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/config.go index a24a25b8db6fa..8b1c438bdcfc2 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/config.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/config.go @@ -7,6 +7,8 @@ package infraattributesprocessor import ( "go.opentelemetry.io/collector/component" + + "github.com/DataDog/datadog-agent/comp/core/tagger/types" ) // Config defines configuration for processor. @@ -14,6 +16,8 @@ type Config struct { Metrics MetricInfraAttributes `mapstructure:"metrics"` Logs LogInfraAttributes `mapstructure:"logs"` Traces TraceInfraAttributes `mapstructure:"traces"` + + Cardinality types.TagCardinality `mapstructure:"cardinality"` } // MetricInfraAttributes - configuration for metrics. diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/config_test.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/config_test.go index c8bf6903ec9dc..0011a3b0a8d14 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/config_test.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/config_test.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap/confmaptest" + + "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl" ) // TestLoadingConfigStrictLogs tests loading testdata/logs_strict.yaml @@ -34,8 +36,10 @@ func TestLoadingConfigStrictLogs(t *testing.T) { for _, tt := range tests { t.Run(tt.id.String(), func(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() + f := NewFactory(fakeTagger) + cfg := f.CreateDefaultConfig() sub, err := cm.Sub(tt.id.String()) require.NoError(t, err) diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/factory.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/factory.go index b71129f5bfc16..00046a13a8296 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/factory.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/factory.go @@ -8,6 +8,9 @@ package infraattributesprocessor import ( "context" + "github.com/DataDog/datadog-agent/comp/core/tagger" + "github.com/DataDog/datadog-agent/comp/core/tagger/types" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" @@ -16,28 +19,38 @@ import ( var processorCapabilities = consumer.Capabilities{MutatesData: true} +type factory struct { + tagger tagger.Component +} + // NewFactory returns a new factory for the InfraAttributes processor. -func NewFactory() processor.Factory { +func NewFactory(tagger tagger.Component) processor.Factory { + f := &factory{ + tagger: tagger, + } + return processor.NewFactory( Type, - createDefaultConfig, - processor.WithMetrics(createMetricsProcessor, MetricsStability), - processor.WithLogs(createLogsProcessor, LogsStability), - processor.WithTraces(createTracesProcessor, TracesStability), + f.createDefaultConfig, + processor.WithMetrics(f.createMetricsProcessor, MetricsStability), + processor.WithLogs(f.createLogsProcessor, LogsStability), + processor.WithTraces(f.createTracesProcessor, TracesStability), ) } -func createDefaultConfig() component.Config { - return &Config{} +func (f *factory) createDefaultConfig() component.Config { + return &Config{ + Cardinality: types.LowCardinality, + } } -func createMetricsProcessor( +func (f *factory) createMetricsProcessor( ctx context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics, ) (processor.Metrics, error) { - tep, err := newInfraAttributesMetricProcessor(set, cfg.(*Config)) + iap, err := newInfraAttributesMetricProcessor(set, cfg.(*Config), f.tagger) if err != nil { return nil, err } @@ -46,17 +59,17 @@ func createMetricsProcessor( set, cfg, nextConsumer, - tep.processMetrics, + iap.processMetrics, processorhelper.WithCapabilities(processorCapabilities)) } -func createLogsProcessor( +func (f *factory) createLogsProcessor( ctx context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Logs, ) (processor.Logs, error) { - tep, err := newInfraAttributesLogsProcessor(set, cfg.(*Config)) + iap, err := newInfraAttributesLogsProcessor(set, cfg.(*Config)) if err != nil { return nil, err } @@ -65,17 +78,17 @@ func createLogsProcessor( set, cfg, nextConsumer, - tep.processLogs, + iap.processLogs, processorhelper.WithCapabilities(processorCapabilities)) } -func createTracesProcessor( +func (f *factory) createTracesProcessor( ctx context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces, ) (processor.Traces, error) { - tep, err := newInfraAttributesSpanProcessor(set, cfg.(*Config)) + iap, err := newInfraAttributesSpanProcessor(set, cfg.(*Config)) if err != nil { return nil, err } @@ -84,6 +97,6 @@ func createTracesProcessor( set, cfg, nextConsumer, - tep.processTraces, + iap.processTraces, processorhelper.WithCapabilities(processorCapabilities)) } diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/factory_test.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/factory_test.go index 0019d68fe9eef..d168a11b40922 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/factory_test.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/factory_test.go @@ -18,17 +18,23 @@ import ( "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/processor/processortest" + + "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl" ) func TestType(t *testing.T) { - factory := NewFactory() + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() + factory := NewFactory(fakeTagger) pType := factory.Type() assert.Equal(t, pType, Type) } func TestCreateDefaultConfig(t *testing.T) { - factory := NewFactory() + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() + factory := NewFactory(fakeTagger) cfg := factory.CreateDefaultConfig() assert.NoError(t, componenttest.CheckConfigStruct(cfg)) } @@ -50,10 +56,12 @@ func TestCreateProcessors(t *testing.T) { t.Run(tt.configName, func(t *testing.T) { cm, err := confmaptest.LoadConf(filepath.Join("testdata", tt.configName)) require.NoError(t, err) + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() for k := range cm.ToStringMap() { // Check if all processor variations that are defined in test config can be actually created - factory := NewFactory() + factory := NewFactory(fakeTagger) cfg := factory.CreateDefaultConfig() sub, err := cm.Sub(k) diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/logs.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/logs.go index 577c3ce5015ac..ad326b36c8d80 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/logs.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/logs.go @@ -22,7 +22,7 @@ func newInfraAttributesLogsProcessor(set processor.CreateSettings, _ *Config) (* logger: set.Logger, } - set.Logger.Info("Logs Tag Enrichment configured") + set.Logger.Info("Logs Infra Attributes Processor configured") return telp, nil } diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/logs_test.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/logs_test.go index f0404fd588663..7775262b101a7 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/logs_test.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/logs_test.go @@ -14,6 +14,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/processor/processortest" + + "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl" ) type logNameTest struct { @@ -40,7 +42,9 @@ func TestInfraAttributesLogProcessor(t *testing.T) { cfg := &Config{ Logs: LogInfraAttributes{}, } - factory := NewFactory() + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() + factory := NewFactory(fakeTagger) flp, err := factory.CreateLogsProcessor( context.Background(), processortest.NewNopCreateSettings(), diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/metadata.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/metadata.go index bf8eba0ad92d0..18eeafea7ed2c 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/metadata.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/metadata.go @@ -12,7 +12,7 @@ import ( ) var ( - // Type for tag enrichment processor. + // Type for infra attributes processor. Type = component.MustNewType("infraattributes") ) @@ -25,12 +25,12 @@ const ( LogsStability = component.StabilityLevelAlpha ) -// Meter for tag enrichement. +// Meter for infra attributes processor. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("otelcol/infraattributes") } -// Tracer for tag enrichment. +// Tracer for infra attributes processor. func Tracer(settings component.TelemetrySettings) trace.Tracer { return settings.TracerProvider.Tracer("otelcol/infraattributes") } diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/metrics.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/metrics.go index 63e09edd7d892..95a834b84f10c 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/metrics.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/metrics.go @@ -7,24 +7,120 @@ package infraattributesprocessor import ( "context" + "fmt" + "strings" + "github.com/DataDog/datadog-agent/comp/core/tagger" + "github.com/DataDog/datadog-agent/comp/core/tagger/types" + + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor" + conventions "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" ) type infraAttributesMetricProcessor struct { - logger *zap.Logger + logger *zap.Logger + tagger tagger.Component + cardinality types.TagCardinality } -func newInfraAttributesMetricProcessor(set processor.CreateSettings, _ *Config) (*infraAttributesMetricProcessor, error) { - tesp := &infraAttributesMetricProcessor{ - logger: set.Logger, +func newInfraAttributesMetricProcessor(set processor.CreateSettings, cfg *Config, tagger tagger.Component) (*infraAttributesMetricProcessor, error) { + iamp := &infraAttributesMetricProcessor{ + logger: set.Logger, + tagger: tagger, + cardinality: cfg.Cardinality, } - set.Logger.Info("Metric Tag Enrichment configured") - return tesp, nil + set.Logger.Info("Metric Infra Attributes Processor configured") + return iamp, nil } -func (temp *infraAttributesMetricProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { +// TODO: Replace OriginIDFromAttributes in opentelemetry-mapping-go with this method +// entityIDsFromAttributes gets the entity IDs from resource attributes. +// If not found, an empty string slice is returned. +func entityIDsFromAttributes(attrs pcommon.Map) []string { + entityIDs := make([]string, 0, 8) + // Prefixes come from pkg/util/kubernetes/kubelet and pkg/util/containers. + if containerID, ok := attrs.Get(conventions.AttributeContainerID); ok { + entityIDs = append(entityIDs, fmt.Sprintf("container_id://%v", containerID.AsString())) + } + if containerImageID, ok := attrs.Get(conventions.AttributeContainerImageID); ok { + splitImageID := strings.SplitN(containerImageID.AsString(), "@sha256:", 2) + if len(splitImageID) == 2 { + entityIDs = append(entityIDs, fmt.Sprintf("container_image_metadata://sha256:%v", splitImageID[1])) + } + } + if ecsTaskArn, ok := attrs.Get(conventions.AttributeAWSECSTaskARN); ok { + entityIDs = append(entityIDs, fmt.Sprintf("ecs_task://%v", ecsTaskArn.AsString())) + } + if deploymentName, ok := attrs.Get(conventions.AttributeK8SDeploymentName); ok { + namespace, namespaceOk := attrs.Get(conventions.AttributeK8SNamespaceName) + if namespaceOk { + entityIDs = append(entityIDs, fmt.Sprintf("deployment://%v/%v", namespace.AsString(), deploymentName.AsString())) + } + } + if namespace, ok := attrs.Get(conventions.AttributeK8SNamespaceName); ok { + entityIDs = append(entityIDs, fmt.Sprintf("namespace://%v", namespace.AsString())) + } + if nodeUID, ok := attrs.Get(conventions.AttributeK8SNodeUID); ok { + entityIDs = append(entityIDs, fmt.Sprintf("kubernetes_node_uid://%v", nodeUID.AsString())) + } + if podUID, ok := attrs.Get(conventions.AttributeK8SPodUID); ok { + entityIDs = append(entityIDs, fmt.Sprintf("kubernetes_pod_uid://%v", podUID.AsString())) + } + if processPid, ok := attrs.Get(conventions.AttributeProcessPID); ok { + entityIDs = append(entityIDs, fmt.Sprintf("process://%v", processPid.AsString())) + } + return entityIDs +} + +func splitTag(tag string) (key string, value string) { + split := strings.SplitN(tag, ":", 2) + if len(split) < 2 || split[0] == "" || split[1] == "" { + return "", "" + } + return split[0], split[1] +} + +func (iamp *infraAttributesMetricProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { + rms := md.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + resourceAttributes := rms.At(i).Resource().Attributes() + entityIDs := entityIDsFromAttributes(resourceAttributes) + tagMap := make(map[string]string) + + // Get all unique tags from resource attributes and global tags + for _, entityID := range entityIDs { + entityTags, err := iamp.tagger.Tag(entityID, iamp.cardinality) + if err != nil { + iamp.logger.Error("Cannot get tags for entity", zap.String("entityID", entityID), zap.Error(err)) + continue + } + for _, tag := range entityTags { + k, v := splitTag(tag) + _, hasTag := tagMap[k] + if k != "" && v != "" && !hasTag { + tagMap[k] = v + } + } + } + globalTags, err := iamp.tagger.GlobalTags(iamp.cardinality) + if err != nil { + iamp.logger.Error("Cannot get global tags", zap.Error(err)) + } + for _, tag := range globalTags { + k, v := splitTag(tag) + _, hasTag := tagMap[k] + if k != "" && v != "" && !hasTag { + tagMap[k] = v + } + } + + // Add all tags as resource attributes + for k, v := range tagMap { + resourceAttributes.PutStr(k, v) + } + } return md, nil } diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/metrics_test.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/metrics_test.go index c6adb8492a7aa..50892d20c94a3 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/metrics_test.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/metrics_test.go @@ -11,13 +11,20 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor/processortest" + conventions "go.opentelemetry.io/collector/semconv/v1.21.0" + + "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl" + "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/collectors" + "github.com/DataDog/datadog-agent/comp/core/tagger/types" ) type metricNameTest struct { - name string - inMetrics pmetric.Metrics + name string + inMetrics pmetric.Metrics + outResourceAttributes []map[string]any } type metricWithResource struct { @@ -32,20 +39,101 @@ var ( standardTests = []metricNameTest{ { - name: "includeFilter", - inMetrics: testResourceMetrics([]metricWithResource{{metricNames: inMetricNames}}), + name: "one tag with global", + inMetrics: testResourceMetrics([]metricWithResource{{ + metricNames: inMetricNames, + resourceAttributes: map[string]any{ + "container.id": "test", + }, + }}), + outResourceAttributes: []map[string]any{{ + "global": "tag", + "container.id": "test", + "container": "id", + }}, + }, + { + name: "two tags with global", + inMetrics: testResourceMetrics([]metricWithResource{{ + metricNames: inMetricNames, + resourceAttributes: map[string]any{ + "container.id": "test", + "k8s.namespace.name": "namespace", + "k8s.deployment.name": "deployment", + }, + }}), + outResourceAttributes: []map[string]any{{ + "global": "tag", + "container.id": "test", + "k8s.namespace.name": "namespace", + "k8s.deployment.name": "deployment", + "container": "id", + "deployment": "name", + }}, + }, + { + name: "two resource metrics, two tags with global", + inMetrics: testResourceMetrics([]metricWithResource{ + { + metricNames: inMetricNames, + resourceAttributes: map[string]any{ + "container.id": "test", + }, + }, + { + metricNames: inMetricNames, + resourceAttributes: map[string]any{ + "k8s.namespace.name": "namespace", + "k8s.deployment.name": "deployment", + }, + }}), + outResourceAttributes: []map[string]any{ + { + "global": "tag", + "container.id": "test", + "container": "id", + }, + { + "global": "tag", + "k8s.namespace.name": "namespace", + "k8s.deployment.name": "deployment", + "deployment": "name", + }, + }, }, } ) +func testResourceMetrics(mwrs []metricWithResource) pmetric.Metrics { + md := pmetric.NewMetrics() + + for _, mwr := range mwrs { + rm := md.ResourceMetrics().AppendEmpty() + //nolint:errcheck + rm.Resource().Attributes().FromRaw(mwr.resourceAttributes) + ms := rm.ScopeMetrics().AppendEmpty().Metrics() + for _, name := range mwr.metricNames { + m := ms.AppendEmpty() + m.SetName(name) + } + } + return md +} + func TestInfraAttributesMetricProcessor(t *testing.T) { for _, test := range standardTests { t.Run(test.name, func(t *testing.T) { next := new(consumertest.MetricsSink) cfg := &Config{ - Metrics: MetricInfraAttributes{}, + Metrics: MetricInfraAttributes{}, + Cardinality: types.LowCardinality, } - factory := NewFactory() + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() + fakeTagger.SetTags("container_id://test", "test", []string{"container:id"}, nil, nil, nil) + fakeTagger.SetTags("deployment://namespace/deployment", "test", []string{"deployment:name"}, nil, nil, nil) + fakeTagger.SetTags(collectors.GlobalEntityID, "test", []string{"global:tag"}, nil, nil, nil) + factory := NewFactory(fakeTagger) fmp, err := factory.CreateMetricsProcessor( context.Background(), processortest.NewNopCreateSettings(), @@ -63,22 +151,124 @@ func TestInfraAttributesMetricProcessor(t *testing.T) { cErr := fmp.ConsumeMetrics(context.Background(), test.inMetrics) assert.Nil(t, cErr) assert.NoError(t, fmp.Shutdown(ctx)) + + assert.Len(t, next.AllMetrics(), 1) + for i, out := range test.outResourceAttributes { + rms := next.AllMetrics()[0].ResourceMetrics().At(i) + assert.NotNil(t, rms) + assert.EqualValues(t, out, rms.Resource().Attributes().AsRaw()) + } }) } } -func testResourceMetrics(mwrs []metricWithResource) pmetric.Metrics { - md := pmetric.NewMetrics() +func TestEntityIDsFromAttributes(t *testing.T) { + tests := []struct { + name string + attrs pcommon.Map + entityIDs []string + }{ + { + name: "none", + attrs: pcommon.NewMap(), + entityIDs: []string{}, + }, + { + name: "pod UID and container ID", + attrs: func() pcommon.Map { + attributes := pcommon.NewMap() + attributes.FromRaw(map[string]interface{}{ + conventions.AttributeContainerID: "container_id_goes_here", + conventions.AttributeK8SPodUID: "k8s_pod_uid_goes_here", + }) + return attributes + }(), + entityIDs: []string{"container_id://container_id_goes_here", "kubernetes_pod_uid://k8s_pod_uid_goes_here"}, + }, + { + name: "container image ID", + attrs: func() pcommon.Map { + attributes := pcommon.NewMap() + attributes.FromRaw(map[string]interface{}{ + conventions.AttributeContainerImageID: "docker.io/foo@sha256:sha_goes_here", + }) + return attributes + }(), + entityIDs: []string{"container_image_metadata://sha256:sha_goes_here"}, + }, + { + name: "ecs task arn", + attrs: func() pcommon.Map { + attributes := pcommon.NewMap() + attributes.FromRaw(map[string]interface{}{ + conventions.AttributeAWSECSTaskARN: "ecs_task_arn_goes_here", + }) + return attributes + }(), + entityIDs: []string{"ecs_task://ecs_task_arn_goes_here"}, + }, + { + name: "only deployment name without namespace", + attrs: func() pcommon.Map { + attributes := pcommon.NewMap() + attributes.FromRaw(map[string]interface{}{ + conventions.AttributeK8SDeploymentName: "k8s_deployment_name_goes_here", + }) + return attributes + }(), + entityIDs: []string{}, + }, + { + name: "deployment name and namespace", + attrs: func() pcommon.Map { + attributes := pcommon.NewMap() + attributes.FromRaw(map[string]interface{}{ + conventions.AttributeK8SDeploymentName: "k8s_deployment_name_goes_here", + conventions.AttributeK8SNamespaceName: "k8s_namespace_goes_here", + }) + return attributes + }(), + entityIDs: []string{"deployment://k8s_namespace_goes_here/k8s_deployment_name_goes_here", "namespace://k8s_namespace_goes_here"}, + }, + { + name: "only namespace name", + attrs: func() pcommon.Map { + attributes := pcommon.NewMap() + attributes.FromRaw(map[string]interface{}{ + conventions.AttributeK8SNamespaceName: "k8s_namespace_goes_here", + }) + return attributes + }(), + entityIDs: []string{"namespace://k8s_namespace_goes_here"}, + }, + { + name: "only node UID", + attrs: func() pcommon.Map { + attributes := pcommon.NewMap() + attributes.FromRaw(map[string]interface{}{ + conventions.AttributeK8SNodeUID: "k8s_node_uid_goes_here", + }) + return attributes + }(), + entityIDs: []string{"kubernetes_node_uid://k8s_node_uid_goes_here"}, + }, + { + name: "only process pid", + attrs: func() pcommon.Map { + attributes := pcommon.NewMap() + attributes.FromRaw(map[string]interface{}{ + conventions.AttributeProcessPID: "process_pid_goes_here", + }) + return attributes + }(), + entityIDs: []string{"process://process_pid_goes_here"}, + }, + } - for _, mwr := range mwrs { - rm := md.ResourceMetrics().AppendEmpty() - //nolint:errcheck - rm.Resource().Attributes().FromRaw(mwr.resourceAttributes) - ms := rm.ScopeMetrics().AppendEmpty().Metrics() - for _, name := range mwr.metricNames { - m := ms.AppendEmpty() - m.SetName(name) - } + for _, testInstance := range tests { + t.Run(testInstance.name, func(t *testing.T) { + entityIDs := entityIDsFromAttributes(testInstance.attrs) + assert.Equal(t, testInstance.entityIDs, entityIDs) + }) } - return md } diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/package_test.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/package_test.go deleted file mode 100644 index b95a7add774bf..0000000000000 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/package_test.go +++ /dev/null @@ -1,16 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2024-present Datadog, Inc. - -package infraattributesprocessor - -import ( - "testing" - - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/traces.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/traces.go index 6a3134833165e..857a7c482c708 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/traces.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/traces.go @@ -21,7 +21,7 @@ func newInfraAttributesSpanProcessor(set processor.CreateSettings, _ *Config) (* tesp := &infraAttributesSpanProcessor{ logger: set.Logger, } - set.Logger.Info("Span Tag Enrichment configured") + set.Logger.Info("Span Infra Attributes Processor configured") return tesp, nil } diff --git a/comp/otelcol/otlp/components/processor/infraattributesprocessor/traces_test.go b/comp/otelcol/otlp/components/processor/infraattributesprocessor/traces_test.go index f52767228d5ab..aebef2f510f60 100644 --- a/comp/otelcol/otlp/components/processor/infraattributesprocessor/traces_test.go +++ b/comp/otelcol/otlp/components/processor/infraattributesprocessor/traces_test.go @@ -13,6 +13,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor/processortest" + + "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl" ) // All the data we need to test the Span @@ -51,7 +53,9 @@ func TestInfraAttributesTraceProcessor(t *testing.T) { ctx := context.Background() next := new(consumertest.TracesSink) cfg := &Config{} - factory := NewFactory() + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() + factory := NewFactory(fakeTagger) fmp, err := factory.CreateTracesProcessor( ctx, processortest.NewNopCreateSettings(), diff --git a/comp/otelcol/otlp/map_provider_not_serverless_test.go b/comp/otelcol/otlp/map_provider_not_serverless_test.go index 14c18fcf4f3d1..55609fb5eed90 100644 --- a/comp/otelcol/otlp/map_provider_not_serverless_test.go +++ b/comp/otelcol/otlp/map_provider_not_serverless_test.go @@ -12,13 +12,13 @@ import ( "context" "testing" - "github.com/DataDog/datadog-agent/pkg/logs/message" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/confmap" + "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl" "github.com/DataDog/datadog-agent/comp/otelcol/otlp/testutil" + "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/serializer" ) @@ -1112,7 +1112,9 @@ func TestUnmarshal(t *testing.T) { }, }) require.NoError(t, err) - components, err := getComponents(&serializer.MockSerializer{}, make(chan *message.Message)) + fakeTagger := taggerimpl.SetupFakeTagger(t) + defer fakeTagger.ResetTagger() + components, err := getComponents(&serializer.MockSerializer{}, make(chan *message.Message), fakeTagger) require.NoError(t, err) _, err = provider.Get(context.Background(), components) diff --git a/go.mod b/go.mod index 1881cf5256056..b795241df28a6 100644 --- a/go.mod +++ b/go.mod @@ -139,7 +139,7 @@ require ( github.com/DataDog/ebpf-manager v0.6.0 github.com/DataDog/gopsutil v1.2.2 github.com/DataDog/nikos v1.12.4 - github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.0 + github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1 github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.16.0 github.com/DataDog/opentelemetry-mapping-go/pkg/quantile v0.16.0 github.com/DataDog/sketches-go v1.4.4 @@ -553,7 +553,7 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/consumer v0.100.0 go.opentelemetry.io/collector/featuregate v1.7.0 - go.opentelemetry.io/collector/semconv v0.100.0 // indirect + go.opentelemetry.io/collector/semconv v0.100.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect go.opentelemetry.io/contrib/propagators/b3 v1.26.0 // indirect go.opentelemetry.io/otel v1.26.0 // indirect @@ -718,7 +718,6 @@ require ( go.opentelemetry.io/collector/otelcol v0.100.0 go.opentelemetry.io/collector/processor v0.100.0 go.opentelemetry.io/collector/service v0.100.0 - go.uber.org/goleak v1.3.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb go4.org/mem v0.0.0-20220726221520-4f986261bf13 gotest.tools v2.2.0+incompatible diff --git a/go.sum b/go.sum index 6c02f96394e00..aea8c3125abbf 100644 --- a/go.sum +++ b/go.sum @@ -787,8 +787,8 @@ github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata v0.16.0 h1:Jl7/oQQ github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata v0.16.0/go.mod h1:P/l++2cDCeeq21KSmCEdXdMH9/WMdXP7uA/vjnxhtz8= github.com/DataDog/opentelemetry-mapping-go/pkg/internal/sketchtest v0.16.0 h1:VJT1Jjlz/ca999FEqaAS+He7S4eB14a+PJjczgRdgAY= github.com/DataDog/opentelemetry-mapping-go/pkg/internal/sketchtest v0.16.0/go.mod h1:66XlN7QpQKqIvw8e2UbCXV5X8wGnEw851nT9BjJ75dY= -github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.0 h1:g/ztrLYZNfkpW6Bt8kMnLed5DaKRHEtiKE0opHXLHJk= -github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.0/go.mod h1:dvIWN9pA2zWNTw5rhDWZgzZnhcfpH++d+8d1SWW6xkY= +github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1 h1:ZI8u3CgdMXpDplrf9/gIr13+/g/tUzUcBMk2ZhXgzLE= +github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.16.1/go.mod h1:dvIWN9pA2zWNTw5rhDWZgzZnhcfpH++d+8d1SWW6xkY= github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/logs v0.16.0 h1:NbKlfbjR2joF52jEBLs3MEnT6l5zM3MCyhUFkqARZpk= github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/logs v0.16.0/go.mod h1:+LijQ2LdlocAQ4WB+7KsoIGe90bfogkRslubd9swVow= github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.16.0 h1:H5DzD3rwgQCX0VI3A16KgsdmC5grUCyDFflaZDpfgMc= diff --git a/pkg/serverless/otlp/otlp.go b/pkg/serverless/otlp/otlp.go index b0e11539d6408..bf03f7ca40b29 100644 --- a/pkg/serverless/otlp/otlp.go +++ b/pkg/serverless/otlp/otlp.go @@ -29,7 +29,7 @@ type ServerlessOTLPAgent struct { // NewServerlessOTLPAgent creates a new ServerlessOTLPAgent with the correct // otel pipeline. func NewServerlessOTLPAgent(serializer serializer.MetricSerializer) *ServerlessOTLPAgent { - pipeline, err := coreOtlp.NewPipelineFromAgentConfig(config.Datadog(), serializer, nil) + pipeline, err := coreOtlp.NewPipelineFromAgentConfig(config.Datadog(), serializer, nil, nil) if err != nil { log.Error("Error creating new otlp pipeline:", err) return nil