diff --git a/.chloggen/hints.yaml b/.chloggen/hints.yaml new file mode 100644 index 000000000000..6fb21310cfa2 --- /dev/null +++ b/.chloggen/hints.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receivercreator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for generating metrics receivers based on provided annotations' hints + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34427] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/receivercreator/README.md b/receiver/receivercreator/README.md index d0af39c09a12..65d140f32820 100644 --- a/receiver/receivercreator/README.md +++ b/receiver/receivercreator/README.md @@ -437,3 +437,144 @@ service: The full list of settings exposed for this receiver are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). + + +## Generate receiver configurations from provided Hints + +Currently this feature is only supported for K8s environments and the `k8sobserver`. + +The feature for K8s is enabled with the following setting: + +```yaml +receiver_creator/metrics: + watch_observers: [ k8s_observer ] + hints: + k8s: + metrics: + enabled: true +``` + +Users can use the following annotations to automatically enable receivers to start collecting metrics from the target Pods/containers. + +### Supported metrics annotations +1. `io.opentelemetry.collector.receiver-creator.metrics/receiver` (example: `nginx`) +2. `io.opentelemetry.collector.receiver-creator.metrics/endpoint` (example: ```"http://`endpoint`/nginx_status"```, if not provided it defaults to `endpoint` which is of form `pod_ip:container_port`.) +3. `io.opentelemetry.collector.receiver-creator.metrics/collection_interval` (example: `20s`) +4. `io.opentelemetry.collector.receiver-creator.metrics/timeout` (example: `1m`) +5. `io.opentelemetry.collector.receiver-creator.metrics/username` (example: `admin`) +6. `io.opentelemetry.collector.receiver-creator.metrics/password` (example: `passpass`) + + +### Support multiple target containers + +Users can target the annotation to a specific container by suffixing it with the name of the port that container exposes: +`io.opentelemetry.collector.receiver-creator.metrics./endpoint`. +For example ```io.opentelemetry.collector.receiver-creator.metrics.webserver/endpoint: "http://`endpoint`/nginx_status"``` +where `webserver` is the name of the port the target container exposes. + +If a Pod is annotated with both container level hints and pod level hints the container level hints have priority and +the Pod level hints are used as a fallback (see detailed example bellow). + +The current implementation relies on the implementation of `k8sobserver` extension and specifically +the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go). +The hints are evaluated per container by extracting the annotations from each `Port` endpoint that is emitted. + + +### Examples + +#### Metrics example + +Collector's configuration: +```yaml +receivers: + receiver_creator/metrics: + watch_observers: [ k8s_observer ] + hints: + k8s: + metrics: + enabled: true + receivers: + +service: + extensions: [ k8s_observer] + pipelines: + metrics: + receivers: [ receiver_creator/metrics ] + processors: [] + exporters: [ debug ] +``` + +Target Pod annotated with hints: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: nginx-conf +data: + nginx.conf: | + user nginx; + worker_processes 1; + error_log /dev/stderr warn; + pid /var/run/nginx.pid; + events { + worker_connections 1024; + } + http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for"'; + access_log /dev/stdout main; + server { + listen 80; + server_name localhost; + + location /nginx_status { + stub_status on; + } + } + include /etc/nginx/conf.d/*; + } +--- +apiVersion: v1 +kind: Pod +metadata: + name: redis + annotations: + io.opentelemetry.collector.receiver-creator.metrics/receiver: redis + io.opentelemetry.collector.receiver-creator.metrics/collection_interval: '20s' + io.opentelemetry.collector.receiver-creator.metrics.webserver/receiver: nginx + io.opentelemetry.collector.receiver-creator.metrics.webserver/endpoint: "http://`endpoint`/nginx_status" + labels: + k8s-app: redis + app: redis +spec: + volumes: + - name: nginx-conf + configMap: + name: nginx-conf + items: + - key: nginx.conf + path: nginx.conf + containers: + - name: webserver + image: nginx:latest + ports: + - containerPort: 80 + name: webserver + volumeMounts: + - mountPath: /etc/nginx/nginx.conf + readOnly: true + subPath: nginx.conf + name: nginx-conf + - image: redis + imagePullPolicy: IfNotPresent + name: redis + ports: + - name: redis + containerPort: 6379 + protocol: TCP +``` \ No newline at end of file diff --git a/receiver/receivercreator/config.go b/receiver/receivercreator/config.go index bb5ebfaa4f6f..8b79b5d0cd70 100644 --- a/receiver/receivercreator/config.go +++ b/receiver/receivercreator/config.go @@ -78,6 +78,19 @@ type Config struct { // ResourceAttributes is a map of default resource attributes to add to each resource // object received by this receiver from dynamically created receivers. ResourceAttributes resourceAttributes `mapstructure:"resource_attributes"` + Hints HintsConfig `mapstructure:"hints"` +} + +type HintsConfig struct { + K8s K8sHintsConfig `mapstructure:"k8s"` +} + +type K8sHintsConfig struct { + Metrics MetricsHints `mapstructure:"metrics"` +} + +type MetricsHints struct { + Enabled bool `mapstructure:"enabled"` } func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { diff --git a/receiver/receivercreator/hints.go b/receiver/receivercreator/hints.go new file mode 100644 index 000000000000..db95c57ee26f --- /dev/null +++ b/receiver/receivercreator/hints.go @@ -0,0 +1,165 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package receivercreator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator" + +import ( + "fmt" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +const ( + otelHints = "io.opentelemetry.collector.receiver-creator" + metricsHint = "metrics" + hintsMetricsReceiver = "receiver" + hintsMetricsEndpoint = "endpoint" + hintsMetricsCollectionInterval = "collection_interval" + hintsMetricsTimeout = "timeout" + hintsMetricsUsername = "username" + hintsMetricsPassword = "password" +) + +// HintsTemplatesBuilder creates configuration templates from provided hints. +type HintsTemplatesBuilder interface { + createReceiverTemplatesFromHints() ([]receiverTemplate, error) +} + +// K8sHintsBuilder creates configurations from hints provided as Pod's annotations. +type K8sHintsBuilder struct { + logger *zap.Logger + config K8sHintsConfig +} + +// createReceiverTemplateFromHints creates a receiver configuration based on the provided hints. +// Hints are extracted from Pod's annotations. +// Metrics configurations are only created for Port Endpoints. +// TODO: Logs configurations are only created for Pod Container Endpoints. +func (builder *K8sHintsBuilder) createReceiverTemplateFromHints(env observer.EndpointEnv) (*receiverTemplate, error) { + var endpointType string + var podUID string + var annotations map[string]string + + builder.logger.Debug("handling hints for added endpoint", zap.Any("env", env)) + + if pod, ok := env["pod"]; ok { + endpointPod, ok := pod.(observer.EndpointEnv) + if !ok { + return nil, fmt.Errorf("could not extract endpoint's pod: %v", zap.Any("endpointPod", pod)) + } + ann := endpointPod["annotations"] + if ann != nil { + annotations, ok = ann.(map[string]string) + if !ok { + return nil, fmt.Errorf("could not extract annotations: %v", zap.Any("annotations", ann)) + } + } + podUID = endpointPod["uid"].(string) + } else { + return nil, nil + } + + if valType, ok := env["type"]; ok { + endpointType, ok = valType.(string) + if !ok { + return nil, fmt.Errorf("could not extract endpointType: %v", zap.Any("endpointType", valType)) + } + } else { + return nil, fmt.Errorf("could not get endpoint type: %v", zap.Any("env", env)) + } + + if len(annotations) > 0 { + if endpointType == string(observer.PortType) && builder.config.Metrics.Enabled { + // Only handle Endpoints of type port for metrics + return builder.createMetricsReceiver(annotations, env, podUID) + } + } + return nil, nil +} + +func (builder *K8sHintsBuilder) createMetricsReceiver( + annotations map[string]string, + env observer.EndpointEnv, + podUID string) (*receiverTemplate, error) { + + var port uint16 + + portName := env["name"].(string) + subreceiverKey := getHintAnnotation(annotations, metricsHint, hintsMetricsReceiver, portName) + + if subreceiverKey == "" { + // no metrics hints detected + return nil, nil + } + builder.logger.Debug("handling added hinted receiver", zap.Any("subreceiverKey", subreceiverKey)) + + userConfMap := createMetricsConfig(annotations, env, portName) + + if p, ok := env["port"]; ok { + port = p.(uint16) + if port == 0 { + return nil, fmt.Errorf("could not extract port: %v", zap.Any("env", env)) + } + } else { + return nil, fmt.Errorf("could not extract port: %v", zap.Any("env", env)) + } + subreceiver, err := newReceiverTemplate(fmt.Sprintf("%v/%v_%v", subreceiverKey, podUID, port), userConfMap) + if err != nil { + builder.logger.Error("error adding subreceiver", zap.Any("err", err)) + return nil, err + } + + builder.logger.Debug("adding hinted receiver", zap.Any("subreceiver", subreceiver)) + return &subreceiver, nil + +} + +func createMetricsConfig(annotations map[string]string, env observer.EndpointEnv, portName string) userConfigMap { + confMap := map[string]any{} + + defaultEndpoint := env["endpoint"] + // get endpoint directly from the Port endpoint + if defaultEndpoint != "" { + confMap["endpoint"] = defaultEndpoint + } + + subreceiverEndpoint := getHintAnnotation(annotations, metricsHint, hintsMetricsEndpoint, portName) + if subreceiverEndpoint != "" { + confMap["endpoint"] = subreceiverEndpoint + } + subreceiverColInterval := getHintAnnotation(annotations, metricsHint, hintsMetricsCollectionInterval, portName) + if subreceiverColInterval != "" { + confMap["collection_interval"] = subreceiverColInterval + } + subreceiverTimeout := getHintAnnotation(annotations, metricsHint, hintsMetricsTimeout, portName) + if subreceiverTimeout != "" { + confMap["timeout"] = subreceiverTimeout + } + subreceiverUsername := getHintAnnotation(annotations, metricsHint, hintsMetricsUsername, portName) + if subreceiverUsername != "" { + confMap["username"] = subreceiverUsername + } + subreceiverPassword := getHintAnnotation(annotations, metricsHint, hintsMetricsPassword, portName) + if subreceiverPassword != "" { + confMap["password"] = subreceiverPassword + } + return confMap +} + +func getHintAnnotation(annotations map[string]string, hintType string, hintKey string, suffix string) string { + // try to scope the hint more on container level by suffixing with . + containerLevelHint := annotations[fmt.Sprintf("%s.%s.%s/%s", otelHints, hintType, suffix, hintKey)] + if containerLevelHint != "" { + return containerLevelHint + } + + // if there is no container level hint defined try to use the Pod level hint + podHintKey := fmt.Sprintf("%s.%s/%s", otelHints, hintType, hintKey) + podLevelHint := annotations[podHintKey] + if podLevelHint != "" { + return podLevelHint + } + return "" +} diff --git a/receiver/receivercreator/hints_test.go b/receiver/receivercreator/hints_test.go new file mode 100644 index 000000000000..20cb43597785 --- /dev/null +++ b/receiver/receivercreator/hints_test.go @@ -0,0 +1,168 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package receivercreator + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +func TestK8sHintsBuilderMetrics(t *testing.T) { + logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)) + logger.Level() + + id := component.ID{} + err := id.UnmarshalText([]byte("redis/pod-2-UID_6379")) + assert.NoError(t, err) + + tests := map[string]struct { + inputEndpoint observer.Endpoint + expectedReceiver receiverTemplate + wantError bool + }{ + `metrics_pod_level_hints_only`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics/receiver": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout": "30s", + "io.opentelemetry.collector.receiver-creator.metrics/username": "username", + "io.opentelemetry.collector.receiver-creator.metrics/password": "changeme", + }}, + Port: 6379}, + }, + expectedReceiver: receiverTemplate{ + receiverConfig: receiverConfig{ + id: id, + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "30s", "username": "username"}, + }, + }, + wantError: false, + }, `metrics_container_level_hints`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics.redis/receiver": "redis", + "io.opentelemetry.collector.receiver-creator.metrics.redis/collection_interval": "20s", + "io.opentelemetry.collector.receiver-creator.metrics.redis/timeout": "30s", + "io.opentelemetry.collector.receiver-creator.metrics.redis/username": "username", + "io.opentelemetry.collector.receiver-creator.metrics.redis/password": "changeme", + }}, + Port: 6379}, + }, + expectedReceiver: receiverTemplate{ + receiverConfig: receiverConfig{ + id: id, + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "30s", "username": "username"}, + }, + }, + wantError: false, + }, `metrics_mix_level_hints`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics.redis/receiver": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout": "30s", + "io.opentelemetry.collector.receiver-creator.metrics.redis/timeout": "130s", + "io.opentelemetry.collector.receiver-creator.metrics.redis/username": "username", + "io.opentelemetry.collector.receiver-creator.metrics.redis/password": "changeme", + }}, + Port: 6379}, + }, + expectedReceiver: receiverTemplate{ + receiverConfig: receiverConfig{ + id: id, + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "130s", "username": "username"}, + }, + }, + wantError: false, + }, `metrics_no_port_error`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics.redis/receiver": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout": "30s", + "io.opentelemetry.collector.receiver-creator.metrics.redis/timeout": "130s", + "io.opentelemetry.collector.receiver-creator.metrics.redis/username": "username", + "io.opentelemetry.collector.receiver-creator.metrics.redis/password": "changeme", + }}}, + }, + expectedReceiver: receiverTemplate{}, + wantError: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + k8sHintsBuilder := K8sHintsBuilder{logger, K8sHintsConfig{Metrics: MetricsHints{Enabled: true}}} + env, err := test.inputEndpoint.Env() + require.NoError(t, err) + subreceiverTemplate, err := k8sHintsBuilder.createReceiverTemplateFromHints(env) + if !test.wantError { + require.NoError(t, err) + require.Equal(t, subreceiverTemplate.receiverConfig.config, test.expectedReceiver.receiverConfig.config) + require.Equal(t, subreceiverTemplate.id, test.expectedReceiver.id) + } else { + require.Error(t, err) + } + }) + } +} + +func TestGetHintAnnotation(t *testing.T) { + metricsHintsAnn := map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics/receiver": "redis", + } + assert.Equal( + t, + "redis", + getHintAnnotation(metricsHintsAnn, metricsHint, hintsMetricsReceiver, "webport"), + ) + logsHintsAnn := map[string]string{ + "io.opentelemetry.collector.receiver-creator.logs/receiver": "redis", + } + assert.Equal( + t, + "redis", + getHintAnnotation(logsHintsAnn, "logs", "receiver", "webport"), + ) +} diff --git a/receiver/receivercreator/observerhandler.go b/receiver/receivercreator/observerhandler.go index de1d9689953b..ae36ed6fe79e 100644 --- a/receiver/receivercreator/observerhandler.go +++ b/receiver/receivercreator/observerhandler.go @@ -83,85 +83,28 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) { continue } - obs.params.TelemetrySettings.Logger.Debug("handling added endpoint", zap.Any("env", env)) - - for _, template := range obs.config.receiverTemplates { - if matches, e := template.rule.eval(env); e != nil { - obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(e)) - continue - } else if !matches { - continue - } - - obs.params.TelemetrySettings.Logger.Info("starting receiver", - zap.String("name", template.id.String()), - zap.String("endpoint", e.Target), - zap.String("endpoint_id", string(e.ID))) - - resolvedConfig, err := expandConfig(template.config, env) + if obs.config.Hints.K8s.Metrics.Enabled { + k8sHintsBuilder := K8sHintsBuilder{obs.params.TelemetrySettings.Logger, obs.config.Hints.K8s} + subreceiverTemplate, err := k8sHintsBuilder.createReceiverTemplateFromHints(env) if err != nil { - obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err)) - continue + obs.params.TelemetrySettings.Logger.Error("could not extract configurations from K8s hints' annotations", zap.Any("err", err)) + break } - obs.params.TelemetrySettings.Logger.Debug("resolved config", zap.String("receiver", template.id.String()), zap.Any("config", resolvedConfig)) - - discoveredCfg := userConfigMap{} - // If user didn't set endpoint set to default value as well as - // flag indicating we've done this for later validation. - if _, ok := resolvedConfig[endpointConfigKey]; !ok { - discoveredCfg[endpointConfigKey] = e.Target - discoveredCfg[tmpSetEndpointConfigKey] = struct{}{} - } - - // Though not necessary with contrib provided observers, nothing is stopping custom - // ones from using expr in their Target values. - discoveredConfig, err := expandConfig(discoveredCfg, env) - if err != nil { - obs.params.TelemetrySettings.Logger.Error("unable to resolve discovered config", zap.String("receiver", template.id.String()), zap.Error(err)) + if subreceiverTemplate != nil { + obs.params.TelemetrySettings.Logger.Debug("adding K8s hinted receiver", zap.Any("subreceiver", subreceiverTemplate)) + obs.startReceiver(*subreceiverTemplate, env, e) continue } + } - resAttrs := map[string]string{} - for k, v := range template.ResourceAttributes { - strVal, ok := v.(string) - if !ok { - obs.params.TelemetrySettings.Logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v)) - continue - } - resAttrs[k] = strVal - } - - // Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources - // as telemetry is emitted. - var consumer *enhancingConsumer - if consumer, err = newEnhancingConsumer( - obs.config.ResourceAttributes, - resAttrs, - env, - e, - obs.nextLogsConsumer, - obs.nextMetricsConsumer, - obs.nextTracesConsumer, - ); err != nil { - obs.params.TelemetrySettings.Logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err)) + for _, template := range obs.config.receiverTemplates { + if matches, err := template.rule.eval(env); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(err)) continue - } - - var receiver component.Component - if receiver, err = obs.runner.start( - receiverConfig{ - id: template.id, - config: resolvedConfig, - endpointID: e.ID, - }, - discoveredConfig, - consumer, - ); err != nil { - obs.params.TelemetrySettings.Logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err)) + } else if !matches { continue } - - obs.receiversByEndpointID.Put(e.ID, receiver) + obs.startReceiver(template, env, e) } } } @@ -200,3 +143,74 @@ func (obs *observerHandler) OnChange(changed []observer.Endpoint) { obs.OnRemove(changed) obs.OnAdd(changed) } + +func (obs *observerHandler) startReceiver(template receiverTemplate, env observer.EndpointEnv, e observer.Endpoint) { + obs.params.TelemetrySettings.Logger.Info("starting receiver", + zap.String("name", template.id.String()), + zap.String("endpoint", e.Target), + zap.String("endpoint_id", string(e.ID)), + zap.Any("config", template.config)) + + resolvedConfig, err := expandConfig(template.config, env) + if err != nil { + obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + discoveredCfg := userConfigMap{} + // If user didn't set endpoint set to default value as well as + // flag indicating we've done this for later validation. + if _, ok := resolvedConfig[endpointConfigKey]; !ok { + discoveredCfg[endpointConfigKey] = e.Target + discoveredCfg[tmpSetEndpointConfigKey] = struct{}{} + } + + // Though not necessary with contrib provided observers, nothing is stopping custom + // ones from using expr in their Target values. + discoveredConfig, err := expandConfig(discoveredCfg, env) + if err != nil { + obs.params.TelemetrySettings.Logger.Error("unable to resolve discovered config", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + resAttrs := map[string]string{} + for k, v := range template.ResourceAttributes { + strVal, ok := v.(string) + if !ok { + obs.params.TelemetrySettings.Logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v)) + continue + } + resAttrs[k] = strVal + } + + // Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources + // as telemetry is emitted. + var consumer *enhancingConsumer + if consumer, err = newEnhancingConsumer( + obs.config.ResourceAttributes, + resAttrs, + env, + e, + obs.nextLogsConsumer, + obs.nextMetricsConsumer, + obs.nextTracesConsumer, + ); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + var receiver component.Component + if receiver, err = obs.runner.start( + receiverConfig{ + id: template.id, + config: resolvedConfig, + endpointID: e.ID, + }, + discoveredConfig, + consumer, + ); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + obs.receiversByEndpointID.Put(e.ID, receiver) +}