diff --git a/.chloggen/hints.yaml b/.chloggen/hints.yaml new file mode 100644 index 000000000000..6fb21310cfa2 --- /dev/null +++ b/.chloggen/hints.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receivercreator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for generating metrics receivers based on provided annotations' hints + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34427] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/receivercreator/config.go b/receiver/receivercreator/config.go index bb5ebfaa4f6f..8b79b5d0cd70 100644 --- a/receiver/receivercreator/config.go +++ b/receiver/receivercreator/config.go @@ -78,6 +78,19 @@ type Config struct { // ResourceAttributes is a map of default resource attributes to add to each resource // object received by this receiver from dynamically created receivers. ResourceAttributes resourceAttributes `mapstructure:"resource_attributes"` + Hints HintsConfig `mapstructure:"hints"` +} + +type HintsConfig struct { + K8s K8sHintsConfig `mapstructure:"k8s"` +} + +type K8sHintsConfig struct { + Metrics MetricsHints `mapstructure:"metrics"` +} + +type MetricsHints struct { + Enabled bool `mapstructure:"enabled"` } func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { diff --git a/receiver/receivercreator/hints.go b/receiver/receivercreator/hints.go new file mode 100644 index 000000000000..e1fbebb6e1f6 --- /dev/null +++ b/receiver/receivercreator/hints.go @@ -0,0 +1,154 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package receivercreator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator" + +import ( + "fmt" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +const ( + hintsMetricsReceiver = "io.opentelemetry.collector.receiver-creator.metrics/receiver" + hintsMetricsEndpoint = "io.opentelemetry.collector.receiver-creator.metrics/endpoint" + hintsMetricsCollectionInterval = "io.opentelemetry.collector.receiver-creator.metrics/collection_interval" + hintsMetricsTimeout = "io.opentelemetry.collector.receiver-creator.metrics/timeout" + hintsMetricsUsername = "io.opentelemetry.collector.receiver-creator.metrics/username" + hintsMetricsPassword = "io.opentelemetry.collector.receiver-creator.metrics/password" +) + +// HintsTemplatesBuilder creates configuration templates from provided hints. +type HintsTemplatesBuilder interface { + createReceiverTemplatesFromHints() ([]receiverTemplate, error) +} + +// K8sHintsBuilder creates configurations from hints provided as Pod's annotations. +type K8sHintsBuilder struct { + logger *zap.Logger +} + +func (builder *K8sHintsBuilder) createReceiverTemplatesFromHints(env observer.EndpointEnv) ([]receiverTemplate, error) { + var endpointType string + var podUID string + var port uint16 + var annotations map[string]string + var receiverTemplates []receiverTemplate + + builder.logger.Debug("handling hints for added endpoint", zap.Any("env", env)) + + if pod, ok := env["pod"]; !ok { + return receiverTemplates, nil + } else { + endpointPod, ok := pod.(observer.EndpointEnv) + if !ok { + return receiverTemplates, fmt.Errorf("could not extract endpoint's pod: %v", zap.Any("endpointPod", pod)) + } + ann := endpointPod["annotations"] + if ann != nil { + annotations, ok = ann.(map[string]string) + if !ok { + return receiverTemplates, fmt.Errorf("could not extract annotations: %v", zap.Any("annotations", ann)) + } + } + podUID = endpointPod["uid"].(string) + } + + if valType, ok := env["type"]; !ok { + return receiverTemplates, fmt.Errorf("could not get endpoint type: %v", zap.Any("env", env)) + } else { + endpointType, ok = valType.(string) + if !ok { + return receiverTemplates, fmt.Errorf("could not extract endpointType: %v", zap.Any("endpointType", valType)) + } + } + + if len(annotations) > 0 { + if endpointType == string(observer.PortType) { + // Only handle Endpoints of type port for metrics + portName := env["name"].(string) + metricsReceiverEnabled := getHintAnnotation(annotations, hintsMetricsReceiver, portName) + if metricsReceiverEnabled != "" { + subreceiverKey := metricsReceiverEnabled + if subreceiverKey == "" { + return receiverTemplates, nil + } + builder.logger.Debug("handling added hinted receiver", zap.Any("subreceiverKey", subreceiverKey)) + + userConfMap := createMetricsConfig(annotations, env, portName) + + if p, ok := env["port"]; ok { + port = p.(uint16) + if port == 0 { + return receiverTemplates, fmt.Errorf("could not extract port: %v", zap.Any("env", env)) + } + } else { + return receiverTemplates, fmt.Errorf("could not extract port: %v", zap.Any("env", env)) + } + subreceiver, err := newReceiverTemplate(fmt.Sprintf("%v/%v_%v", subreceiverKey, podUID, port), userConfMap) + if err != nil { + builder.logger.Error("error adding subreceiver", zap.Any("err", err)) + return receiverTemplates, err + } + + subreceiver.Rule = fmt.Sprintf("type == \"port\" && port ==%v", port) // + subreceiver.rule, err = newRule(subreceiver.Rule) + if err != nil { + builder.logger.Error("error adding subreceiver rule", zap.Any("err", err)) + return receiverTemplates, err + } + builder.logger.Debug("adding hinted receiver", zap.Any("subreceiver", subreceiver)) + receiverTemplates = append(receiverTemplates, subreceiver) + } + } + } + return receiverTemplates, nil +} + +func createMetricsConfig(annotations map[string]string, env observer.EndpointEnv, portName string) userConfigMap { + confMap := map[string]any{} + + defaultEndpoint := env["endpoint"] + // get endpoint directly from the Port endpoint + if defaultEndpoint != "" { + confMap["endpoint"] = defaultEndpoint + } + + subreceiverEndpoint := getHintAnnotation(annotations, hintsMetricsEndpoint, portName) + if subreceiverEndpoint != "" { + confMap["endpoint"] = subreceiverEndpoint + } + subreceiverColInterval := getHintAnnotation(annotations, hintsMetricsCollectionInterval, portName) + if subreceiverColInterval != "" { + confMap["collection_interval"] = subreceiverColInterval + } + subreceiverTimeout := getHintAnnotation(annotations, hintsMetricsTimeout, portName) + if subreceiverTimeout != "" { + confMap["timeout"] = subreceiverTimeout + } + subreceiverUsername := getHintAnnotation(annotations, hintsMetricsUsername, portName) + if subreceiverUsername != "" { + confMap["username"] = subreceiverUsername + } + subreceiverPassword := getHintAnnotation(annotations, hintsMetricsPassword, portName) + if subreceiverPassword != "" { + confMap["password"] = subreceiverPassword + } + return confMap +} + +func getHintAnnotation(annotations map[string]string, hintKey string, portName string) string { + containerLevelHint := annotations[fmt.Sprintf("%s.%s", hintKey, portName)] + if containerLevelHint != "" { + return containerLevelHint + } + + // if there is no container level hint defined try to scope the hint more on container level by suffixing with . + podLevelHint := annotations[hintKey] + if podLevelHint != "" { + return podLevelHint + } + return "" +} diff --git a/receiver/receivercreator/hints_test.go b/receiver/receivercreator/hints_test.go new file mode 100644 index 000000000000..17983f6b5585 --- /dev/null +++ b/receiver/receivercreator/hints_test.go @@ -0,0 +1,143 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package receivercreator + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +func TestK8sHintsBuilderMetrics(t *testing.T) { + logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)) + logger.Level() + + tests := map[string]struct { + inputEndpoint observer.Endpoint + expectedReceiver receiverTemplate + wantError bool + }{ + `metrics_pod_level_hints_only`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics/receiver": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout": "30s", + "io.opentelemetry.collector.receiver-creator.metrics/username": "username", + "io.opentelemetry.collector.receiver-creator.metrics/password": "changeme", + }}, + Port: 6379}, + }, + expectedReceiver: receiverTemplate{ + Rule: "type == \"port\" && port ==6379", + receiverConfig: receiverConfig{ + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "30s", "username": "username"}, + }, + }, + wantError: false, + }, `metrics_container_level_hints`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics/receiver.redis": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval.redis": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout.redis": "30s", + "io.opentelemetry.collector.receiver-creator.metrics/username.redis": "username", + "io.opentelemetry.collector.receiver-creator.metrics/password.redis": "changeme", + }}, + Port: 6379}, + }, + expectedReceiver: receiverTemplate{ + Rule: "type == \"port\" && port ==6379", + receiverConfig: receiverConfig{ + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "30s", "username": "username"}, + }, + }, + wantError: false, + }, `metrics_mix_level_hints`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics/receiver.redis": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout": "30s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout.redis": "130s", + "io.opentelemetry.collector.receiver-creator.metrics/username.redis": "username", + "io.opentelemetry.collector.receiver-creator.metrics/password.redis": "changeme", + }}, + Port: 6379}, + }, + expectedReceiver: receiverTemplate{ + Rule: "type == \"port\" && port ==6379", + receiverConfig: receiverConfig{ + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "130s", "username": "username"}, + }, + }, + wantError: false, + }, `metrics_no_port_error`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics/receiver.redis": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout": "30s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout.redis": "130s", + "io.opentelemetry.collector.receiver-creator.metrics/username.redis": "username", + "io.opentelemetry.collector.receiver-creator.metrics/password.redis": "changeme", + }}}, + }, + expectedReceiver: receiverTemplate{}, + wantError: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + k8sHintsBuilder := K8sHintsBuilder{logger} + env, err := test.inputEndpoint.Env() + require.NoError(t, err) + subreceiverTemplates, err := k8sHintsBuilder.createReceiverTemplatesFromHints(env) + if !test.wantError { + require.NoError(t, err) + require.Equal(t, subreceiverTemplates[0].receiverConfig.config, test.expectedReceiver.receiverConfig.config) + require.Equal(t, subreceiverTemplates[0].Rule, test.expectedReceiver.Rule) + } else { + require.Error(t, err) + } + }) + } +} diff --git a/receiver/receivercreator/observerhandler.go b/receiver/receivercreator/observerhandler.go index de1d9689953b..aecddbe664e1 100644 --- a/receiver/receivercreator/observerhandler.go +++ b/receiver/receivercreator/observerhandler.go @@ -83,85 +83,31 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) { continue } - obs.params.TelemetrySettings.Logger.Debug("handling added endpoint", zap.Any("env", env)) - - for _, template := range obs.config.receiverTemplates { - if matches, e := template.rule.eval(env); e != nil { - obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(e)) - continue - } else if !matches { - continue - } - - obs.params.TelemetrySettings.Logger.Info("starting receiver", - zap.String("name", template.id.String()), - zap.String("endpoint", e.Target), - zap.String("endpoint_id", string(e.ID))) - - resolvedConfig, err := expandConfig(template.config, env) + if obs.config.Hints.K8s.Metrics.Enabled { + k8sHintsBuilder := K8sHintsBuilder{obs.params.TelemetrySettings.Logger} + subreceiverTemplates, err := k8sHintsBuilder.createReceiverTemplatesFromHints(env) if err != nil { - obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err)) - continue - } - obs.params.TelemetrySettings.Logger.Debug("resolved config", zap.String("receiver", template.id.String()), zap.Any("config", resolvedConfig)) - - discoveredCfg := userConfigMap{} - // If user didn't set endpoint set to default value as well as - // flag indicating we've done this for later validation. - if _, ok := resolvedConfig[endpointConfigKey]; !ok { - discoveredCfg[endpointConfigKey] = e.Target - discoveredCfg[tmpSetEndpointConfigKey] = struct{}{} + obs.params.TelemetrySettings.Logger.Error("could not extract configurations from K8s hints' annotations", zap.Any("err", err)) + break } - - // Though not necessary with contrib provided observers, nothing is stopping custom - // ones from using expr in their Target values. - discoveredConfig, err := expandConfig(discoveredCfg, env) - if err != nil { - obs.params.TelemetrySettings.Logger.Error("unable to resolve discovered config", zap.String("receiver", template.id.String()), zap.Error(err)) - continue - } - - resAttrs := map[string]string{} - for k, v := range template.ResourceAttributes { - strVal, ok := v.(string) - if !ok { - obs.params.TelemetrySettings.Logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v)) - continue + if len(subreceiverTemplates) > 0 { + // loop over the receiverTemplates. Some Pods might have both logs+metrics templates + for _, subreceiver := range subreceiverTemplates { + obs.params.TelemetrySettings.Logger.Debug("adding K8s hinted receiver", zap.Any("subreceiver", subreceiver)) + obs.startReceiver(subreceiver, env, e) } - resAttrs[k] = strVal - } - - // Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources - // as telemetry is emitted. - var consumer *enhancingConsumer - if consumer, err = newEnhancingConsumer( - obs.config.ResourceAttributes, - resAttrs, - env, - e, - obs.nextLogsConsumer, - obs.nextMetricsConsumer, - obs.nextTracesConsumer, - ); err != nil { - obs.params.TelemetrySettings.Logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err)) continue } + } - var receiver component.Component - if receiver, err = obs.runner.start( - receiverConfig{ - id: template.id, - config: resolvedConfig, - endpointID: e.ID, - }, - discoveredConfig, - consumer, - ); err != nil { - obs.params.TelemetrySettings.Logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err)) + for _, template := range obs.config.receiverTemplates { + if matches, err := template.rule.eval(env); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(err)) + continue + } else if !matches { continue } - - obs.receiversByEndpointID.Put(e.ID, receiver) + obs.startReceiver(template, env, e) } } } @@ -200,3 +146,74 @@ func (obs *observerHandler) OnChange(changed []observer.Endpoint) { obs.OnRemove(changed) obs.OnAdd(changed) } + +func (obs *observerHandler) startReceiver(template receiverTemplate, env observer.EndpointEnv, e observer.Endpoint) { + obs.params.TelemetrySettings.Logger.Info("starting receiver", + zap.String("name", template.id.String()), + zap.String("endpoint", e.Target), + zap.String("endpoint_id", string(e.ID)), + zap.Any("config", template.config)) + + resolvedConfig, err := expandConfig(template.config, env) + if err != nil { + obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + discoveredCfg := userConfigMap{} + // If user didn't set endpoint set to default value as well as + // flag indicating we've done this for later validation. + if _, ok := resolvedConfig[endpointConfigKey]; !ok { + discoveredCfg[endpointConfigKey] = e.Target + discoveredCfg[tmpSetEndpointConfigKey] = struct{}{} + } + + // Though not necessary with contrib provided observers, nothing is stopping custom + // ones from using expr in their Target values. + discoveredConfig, err := expandConfig(discoveredCfg, env) + if err != nil { + obs.params.TelemetrySettings.Logger.Error("unable to resolve discovered config", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + resAttrs := map[string]string{} + for k, v := range template.ResourceAttributes { + strVal, ok := v.(string) + if !ok { + obs.params.TelemetrySettings.Logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v)) + continue + } + resAttrs[k] = strVal + } + + // Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources + // as telemetry is emitted. + var consumer *enhancingConsumer + if consumer, err = newEnhancingConsumer( + obs.config.ResourceAttributes, + resAttrs, + env, + e, + obs.nextLogsConsumer, + obs.nextMetricsConsumer, + obs.nextTracesConsumer, + ); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + var receiver component.Component + if receiver, err = obs.runner.start( + receiverConfig{ + id: template.id, + config: resolvedConfig, + endpointID: e.ID, + }, + discoveredConfig, + consumer, + ); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + obs.receiversByEndpointID.Put(e.ID, receiver) +}