diff --git a/.chloggen/otelallocator-flag.yaml b/.chloggen/otelallocator-flag.yaml new file mode 100755 index 0000000000..844f88bfb9 --- /dev/null +++ b/.chloggen/otelallocator-flag.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action) +component: operator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduces a new feature flag "`operator.collector.rewritetargetallocator`" that allows an operator to add the target_allocator configuration to the collector configuration + +# One or more tracking issues related to the change +issues: [1581] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Note that the ConfigToPromConfig function in pkg/targetallocator/adapters now correctly returns the prometheus receiver config + in accordance with its docstring. It used to erroneously return the actual Prometheus config from a level lower. diff --git a/README.md b/README.md index 35842343f4..74799b5d52 100644 --- a/README.md +++ b/README.md @@ -422,6 +422,64 @@ Note how the Operator added a `global` section and a new `http_sd_configs` to th More info on the TargetAllocator can be found [here](cmd/otel-allocator/README.md). +#### Target Allocator config rewriting + +Prometheus receiver now has explicit support for acquiring scrape targets from the target allocator. As such, it is now possible to have the +Operator add the necessary target allocator configuration automatically. This feature currently requires the `operator.collector.rewritetargetallocator` feature flag to be enabled. With the flag enabled, the configuration from the previous section would be rendered as: + +```yaml + receivers: + prometheus: + config: + global: + scrape_interval: 1m + scrape_timeout: 10s + evaluation_interval: 1m + target_allocator: + endpoint: http://collector-with-ta-targetallocator:80 + interval: 30s + collector_id: $POD_NAME + + exporters: + logging: + + service: + pipelines: + metrics: + receivers: [prometheus] + processors: [] + exporters: [logging] +``` + +This also allows for a more straightforward collector configuration for target discovery using prometheus-operator CRDs. See below for a minimal example: + +```yaml +apiVersion: opentelemetry.io/v1alpha1 +kind: OpenTelemetryCollector +metadata: + name: collector-with-ta-prometheus-cr +spec: + mode: statefulset + targetAllocator: + enabled: true + serviceAccount: everything-prometheus-operator-needs + prometheusCR: + enabled: true + config: | + receivers: + prometheus: + + exporters: + logging: + + service: + pipelines: + metrics: + receivers: [prometheus] + processors: [] + exporters: [logging] +``` + ## Compatibility matrix ### OpenTelemetry Operator vs. OpenTelemetry Collector diff --git a/pkg/collector/reconcile/config_replace.go b/pkg/collector/reconcile/config_replace.go index 6ad5ea0984..b8b217ad97 100644 --- a/pkg/collector/reconcile/config_replace.go +++ b/pkg/collector/reconcile/config_replace.go @@ -17,8 +17,8 @@ package reconcile import ( "fmt" "net/url" + "time" - "github.com/mitchellh/mapstructure" promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/http" @@ -27,12 +27,20 @@ import ( "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" "github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters" + "github.com/open-telemetry/opentelemetry-operator/pkg/featuregate" "github.com/open-telemetry/opentelemetry-operator/pkg/naming" ta "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator/adapters" ) +type targetAllocator struct { + Endpoint string `yaml:"endpoint"` + Interval time.Duration `yaml:"interval"` + CollectorID string `yaml:"collector_id"` +} + type Config struct { - PromConfig *promconfig.Config `yaml:"config"` + PromConfig *promconfig.Config `yaml:"config"` + TargetAllocConfig *targetAllocator `yaml:"target_allocator,omitempty"` } func ReplaceConfig(instance v1alpha1.OpenTelemetryCollector) (string, error) { @@ -50,9 +58,7 @@ func ReplaceConfig(instance v1alpha1.OpenTelemetryCollector) (string, error) { } // yaml marshaling/unsmarshaling is preferred because of the problems associated with the conversion of map to a struct using mapstructure - promCfg, marshalErr := yaml.Marshal(map[string]interface{}{ - "config": promCfgMap, - }) + promCfg, marshalErr := yaml.Marshal(promCfgMap) if marshalErr != nil { return "", marshalErr } @@ -71,13 +77,18 @@ func ReplaceConfig(instance v1alpha1.OpenTelemetryCollector) (string, error) { } } - updPromCfgMap := make(map[string]interface{}) - if err := mapstructure.Decode(cfg, &updPromCfgMap); err != nil { - return "", err + if featuregate.EnableTargetAllocatorRewrite.IsEnabled() { + cfg.TargetAllocConfig = &targetAllocator{ + Endpoint: fmt.Sprintf("http://%s:80", naming.TAService(instance)), + Interval: 30 * time.Second, + CollectorID: "${POD_NAME}", + } + // we don't need the scrape configs here anymore with target allocator enabled + cfg.PromConfig.ScrapeConfigs = []*promconfig.ScrapeConfig{} } // type coercion checks are handled in the ConfigToPromConfig method above - config["receivers"].(map[interface{}]interface{})["prometheus"].(map[interface{}]interface{})["config"] = updPromCfgMap["PromConfig"] + config["receivers"].(map[interface{}]interface{})["prometheus"] = cfg out, err := yaml.Marshal(config) if err != nil { diff --git a/pkg/collector/reconcile/config_replace_test.go b/pkg/collector/reconcile/config_replace_test.go index 3ccd43f079..2f8a3770d8 100644 --- a/pkg/collector/reconcile/config_replace_test.go +++ b/pkg/collector/reconcile/config_replace_test.go @@ -16,11 +16,15 @@ package reconcile import ( "testing" + "time" + + colfeaturegate "go.opentelemetry.io/collector/featuregate" "github.com/prometheus/prometheus/discovery/http" "github.com/stretchr/testify/assert" "gopkg.in/yaml.v2" + "github.com/open-telemetry/opentelemetry-operator/pkg/featuregate" ta "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator/adapters" ) @@ -37,9 +41,7 @@ func TestPrometheusParser(t *testing.T) { promCfgMap, err := ta.ConfigToPromConfig(actualConfig) assert.NoError(t, err) - promCfg, err := yaml.Marshal(map[string]interface{}{ - "config": promCfgMap, - }) + promCfg, err := yaml.Marshal(promCfgMap) assert.NoError(t, err) err = yaml.UnmarshalStrict(promCfg, &cfg) @@ -59,6 +61,37 @@ func TestPrometheusParser(t *testing.T) { for k := range expectedMap { assert.True(t, expectedMap[k], k) } + assert.True(t, cfg.TargetAllocConfig == nil) + }) + + t.Run("should update config with targetAllocator block", func(t *testing.T) { + err := colfeaturegate.GlobalRegistry().Set(featuregate.EnableTargetAllocatorRewrite.ID(), true) + param.Instance.Spec.TargetAllocator.Enabled = true + assert.NoError(t, err) + actualConfig, err := ReplaceConfig(param.Instance) + assert.NoError(t, err) + + // prepare + var cfg Config + promCfgMap, err := ta.ConfigToPromConfig(actualConfig) + assert.NoError(t, err) + + promCfg, err := yaml.Marshal(promCfgMap) + assert.NoError(t, err) + + err = yaml.UnmarshalStrict(promCfg, &cfg) + assert.NoError(t, err) + + // test + assert.Len(t, cfg.PromConfig.ScrapeConfigs, 0) + expectedTAConfig := &targetAllocator{ + Endpoint: "http://test-targetallocator:80", + Interval: 30 * time.Second, + CollectorID: "${POD_NAME}", + } + assert.Equal(t, expectedTAConfig, cfg.TargetAllocConfig) + err = colfeaturegate.GlobalRegistry().Set(featuregate.EnableTargetAllocatorRewrite.ID(), false) + assert.NoError(t, err) }) t.Run("should not update config with http_sd_config", func(t *testing.T) { @@ -71,9 +104,7 @@ func TestPrometheusParser(t *testing.T) { promCfgMap, err := ta.ConfigToPromConfig(actualConfig) assert.NoError(t, err) - promCfg, err := yaml.Marshal(map[string]interface{}{ - "config": promCfgMap, - }) + promCfg, err := yaml.Marshal(promCfgMap) assert.NoError(t, err) err = yaml.UnmarshalStrict(promCfg, &cfg) @@ -93,6 +124,7 @@ func TestPrometheusParser(t *testing.T) { for k := range expectedMap { assert.True(t, expectedMap[k], k) } + assert.True(t, cfg.TargetAllocConfig == nil) }) } diff --git a/pkg/collector/reconcile/configmap.go b/pkg/collector/reconcile/configmap.go index b5e9906ed8..f906fe68a8 100644 --- a/pkg/collector/reconcile/configmap.go +++ b/pkg/collector/reconcile/configmap.go @@ -114,7 +114,8 @@ func desiredTAConfigMap(params Params) (corev1.ConfigMap, error) { "app.kubernetes.io/managed-by": "opentelemetry-operator", "app.kubernetes.io/component": "opentelemetry-collector", } - taConfig["config"] = promConfig + // We only take the "config" from the returned object, we don't need the "target_allocator" configuration here. + taConfig["config"] = promConfig["config"] if len(params.Instance.Spec.TargetAllocator.AllocationStrategy) > 0 { taConfig["allocation_strategy"] = params.Instance.Spec.TargetAllocator.AllocationStrategy } else { diff --git a/pkg/collector/reconcile/configmap_test.go b/pkg/collector/reconcile/configmap_test.go index ca12d9e6e8..4f1c94568a 100644 --- a/pkg/collector/reconcile/configmap_test.go +++ b/pkg/collector/reconcile/configmap_test.go @@ -18,6 +18,8 @@ import ( "context" "testing" + colfeaturegate "go.opentelemetry.io/collector/featuregate" + "github.com/stretchr/testify/assert" "gopkg.in/yaml.v2" v1 "k8s.io/api/core/v1" @@ -28,6 +30,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" "github.com/open-telemetry/opentelemetry-operator/internal/config" + "github.com/open-telemetry/opentelemetry-operator/pkg/featuregate" ta "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator/adapters" ) @@ -184,6 +187,55 @@ service: }) + t.Run("should return expected escaped collector config map with target_allocator config block", func(t *testing.T) { + expectedLables["app.kubernetes.io/component"] = "opentelemetry-collector" + expectedLables["app.kubernetes.io/name"] = "test-collector" + expectedLables["app.kubernetes.io/version"] = "latest" + err := colfeaturegate.GlobalRegistry().Set(featuregate.EnableTargetAllocatorRewrite.ID(), true) + assert.NoError(t, err) + + expectedData := map[string]string{ + "collector.yaml": `exporters: + logging: null +processors: null +receivers: + prometheus: + config: + global: + scrape_interval: 1m + scrape_timeout: 10s + evaluation_interval: 1m + target_allocator: + endpoint: http://test-targetallocator:80 + interval: 30s + collector_id: ${POD_NAME} +service: + pipelines: + metrics: + exporters: + - logging + processors: [] + receivers: + - prometheus +`, + } + + param, err := newParams("test/test-img", "../testdata/http_sd_config_servicemonitor_test.yaml") + assert.NoError(t, err) + param.Instance.Spec.TargetAllocator.Enabled = true + actual := desiredConfigMap(context.Background(), param) + + assert.Equal(t, "test-collector", actual.Name) + assert.Equal(t, expectedLables, actual.Labels) + assert.Equal(t, expectedData, actual.Data) + + // Reset the value + expectedLables["app.kubernetes.io/version"] = "0.47.0" + err = colfeaturegate.GlobalRegistry().Set(featuregate.EnableTargetAllocatorRewrite.ID(), false) + assert.NoError(t, err) + + }) + t.Run("should return expected target allocator config map", func(t *testing.T) { expectedLables["app.kubernetes.io/component"] = "opentelemetry-targetallocator" expectedLables["app.kubernetes.io/name"] = "test-targetallocator" @@ -358,7 +410,7 @@ func TestExpectedConfigMap(t *testing.T) { assert.True(t, exists) assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID) - parmConfig, err := ta.ConfigToPromConfig(params().Instance.Spec.Config) + promConfig, err := ta.ConfigToPromConfig(params().Instance.Spec.Config) assert.NoError(t, err) taConfig := make(map[interface{}]interface{}) @@ -367,7 +419,7 @@ func TestExpectedConfigMap(t *testing.T) { "app.kubernetes.io/managed-by": "opentelemetry-operator", "app.kubernetes.io/component": "opentelemetry-collector", } - taConfig["config"] = parmConfig + taConfig["config"] = promConfig["config"] taConfig["allocation_strategy"] = "least-weighted" taConfigYAML, _ := yaml.Marshal(taConfig) diff --git a/pkg/featuregate/featuregate.go b/pkg/featuregate/featuregate.go index 4f9f8a9046..ea42c75e5a 100644 --- a/pkg/featuregate/featuregate.go +++ b/pkg/featuregate/featuregate.go @@ -37,6 +37,13 @@ var ( "operator.autoinstrumentation.java", featuregate.StageBeta, featuregate.WithRegisterDescription("controls whether the operator supports Java auto-instrumentation")) + + // EnableTargetAllocatorRewrite is the feature gate that controls whether the collector's configuration should + // automatically be rewritten when the target allocator is enabled. + EnableTargetAllocatorRewrite = featuregate.GlobalRegistry().MustRegister( + "operator.collector.rewritetargetallocator", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("controls whether the operator should configure the collector's targetAllocator configuration")) ) // Flags creates a new FlagSet that represents the available featuregate flags using the supplied featuregate registry. diff --git a/pkg/targetallocator/adapters/config_to_prom_config.go b/pkg/targetallocator/adapters/config_to_prom_config.go index b4ba5f4d59..1029753ceb 100644 --- a/pkg/targetallocator/adapters/config_to_prom_config.go +++ b/pkg/targetallocator/adapters/config_to_prom_config.go @@ -28,7 +28,7 @@ func errorNotAMap(component string) error { return fmt.Errorf("%s property in the configuration doesn't contain valid %s", component, component) } -// ConfigToPromConfig converts the incoming configuration object into a the Prometheus receiver config. +// ConfigToPromConfig converts the incoming configuration object into the Prometheus receiver config. func ConfigToPromConfig(cfg string) (map[interface{}]interface{}, error) { config, err := adapters.ConfigFromString(cfg) if err != nil { @@ -55,15 +55,5 @@ func ConfigToPromConfig(cfg string) (map[interface{}]interface{}, error) { return nil, errorNotAMap("prometheus") } - prometheusConfigProperty, ok := prometheus["config"] - if !ok { - return nil, errorNoComponent("prometheusConfig") - } - - prometheusConfig, ok := prometheusConfigProperty.(map[interface{}]interface{}) - if !ok { - return nil, errorNotAMap("prometheusConfig") - } - - return prometheusConfig, nil + return prometheus, nil } diff --git a/pkg/targetallocator/adapters/config_to_prom_config_test.go b/pkg/targetallocator/adapters/config_to_prom_config_test.go index ba5cc2978a..f39aaecf72 100644 --- a/pkg/targetallocator/adapters/config_to_prom_config_test.go +++ b/pkg/targetallocator/adapters/config_to_prom_config_test.go @@ -41,9 +41,11 @@ func TestExtractPromConfigFromConfig(t *testing.T) { endpoint: 0.0.0.0:15268 ` expectedData := map[interface{}]interface{}{ - "scrape_config": map[interface{}]interface{}{ - "job_name": "otel-collector", - "scrape_interval": "10s", + "config": map[interface{}]interface{}{ + "scrape_config": map[interface{}]interface{}{ + "job_name": "otel-collector", + "scrape_interval": "10s", + }, }, } @@ -55,7 +57,7 @@ func TestExtractPromConfigFromConfig(t *testing.T) { assert.Equal(t, expectedData, promConfig) } -func TestExtractPromConfigFromNullConfig(t *testing.T) { +func TestExtractPromConfigWithTAConfigFromConfig(t *testing.T) { configStr := `receivers: examplereceiver: endpoint: "0.0.0.0:12345" @@ -63,6 +65,42 @@ func TestExtractPromConfigFromNullConfig(t *testing.T) { endpoint: "0.0.0.0:12346" prometheus: config: + scrape_config: + job_name: otel-collector + scrape_interval: 10s + target_allocator: + endpoint: "test:80" + jaeger/custom: + protocols: + thrift_http: + endpoint: 0.0.0.0:15268 +` + expectedData := map[interface{}]interface{}{ + "config": map[interface{}]interface{}{ + "scrape_config": map[interface{}]interface{}{ + "job_name": "otel-collector", + "scrape_interval": "10s", + }, + }, + "target_allocator": map[interface{}]interface{}{ + "endpoint": "test:80", + }, + } + + // test + promConfig, err := ta.ConfigToPromConfig(configStr) + assert.NoError(t, err) + + // verify + assert.Equal(t, expectedData, promConfig) +} + +func TestExtractPromConfigFromNullConfig(t *testing.T) { + configStr := `receivers: + examplereceiver: + endpoint: "0.0.0.0:12345" + examplereceiver/settings: + endpoint: "0.0.0.0:12346" jaeger/custom: protocols: thrift_http: @@ -71,7 +109,7 @@ func TestExtractPromConfigFromNullConfig(t *testing.T) { // test promConfig, err := ta.ConfigToPromConfig(configStr) - assert.Equal(t, err, fmt.Errorf("%s property in the configuration doesn't contain valid %s", "prometheusConfig", "prometheusConfig")) + assert.Equal(t, err, fmt.Errorf("no prometheus available as part of the configuration")) // verify assert.True(t, reflect.ValueOf(promConfig).IsNil())