From 41eceae0dd28bea8abed48222aa962b58888ef00 Mon Sep 17 00:00:00 2001 From: Sebastian Poxhofer Date: Fri, 19 Aug 2022 16:12:44 +0200 Subject: [PATCH] Implement prometheus receiver target allocator (#12586) * feat(prometheusreciever): add target_collector config base line * implementation draft * feat(prometheusreceiver): functioning target allocator job retrieval * docs(prometheusreceiver): add changelog entry * chore(prometheusreceiver): go mod tidy * chore(prometheusreceiver): code style * chore(prometheusreceiver): fixup root go mod * Merge branch 'main' into implement_prometheus_receiver_target_allocator * fix linting * test: create tests * test: add targetallocator tests * chore: update Prometheus to 0.37.0 * fix: main drift * chore(prometheusreceiver): add targetallocator docs and changelog releaser * fix(prometheusreceiver): apply whole Promconfig instead of only discovery * fix(prometheusreceiver): only abort during validation if both scrape_config and target_allocator is missing * chore(prometheusreceiver): fix linting errors * chore: go mod tidy all * tests: fix config test * chore: do not export unnecessarily types * fix: config errors in some scenarios * fix: config resets * chore: go mod tidy * chore: go mod tidy * fix: discovery generation * tests: add test to confirm that jobs are removed and added to the discovery using the target allocator * chore: remove dev comment * chore: add mapstructure to endpoint * chore: remove io/util import * chore: go mod tidy * chore: modimpi Co-authored-by: Alex Boten --- cmd/configschema/go.mod | 1 + cmd/configschema/go.sum | 2 + exporter/prometheusexporter/go.mod | 1 + exporter/prometheusexporter/go.sum | 2 + go.mod | 1 + go.sum | 2 + processor/spanmetricsprocessor/go.sum | 1 + receiver/prometheusexecreceiver/go.mod | 1 + receiver/prometheusexecreceiver/go.sum | 4 +- receiver/prometheusreceiver/README.md | 13 + receiver/prometheusreceiver/config.go | 78 ++- receiver/prometheusreceiver/config_test.go | 43 ++ receiver/prometheusreceiver/go.mod | 2 + receiver/prometheusreceiver/go.sum | 5 +- .../prometheusreceiver/metrics_receiver.go | 152 +++++- .../metrics_receiver_target_allocator_test.go | 496 ++++++++++++++++++ .../metrics_receiver_test.go | 14 +- .../prometheusreceiver/testdata/config.yaml | 11 + .../testdata/config_target_allocator.yaml | 33 ++ receiver/simpleprometheusreceiver/go.mod | 2 +- receiver/simpleprometheusreceiver/go.sum | 3 +- testbed/go.mod | 1 + testbed/go.sum | 2 + unreleased/prometheus-receiver-TA.yaml | 16 + 24 files changed, 863 insertions(+), 23 deletions(-) create mode 100644 receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go create mode 100644 receiver/prometheusreceiver/testdata/config_target_allocator.yaml create mode 100644 unreleased/prometheus-receiver-TA.yaml diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index b78954105731..46ddba2e7a81 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -258,6 +258,7 @@ require ( github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/hashstructure v1.1.0 // indirect + github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/sys/mountinfo v0.5.0 // indirect diff --git a/cmd/configschema/go.sum b/cmd/configschema/go.sum index e10d6dc86623..2e606233667b 100644 --- a/cmd/configschema/go.sum +++ b/cmd/configschema/go.sum @@ -1319,6 +1319,8 @@ github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUb github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg= github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9kmUZPXP+H0= github.com/mitchellh/hashstructure v1.1.0/go.mod h1:xUDAozZz0Wmdiufv0uyhnHkUTN6/6d8ulp4AwfLKrmA= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= diff --git a/exporter/prometheusexporter/go.mod b/exporter/prometheusexporter/go.mod index 847e28394311..d9317011ea20 100644 --- a/exporter/prometheusexporter/go.mod +++ b/exporter/prometheusexporter/go.mod @@ -105,6 +105,7 @@ require ( github.com/miekg/dns v1.1.50 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/exporter/prometheusexporter/go.sum b/exporter/prometheusexporter/go.sum index ff682e562f9e..0812220bba54 100644 --- a/exporter/prometheusexporter/go.sum +++ b/exporter/prometheusexporter/go.sum @@ -521,6 +521,8 @@ github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go. github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= diff --git a/go.mod b/go.mod index 170939d86eac..30ebf25ea936 100644 --- a/go.mod +++ b/go.mod @@ -403,6 +403,7 @@ require ( github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/hashstructure v1.1.0 // indirect + github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/sys/mountinfo v0.5.0 // indirect diff --git a/go.sum b/go.sum index 7a296fa38bb1..160a4cfb1837 100644 --- a/go.sum +++ b/go.sum @@ -1356,6 +1356,8 @@ github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUb github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg= github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9kmUZPXP+H0= github.com/mitchellh/hashstructure v1.1.0/go.mod h1:xUDAozZz0Wmdiufv0uyhnHkUTN6/6d8ulp4AwfLKrmA= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= diff --git a/processor/spanmetricsprocessor/go.sum b/processor/spanmetricsprocessor/go.sum index 48d2d4faa6eb..544a637eb6d4 100644 --- a/processor/spanmetricsprocessor/go.sum +++ b/processor/spanmetricsprocessor/go.sum @@ -288,6 +288,7 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= diff --git a/receiver/prometheusexecreceiver/go.mod b/receiver/prometheusexecreceiver/go.mod index 9b05c50a6bc4..c4ca5a5f0392 100644 --- a/receiver/prometheusexecreceiver/go.mod +++ b/receiver/prometheusexecreceiver/go.mod @@ -95,6 +95,7 @@ require ( github.com/miekg/dns v1.1.50 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/receiver/prometheusexecreceiver/go.sum b/receiver/prometheusexecreceiver/go.sum index 3bc4b53bbb49..9aa822c7868e 100644 --- a/receiver/prometheusexecreceiver/go.sum +++ b/receiver/prometheusexecreceiver/go.sum @@ -517,6 +517,8 @@ github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go. github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= @@ -564,8 +566,8 @@ github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zM github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= -github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= +github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/receiver/prometheusreceiver/README.md b/receiver/prometheusreceiver/README.md index 6a8064aab05e..988642d5474c 100644 --- a/receiver/prometheusreceiver/README.md +++ b/receiver/prometheusreceiver/README.md @@ -82,6 +82,19 @@ receivers: action: keep ``` +## OpenTelemetry Operator +Additional to this static job definitions this receiver allows to query a list of jobs from the +OpenTelemetryOperators TargetAllocator or a compatible endpoint. + +```yaml +receivers: + prometheus: + target_allocator: + endpoint: http://my-targetallocator-service + interval: 30s + collector_id: collector-1 +``` + [sc]: https://github.com/prometheus/prometheus/blob/v2.28.1/docs/configuration/configuration.md#scrape_config [beta]: https://github.com/open-telemetry/opentelemetry-collector#beta diff --git a/receiver/prometheusreceiver/config.go b/receiver/prometheusreceiver/config.go index 438ee5d4ab8e..16fffbc4082d 100644 --- a/receiver/prometheusreceiver/config.go +++ b/receiver/prometheusreceiver/config.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "io" + "net/url" "os" "path/filepath" "sort" @@ -28,6 +29,7 @@ import ( commonconfig "github.com/prometheus/common/config" promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/file" + promHTTP "github.com/prometheus/prometheus/discovery/http" "github.com/prometheus/prometheus/discovery/kubernetes" "github.com/prometheus/prometheus/discovery/targetgroup" "go.opentelemetry.io/collector/config" @@ -38,6 +40,10 @@ import ( const ( // The key for Prometheus scraping configs. prometheusConfigKey = "config" + + // keys to access the http_sd_config from config root + targetAllocatorConfigKey = "target_allocator" + targetAllocatorHTTPSDConfigKey = "http_sd_config" ) // Config defines configuration for Prometheus receiver. @@ -55,12 +61,25 @@ type Config struct { UseStartTimeMetric bool `mapstructure:"use_start_time_metric"` StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"` + TargetAllocator *targetAllocator `mapstructure:"target_allocator"` + // ConfigPlaceholder is just an entry to make the configuration pass a check // that requires that all keys present in the config actually exist on the // structure, ie.: it will error if an unknown key is present. ConfigPlaceholder interface{} `mapstructure:"config"` } +type targetAllocator struct { + Endpoint string `mapstructure:"endpoint"` + Interval time.Duration `mapstructure:"interval"` + CollectorID string `mapstructure:"collector_id"` + // ConfigPlaceholder is just an entry to make the configuration pass a check + // that requires that all keys present in the config actually exist on the + // structure, ie.: it will error if an unknown key is present. + ConfigPlaceholder interface{} `mapstructure:"http_sd_config"` + HTTPSDConfig *promHTTP.SDConfig `mapstructure:"-"` +} + var _ config.Receiver = (*Config)(nil) var _ config.Unmarshallable = (*Config)(nil) @@ -129,9 +148,23 @@ func checkSDFile(filename string) error { // Validate checks the receiver configuration is valid. func (cfg *Config) Validate() error { promConfig := cfg.PrometheusConfig - if promConfig == nil { - return nil // noop receiver + if promConfig != nil { + err := cfg.validatePromConfig(promConfig) + if err != nil { + return err + } } + + if cfg.TargetAllocator != nil { + err := cfg.validateTargetAllocatorConfig() + if err != nil { + return err + } + } + return nil +} + +func (cfg *Config) validatePromConfig(promConfig *promconfig.Config) error { if len(promConfig.ScrapeConfigs) == 0 { return errors.New("no Prometheus scrape_configs") } @@ -209,6 +242,24 @@ func (cfg *Config) Validate() error { return nil } +func (cfg *Config) validateTargetAllocatorConfig() error { + // validate targetAllocator + targetAllocatorConfig := cfg.TargetAllocator + if targetAllocatorConfig == nil { + return nil + } + // ensure valid endpoint + if _, err := url.ParseRequestURI(targetAllocatorConfig.Endpoint); err != nil { + return fmt.Errorf("TargetAllocator endpoint is not valid: %s", targetAllocatorConfig.Endpoint) + } + // ensure valid collectorID without variables + if targetAllocatorConfig.CollectorID == "" || strings.Contains(targetAllocatorConfig.CollectorID, "${") { + return fmt.Errorf("CollectorID is not a valid ID") + } + + return nil +} + // Unmarshal a config.Parser into the config struct. func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { if componentParser == nil { @@ -237,5 +288,28 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { return fmt.Errorf("prometheus receiver failed to unmarshal yaml to prometheus config: %w", err) } + // Unmarshal targetAllocator configs + targetAllocatorCfg, err := componentParser.Sub(targetAllocatorConfigKey) + if err != nil { + return err + } + targetAllocatorHTTPSDCfg, err := targetAllocatorCfg.Sub(targetAllocatorHTTPSDConfigKey) + if err != nil { + return err + } + + targetAllocatorHTTPSDMap := targetAllocatorHTTPSDCfg.ToStringMap() + if len(targetAllocatorHTTPSDMap) != 0 { + targetAllocatorHTTPSDMap["url"] = "http://placeholder" // we have to set it as else the marshal will fail + httpSDConf, err := yaml.Marshal(targetAllocatorHTTPSDMap) + if err != nil { + return fmt.Errorf("prometheus receiver failed to marshal config to yaml: %w", err) + } + err = yaml.UnmarshalStrict(httpSDConf, &cfg.TargetAllocator.HTTPSDConfig) + if err != nil { + return fmt.Errorf("prometheus receiver failed to unmarshal yaml to prometheus config: %w", err) + } + } + return nil } diff --git a/receiver/prometheusreceiver/config_test.go b/receiver/prometheusreceiver/config_test.go index 629239174a68..8ee1aff5abc2 100644 --- a/receiver/prometheusreceiver/config_test.go +++ b/receiver/prometheusreceiver/config_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + promConfig "github.com/prometheus/common/config" + promModel "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -48,6 +50,47 @@ func TestLoadConfig(t *testing.T) { assert.Equal(t, time.Duration(r1.PrometheusConfig.ScrapeConfigs[0].ScrapeInterval), 5*time.Second) assert.Equal(t, r1.UseStartTimeMetric, true) assert.Equal(t, r1.StartTimeMetricRegex, "^(.+_)*process_start_time_seconds$") + + assert.Equal(t, "http://my-targetallocator-service", r1.TargetAllocator.Endpoint) + assert.Equal(t, 30*time.Second, r1.TargetAllocator.Interval) + assert.Equal(t, "collector-1", r1.TargetAllocator.CollectorID) + assert.Equal(t, promModel.Duration(60*time.Second), r1.TargetAllocator.HTTPSDConfig.RefreshInterval) + assert.Equal(t, "prometheus", r1.TargetAllocator.HTTPSDConfig.HTTPClientConfig.BasicAuth.Username) + assert.Equal(t, promConfig.Secret("changeme"), r1.TargetAllocator.HTTPSDConfig.HTTPClientConfig.BasicAuth.Password) +} + +func TestLoadTargetAllocatorConfig(t *testing.T) { + factories, err := componenttest.NopFactories() + assert.NoError(t, err) + + factory := NewFactory() + factories.Receivers[typeStr] = factory + cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "config_target_allocator.yaml"), factories) + require.NoError(t, err) + require.NotNil(t, cfg) + + assert.Equal(t, len(cfg.Receivers), 3) + + r0 := cfg.Receivers[config.NewComponentID(typeStr)].(*Config) + assert.Nil(t, r0.PrometheusConfig) + assert.Equal(t, "http://localhost:8080", r0.TargetAllocator.Endpoint) + assert.Equal(t, 30*time.Second, r0.TargetAllocator.Interval) + assert.Equal(t, "collector-1", r0.TargetAllocator.CollectorID) + + r1 := cfg.Receivers[config.NewComponentIDWithName(typeStr, "withScrape")].(*Config) + assert.Nil(t, r0.PrometheusConfig) + assert.Equal(t, "http://localhost:8080", r0.TargetAllocator.Endpoint) + assert.Equal(t, 30*time.Second, r0.TargetAllocator.Interval) + assert.Equal(t, "collector-1", r0.TargetAllocator.CollectorID) + + assert.Equal(t, 1, len(r1.PrometheusConfig.ScrapeConfigs)) + assert.Equal(t, "demo", r1.PrometheusConfig.ScrapeConfigs[0].JobName) + assert.Equal(t, promModel.Duration(5*time.Second), r1.PrometheusConfig.ScrapeConfigs[0].ScrapeInterval) + + r2 := cfg.Receivers[config.NewComponentIDWithName(typeStr, "withOnlyScrape")].(*Config) + assert.Equal(t, 1, len(r2.PrometheusConfig.ScrapeConfigs)) + assert.Equal(t, "demo", r2.PrometheusConfig.ScrapeConfigs[0].JobName) + assert.Equal(t, promModel.Duration(5*time.Second), r2.PrometheusConfig.ScrapeConfigs[0].ScrapeInterval) } func TestLoadConfigFailsOnUnknownSection(t *testing.T) { diff --git a/receiver/prometheusreceiver/go.mod b/receiver/prometheusreceiver/go.mod index 2a77f5acbe8b..f54743c8ce33 100644 --- a/receiver/prometheusreceiver/go.mod +++ b/receiver/prometheusreceiver/go.mod @@ -6,6 +6,7 @@ require ( github.com/go-kit/log v0.2.1 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 + github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter v0.58.0 github.com/prometheus/common v0.37.0 github.com/prometheus/prometheus v0.37.0 @@ -121,6 +122,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.58.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect + github.com/pelletier/go-toml v1.9.4 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/receiver/prometheusreceiver/go.sum b/receiver/prometheusreceiver/go.sum index 12c0335e0afc..0914a46fab1e 100644 --- a/receiver/prometheusreceiver/go.sum +++ b/receiver/prometheusreceiver/go.sum @@ -529,6 +529,8 @@ github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go. github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= @@ -576,8 +578,9 @@ github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zM github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= -github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= +github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= +github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index d5e4404bd540..2a21f5956168 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -16,10 +16,17 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-co import ( "context" + "encoding/json" + "fmt" + "net/http" + "net/url" "time" + "github.com/go-kit/log" + "github.com/mitchellh/hashstructure/v2" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" + promHTTP "github.com/prometheus/prometheus/discovery/http" "github.com/prometheus/prometheus/scrape" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -39,8 +46,14 @@ type pReceiver struct { consumer consumer.Metrics cancelFunc context.CancelFunc - settings component.ReceiverCreateSettings - scrapeManager *scrape.Manager + settings component.ReceiverCreateSettings + scrapeManager *scrape.Manager + discoveryManager *discovery.Manager + targetAllocatorIntervalTicker *time.Ticker +} + +type linkJSON struct { + Link string `json:"_link"` } // New creates a new prometheus.Receiver reference. @@ -61,16 +74,135 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { logger := internal.NewZapToGokitLogAdapter(r.settings.Logger) - discoveryManager := discovery.NewManager(discoveryCtx, logger) + // add scrape configs defined by the collector configs + baseCfg := r.cfg.PrometheusConfig + + err := r.initPrometheusComponents(discoveryCtx, host, logger) + if err != nil { + r.settings.Logger.Error("Failed to initPrometheusComponents Prometheus components", zap.Error(err)) + return err + } + + err = r.applyCfg(baseCfg) + if err != nil { + r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err)) + return err + } + + allocConf := r.cfg.TargetAllocator + if allocConf != nil { + go func() { + // immediately sync jobs and not wait for the first tick + savedHash, _ := r.syncTargetAllocator(uint64(0), allocConf, baseCfg) + r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval) + for { + <-r.targetAllocatorIntervalTicker.C + hash, err := r.syncTargetAllocator(savedHash, allocConf, baseCfg) + if err != nil { + r.settings.Logger.Error(err.Error()) + continue + } + savedHash = hash + } + }() + } + + return nil +} + +// syncTargetAllocator request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash. +// baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs. +func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) { + r.settings.Logger.Debug("Syncing target allocator jobs") + jobObject, err := getJobResponse(allocConf.Endpoint) + if err != nil { + r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err)) + return 0, err + } + + hash, err := hashstructure.Hash(jobObject, hashstructure.FormatV2, nil) + if err != nil { + r.settings.Logger.Error("Failed to hash job list", zap.Error(err)) + return 0, err + } + if hash == compareHash { + // no update needed + return hash, nil + } + + cfg := *baseCfg + + for _, linkJSON := range *jobObject { + var httpSD promHTTP.SDConfig + if allocConf.HTTPSDConfig == nil { + httpSD = promHTTP.SDConfig{} + } else { + httpSD = *allocConf.HTTPSDConfig + } + + httpSD.URL = fmt.Sprintf("%s%s?collector_id=%s", allocConf.Endpoint, linkJSON.Link, allocConf.CollectorID) + + scrapeCfg := &config.ScrapeConfig{ + JobName: linkJSON.Link, + ServiceDiscoveryConfigs: discovery.Configs{ + &httpSD, + }, + } + + cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeCfg) + } + + err = r.applyCfg(&cfg) + if err != nil { + r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err)) + return 0, err + } + + return hash, nil +} + +func getJobResponse(baseURL string) (*map[string]linkJSON, error) { + jobURLString := fmt.Sprintf("%s/jobs", baseURL) + _, err := url.Parse(jobURLString) // check if valid + if err != nil { + return nil, err + } + + resp, err := http.Get(jobURLString) //nolint + if err != nil { + return nil, err + } + + defer resp.Body.Close() + + jobObject := &map[string]linkJSON{} + err = json.NewDecoder(resp.Body).Decode(jobObject) + if err != nil { + return nil, err + } + return jobObject, nil +} + +func (r *pReceiver) applyCfg(cfg *config.Config) error { + if err := r.scrapeManager.ApplyConfig(cfg); err != nil { + return err + } + discoveryCfg := make(map[string]discovery.Configs) - for _, scrapeConfig := range r.cfg.PrometheusConfig.ScrapeConfigs { + for _, scrapeConfig := range cfg.ScrapeConfigs { discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs } - if err := discoveryManager.ApplyConfig(discoveryCfg); err != nil { + if err := r.discoveryManager.ApplyConfig(discoveryCfg); err != nil { return err } + return nil +} + +func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component.Host, logger log.Logger) error { + r.discoveryManager = discovery.NewManager(ctx, logger) + go func() { - if err := discoveryManager.Run(); err != nil { + if err := r.discoveryManager.Run(); err != nil { r.settings.Logger.Error("Discovery manager failed", zap.Error(err)) host.ReportFatalError(err) } @@ -86,11 +218,8 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels, ) r.scrapeManager = scrape.NewManager(&scrape.Options{PassMetadataInContext: true}, logger, store) - if err := r.scrapeManager.ApplyConfig(r.cfg.PrometheusConfig); err != nil { - return err - } go func() { - if err := r.scrapeManager.Run(discoveryManager.SyncCh()); err != nil { + if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil { r.settings.Logger.Error("Scrape manager failed", zap.Error(err)) host.ReportFatalError(err) } @@ -118,5 +247,8 @@ func gcInterval(cfg *config.Config) time.Duration { func (r *pReceiver) Shutdown(context.Context) error { r.cancelFunc() r.scrapeManager.Stop() + if r.targetAllocatorIntervalTicker != nil { + r.targetAllocatorIntervalTicker.Stop() + } return nil } diff --git a/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go b/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go new file mode 100644 index 000000000000..ef1387889c48 --- /dev/null +++ b/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go @@ -0,0 +1,496 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !race + +package prometheusreceiver + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + commonconfig "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + promConfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" + promHTTP "github.com/prometheus/prometheus/discovery/http" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.uber.org/atomic" +) + +type MockTargetAllocator struct { + mu sync.Mutex // mu protects the fields below. + endpoints map[string][]mockTargetAllocatorResponse + accessIndex map[string]*atomic.Int32 + wg *sync.WaitGroup + srv *httptest.Server + waitIndex map[string]int +} + +type mockTargetAllocatorResponse struct { + code int + data []byte +} + +type mockTargetAllocatorResponseRaw struct { + code int + data interface{} +} + +type hTTPSDResponse struct { + Targets []string `json:"targets"` + Labels map[model.LabelName]model.LabelValue `json:"labels"` +} + +type expectedTestResultJobMap struct { + Targets []string + Labels model.LabelSet +} + +type expectedTestResult struct { + empty bool + jobMap map[string]expectedTestResultJobMap +} + +func (mta *MockTargetAllocator) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + mta.mu.Lock() + defer mta.mu.Unlock() + + iptr, ok := mta.accessIndex[req.URL.Path] + if !ok { + rw.WriteHeader(404) + return + } + index := int(iptr.Load()) + iptr.Add(1) + pages := mta.endpoints[req.URL.Path] + if index >= len(pages) { + rw.WriteHeader(404) + return + } + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(pages[index].code) + _, _ = rw.Write(pages[index].data) + + // release WaitGroup after all endpoints have been hit by Prometheus SD once. After that we will call them manually + wait := mta.waitIndex[req.URL.Path] + if index == wait { + mta.wg.Done() + } +} + +func (mta *MockTargetAllocator) Start() { + mta.srv.Start() +} + +func (mta *MockTargetAllocator) Stop() { + mta.srv.Close() +} + +func transformTAResponseMap(rawResponses map[string][]mockTargetAllocatorResponseRaw) (map[string][]mockTargetAllocatorResponse, map[string]*atomic.Int32, error) { + responsesMap := make(map[string][]mockTargetAllocatorResponse) + responsesIndexMap := make(map[string]*atomic.Int32) + for path, responsesRaw := range rawResponses { + var responses []mockTargetAllocatorResponse + for _, responseRaw := range responsesRaw { + respBodyBytes, err := json.Marshal(responseRaw.data) + if err != nil { + return nil, nil, err + } + responses = append(responses, mockTargetAllocatorResponse{ + code: responseRaw.code, + data: respBodyBytes, + }) + } + responsesMap[path] = responses + + v := atomic.NewInt32(0) + responsesIndexMap[path] = v + } + return responsesMap, responsesIndexMap, nil +} + +func setupMockTargetAllocator(responses Responses) (*MockTargetAllocator, error) { + responsesMap, responsesIndexMap, err := transformTAResponseMap(responses.responses) + if err != nil { + return nil, err + } + + mockTA := &MockTargetAllocator{ + endpoints: responsesMap, + accessIndex: responsesIndexMap, + waitIndex: responses.releaserMap, + wg: &sync.WaitGroup{}, + } + mockTA.srv = httptest.NewUnstartedServer(mockTA) + mockTA.wg.Add(len(responsesMap)) + + return mockTA, nil +} + +func labelSetTargetsToList(sets []model.LabelSet) []string { + var result []string + for _, set := range sets { + address := set["__address__"] + result = append(result, string(address)) + } + return result +} + +type Responses struct { + releaserMap map[string]int + responses map[string][]mockTargetAllocatorResponseRaw +} + +func TestTargetAllocatorJobRetrieval(t *testing.T) { + for _, tc := range []struct { + desc string + responses Responses + cfg *Config + want expectedTestResult + }{ + { + desc: "default", + responses: Responses{ + responses: map[string][]mockTargetAllocatorResponseRaw{ + "/jobs": { + mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{ + "job1": {Link: "/jobs/job1/targets"}, + "job2": {Link: "/jobs/job2/targets"}, + }}, + }, + "/jobs/job1/targets": { + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"localhost:9090", "10.0.10.3:9100", "10.0.10.4:9100", "10.0.10.5:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + }}, + }}, + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"localhost:9090", "10.0.10.3:9100", "10.0.10.4:9100", "10.0.10.5:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + }}, + }}, + }, + "/jobs/job2/targets": { + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"10.0.40.2:9100", "10.0.40.3:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "alertmanager", + }}, + }}, + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"10.0.40.2:9100", "10.0.40.3:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "alertmanager", + }}, + }}, + }, + }, + }, + cfg: &Config{ + ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)), + PrometheusConfig: &promConfig.Config{}, + TargetAllocator: &targetAllocator{ + Interval: 10 * time.Second, + CollectorID: "collector-1", + HTTPSDConfig: &promHTTP.SDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{ + BasicAuth: &commonconfig.BasicAuth{ + Username: "user", + Password: "aPassword", + }, + }, + RefreshInterval: model.Duration(60 * time.Second), + }, + }, + }, + want: expectedTestResult{ + empty: false, + jobMap: map[string]expectedTestResultJobMap{ + "job1": { + Targets: []string{"localhost:9090", "10.0.10.3:9100", "10.0.10.4:9100", "10.0.10.5:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + }, + }, + "job2": {Targets: []string{"10.0.40.2:9100", "10.0.40.3:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "alertmanager", + }}, + }, + }, + }, + { + desc: "update labels and targets", + responses: Responses{ + responses: map[string][]mockTargetAllocatorResponseRaw{ + "/jobs": { + mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{ + "job1": {Link: "/jobs/job1/targets"}, + "job2": {Link: "/jobs/job2/targets"}, + }}, + }, + "/jobs/job1/targets": { + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"localhost:9090", "10.0.10.3:9100", "10.0.10.4:9100", "10.0.10.5:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + }}, + }}, + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"localhost:9090"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + "test": "aTest", + }}, + }}, + }, + "/jobs/job2/targets": { + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"10.0.40.3:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "alertmanager", + }}, + }}, + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"10.0.40.2:9100", "10.0.40.3:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + }}, + }}, + }, + }, + }, + cfg: &Config{ + ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)), + PrometheusConfig: &promConfig.Config{}, + TargetAllocator: &targetAllocator{ + Interval: 10 * time.Second, + CollectorID: "collector-1", + HTTPSDConfig: &promHTTP.SDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{}, + RefreshInterval: model.Duration(60 * time.Second), + }, + }, + }, + want: expectedTestResult{ + empty: false, + jobMap: map[string]expectedTestResultJobMap{ + "job1": { + Targets: []string{"localhost:9090"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + "test": "aTest", + }, + }, + "job2": {Targets: []string{"10.0.40.2:9100", "10.0.40.3:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + }}, + }, + }, + }, + { + desc: "update job list", + responses: Responses{ + releaserMap: map[string]int{ + "/jobs": 1, + }, + responses: map[string][]mockTargetAllocatorResponseRaw{ + "/jobs": { + mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{ + "job1": {Link: "/jobs/job1/targets"}, + "job2": {Link: "/jobs/job2/targets"}, + }}, + mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{ + "job1": {Link: "/jobs/job1/targets"}, + "job3": {Link: "/jobs/job3/targets"}, + }}, + }, + "/jobs/job1/targets": { + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"localhost:9090"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + }}, + }}, + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"localhost:9090"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + }}, + }}, + }, + "/jobs/job3/targets": { + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"10.0.40.3:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "alertmanager", + }}, + }}, + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"10.0.40.3:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "alertmanager", + }}, + }}, + }, + }, + }, + cfg: &Config{ + ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)), + PrometheusConfig: &promConfig.Config{}, + TargetAllocator: &targetAllocator{ + Interval: 10 * time.Second, + CollectorID: "collector-1", + HTTPSDConfig: &promHTTP.SDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{}, + RefreshInterval: model.Duration(60 * time.Second), + }, + }, + }, + want: expectedTestResult{ + empty: false, + jobMap: map[string]expectedTestResultJobMap{ + "job1": { + Targets: []string{"localhost:9090"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + }, + }, + "job3": {Targets: []string{"10.0.40.3:9100"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "alertmanager", + }}, + }, + }, + }, + { + desc: "endpoint is not reachable", + responses: Responses{ + releaserMap: map[string]int{ + "/jobs": 1, // we are too fast if we ignore the first wait a tick + }, + responses: map[string][]mockTargetAllocatorResponseRaw{ + "/jobs": { + mockTargetAllocatorResponseRaw{code: 404, data: map[string]linkJSON{}}, + mockTargetAllocatorResponseRaw{code: 404, data: map[string]linkJSON{}}, + }, + }, + }, + cfg: &Config{ + ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)), + PrometheusConfig: &promConfig.Config{}, + TargetAllocator: &targetAllocator{ + Interval: 50 * time.Millisecond, + CollectorID: "collector-1", + HTTPSDConfig: &promHTTP.SDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{}, + RefreshInterval: model.Duration(60 * time.Second), + }, + }, + }, + want: expectedTestResult{ + empty: true, + jobMap: map[string]expectedTestResultJobMap{}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx := context.Background() + cms := new(consumertest.MetricsSink) + + allocator, err := setupMockTargetAllocator(tc.responses) + require.NoError(t, err, "Failed to create allocator", tc.responses) + + allocator.Start() + defer allocator.Stop() + + tc.cfg.TargetAllocator.Endpoint = allocator.srv.URL // set service URL with the automatic generated one + receiver := newPrometheusReceiver(componenttest.NewNopReceiverCreateSettings(), tc.cfg, cms) + + require.NoError(t, receiver.Start(ctx, componenttest.NewNopHost())) + + allocator.wg.Wait() + + providers := receiver.discoveryManager.Providers() + if tc.want.empty { + // if no base config is supplied and the job retrieval fails and therefor no configuration is available + // PrometheusSD adds a static provider as default + require.Len(t, providers, 1) + require.IsType(t, discovery.StaticConfig{}, providers[0].Config()) + return + } + + require.NotNil(t, providers) + + for _, provider := range providers { + require.IsType(t, &promHTTP.Discovery{}, provider.Discoverer()) + httpDiscovery := provider.Discoverer().(*promHTTP.Discovery) + refresh, err := httpDiscovery.Refresh(ctx) + require.NoError(t, err) + + // are http configs applied? + sdConfig := provider.Config().(*promHTTP.SDConfig) + require.Equal(t, tc.cfg.TargetAllocator.HTTPSDConfig.HTTPClientConfig, sdConfig.HTTPClientConfig) + + for _, group := range refresh { + found := false + for job, s := range tc.want.jobMap { + // find correct job to compare to. + if !strings.Contains(group.Source, job) { + continue + } + // compare targets + require.Equal(t, s.Targets, labelSetTargetsToList(group.Targets)) + + // compare labels and add __meta_url as this label gets automatically added by the SD. + // which is identical to the source url + s.Labels["__meta_url"] = model.LabelValue(sdConfig.URL) + require.Equal(t, s.Labels, group.Labels) + found = true + } + require.True(t, found, "Returned job is not defined in expected values", group) + } + } + }) + } +} diff --git a/receiver/prometheusreceiver/metrics_receiver_test.go b/receiver/prometheusreceiver/metrics_receiver_test.go index 47a051340b90..e42e0029832b 100644 --- a/receiver/prometheusreceiver/metrics_receiver_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_test.go @@ -19,7 +19,7 @@ import ( "time" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" + promConfig "github.com/prometheus/prometheus/config" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pmetric" "google.golang.org/protobuf/types/known/timestamppb" @@ -1445,18 +1445,18 @@ func verifyUntypedMetrics(t *testing.T, td *testData, resourceMetrics []*pmetric func TestGCInterval(t *testing.T) { for _, tc := range []struct { desc string - input *config.Config + input *promConfig.Config want time.Duration }{ { desc: "default", - input: &config.Config{}, + input: &promConfig.Config{}, want: defaultGCInterval, }, { desc: "global override", - input: &config.Config{ - GlobalConfig: config.GlobalConfig{ + input: &promConfig.Config{ + GlobalConfig: promConfig.GlobalConfig{ ScrapeInterval: model.Duration(10 * time.Minute), }, }, @@ -1464,8 +1464,8 @@ func TestGCInterval(t *testing.T) { }, { desc: "scrape config override", - input: &config.Config{ - ScrapeConfigs: []*config.ScrapeConfig{ + input: &promConfig.Config{ + ScrapeConfigs: []*promConfig.ScrapeConfig{ { ScrapeInterval: model.Duration(10 * time.Minute), }, diff --git a/receiver/prometheusreceiver/testdata/config.yaml b/receiver/prometheusreceiver/testdata/config.yaml index 5c115d9e268d..bd76462e8d6c 100644 --- a/receiver/prometheusreceiver/testdata/config.yaml +++ b/receiver/prometheusreceiver/testdata/config.yaml @@ -5,6 +5,17 @@ receivers: buffer_count: 45 use_start_time_metric: true start_time_metric_regex: '^(.+_)*process_start_time_seconds$' + target_allocator: + endpoint: http://my-targetallocator-service + interval: 30s + collector_id: collector-1 + # imported struct from the Prometheus code base. Can be used optionally to configure the jobs as seen in the docs + # https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config + http_sd_config: + refresh_interval: 60s + basic_auth: + username: prometheus + password: changeme config: scrape_configs: - job_name: 'demo' diff --git a/receiver/prometheusreceiver/testdata/config_target_allocator.yaml b/receiver/prometheusreceiver/testdata/config_target_allocator.yaml new file mode 100644 index 000000000000..0dd61ec21383 --- /dev/null +++ b/receiver/prometheusreceiver/testdata/config_target_allocator.yaml @@ -0,0 +1,33 @@ +receivers: + prometheus: + target_allocator: + endpoint: http://localhost:8080 + interval: 30s + collector_id: collector-1 + prometheus/withScrape: + target_allocator: + endpoint: http://localhost:8080 + interval: 30s + collector_id: collector-1 + config: + scrape_configs: + - job_name: 'demo' + scrape_interval: 5s + prometheus/withOnlyScrape: + config: + scrape_configs: + - job_name: 'demo' + scrape_interval: 5s + +processors: + nop: + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [prometheus] + processors: [nop] + exporters: [nop] diff --git a/receiver/simpleprometheusreceiver/go.mod b/receiver/simpleprometheusreceiver/go.mod index 9b55d6871724..5bbd746305d1 100644 --- a/receiver/simpleprometheusreceiver/go.mod +++ b/receiver/simpleprometheusreceiver/go.mod @@ -96,6 +96,7 @@ require ( github.com/miekg/dns v1.1.50 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -104,7 +105,6 @@ require ( github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect - github.com/pelletier/go-toml v1.9.4 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.13.0 // indirect diff --git a/receiver/simpleprometheusreceiver/go.sum b/receiver/simpleprometheusreceiver/go.sum index b6ff97997598..3077335d8707 100644 --- a/receiver/simpleprometheusreceiver/go.sum +++ b/receiver/simpleprometheusreceiver/go.sum @@ -520,6 +520,8 @@ github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go. github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= @@ -569,7 +571,6 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= -github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/testbed/go.mod b/testbed/go.mod index 88af763e8675..785b19fb2c5c 100644 --- a/testbed/go.mod +++ b/testbed/go.mod @@ -139,6 +139,7 @@ require ( github.com/miekg/dns v1.1.50 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/testbed/go.sum b/testbed/go.sum index c4ede3d678e6..ef243cce8902 100644 --- a/testbed/go.sum +++ b/testbed/go.sum @@ -601,6 +601,8 @@ github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go. github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= diff --git a/unreleased/prometheus-receiver-TA.yaml b/unreleased/prometheus-receiver-TA.yaml new file mode 100644 index 000000000000..7a3ffe6ffdd1 --- /dev/null +++ b/unreleased/prometheus-receiver-TA.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: allow to query scrape jobs from OpenTelemetryOperators TargetAllocator + +# One or more tracking issues related to the change +issues: [7944] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: