diff --git a/.chloggen/prometheus_receiver_target_allocator_scrape_config.yaml b/.chloggen/prometheus_receiver_target_allocator_scrape_config.yaml
new file mode 100644
index 000000000000..9c91670b2e43
--- /dev/null
+++ b/.chloggen/prometheus_receiver_target_allocator_scrape_config.yaml
@@ -0,0 +1,17 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: prometheusreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Use the target allocator's new scrape_configs endpoint to dynamically pull scrape configuration.
+
+# One or more tracking issues related to the change
+issues:
+  - 14597
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
diff --git a/receiver/prometheusreceiver/config.go b/receiver/prometheusreceiver/config.go
index d479af48f1f0..c99a8369c65d 100644
--- a/receiver/prometheusreceiver/config.go
+++ b/receiver/prometheusreceiver/config.go
@@ -165,8 +165,8 @@ func (cfg *Config) Validate() error {
 }
 
 func (cfg *Config) validatePromConfig(promConfig *promconfig.Config) error {
-	if len(promConfig.ScrapeConfigs) == 0 {
-		return errors.New("no Prometheus scrape_configs")
+	if len(promConfig.ScrapeConfigs) == 0 && cfg.TargetAllocator == nil {
+		return errors.New("no Prometheus scrape_configs or target_allocator set")
 	}
 
 	// Reject features that Prometheus supports but that the receiver doesn't support:
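The config.go change above relaxes validation: an empty `scrape_configs` list is now accepted whenever a target allocator is configured, since the scrape configuration will be pulled at runtime. A minimal sketch of the relaxed rule, using simplified stand-in types (`receiverConfig` and its fields are illustrative, not the receiver's real types):

```go
package main

import (
	"errors"
	"fmt"
)

type targetAllocator struct {
	Endpoint string
}

type receiverConfig struct {
	ScrapeConfigs   []string // stands in for []*config.ScrapeConfig
	TargetAllocator *targetAllocator
}

func validate(cfg receiverConfig) error {
	// Only reject when BOTH static scrape configs and a target allocator are absent.
	if len(cfg.ScrapeConfigs) == 0 && cfg.TargetAllocator == nil {
		return errors.New("no Prometheus scrape_configs or target_allocator set")
	}
	return nil
}

func main() {
	// Passes: no static scrape configs, but a target allocator is set.
	fmt.Println(validate(receiverConfig{
		TargetAllocator: &targetAllocator{Endpoint: "http://target-allocator:80"},
	}))
	// Fails: neither is set.
	fmt.Println(validate(receiverConfig{}))
}
```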
diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go
index 098abdb77019..c3ce9aa557d6 100644
--- a/receiver/prometheusreceiver/metrics_receiver.go
+++ b/receiver/prometheusreceiver/metrics_receiver.go
@@ -15,16 +15,20 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
 
 import (
+	"bytes"
 	"context"
-	"encoding/json"
 	"fmt"
+	"io"
 	"net/http"
 	"net/url"
+	"os"
 	"regexp"
+	"sync"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/mitchellh/hashstructure/v2"
+	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery"
 	promHTTP "github.com/prometheus/prometheus/discovery/http"
@@ -32,6 +36,7 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
 	"go.uber.org/zap"
+	"gopkg.in/yaml.v2"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
 )
@@ -43,31 +48,31 @@ const (
 
 // pReceiver is the type that provides Prometheus scraper/receiver functionality.
 type pReceiver struct {
-	cfg        *Config
-	consumer   consumer.Metrics
-	cancelFunc context.CancelFunc
-
-	settings                      component.ReceiverCreateSettings
-	scrapeManager                 *scrape.Manager
-	discoveryManager              *discovery.Manager
-	targetAllocatorIntervalTicker *time.Ticker
-}
-
-type linkJSON struct {
-	Link string `json:"_link"`
+	cfg                 *Config
+	consumer            consumer.Metrics
+	cancelFunc          context.CancelFunc
+	targetAllocatorStop chan struct{}
+	configLoaded        chan struct{}
+	loadConfigOnce      sync.Once
+
+	settings         component.ReceiverCreateSettings
+	scrapeManager    *scrape.Manager
+	discoveryManager *discovery.Manager
 }
 
 // New creates a new prometheus.Receiver reference.
 func newPrometheusReceiver(set component.ReceiverCreateSettings, cfg *Config, next consumer.Metrics) *pReceiver {
 	pr := &pReceiver{
-		cfg:      cfg,
-		consumer: next,
-		settings: set,
+		cfg:                 cfg,
+		consumer:            next,
+		settings:            set,
+		configLoaded:        make(chan struct{}),
+		targetAllocatorStop: make(chan struct{}),
 	}
 	return pr
 }
 
-// Start is the method that starts Prometheus scraping and it
+// Start is the method that starts Prometheus scraping. It
 // is controlled by having previously defined a Configuration using perhaps New.
 func (r *pReceiver) Start(_ context.Context, host component.Host) error {
 	discoveryCtx, cancel := context.WithCancel(context.Background())
@@ -92,22 +97,44 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
 
 	allocConf := r.cfg.TargetAllocator
 	if allocConf != nil {
-		go func() {
-			// immediately sync jobs and not wait for the first tick
-			savedHash, _ := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
-			r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval)
-			for {
-				<-r.targetAllocatorIntervalTicker.C
-				hash, err := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
-				if err != nil {
-					r.settings.Logger.Error(err.Error())
+		err = r.startTargetAllocator(allocConf, baseCfg)
+		if err != nil {
+			return err
+		}
+	}
+
+	r.loadConfigOnce.Do(func() {
+		close(r.configLoaded)
+	})
+
+	return nil
+}
+
+func (r *pReceiver) startTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) error {
+	r.settings.Logger.Info("Starting target allocator discovery")
+	// immediately sync jobs, not waiting for the first tick
+	savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
+	if err != nil {
+		return err
+	}
+	go func() {
+		targetAllocatorIntervalTicker := time.NewTicker(allocConf.Interval)
+		for {
+			select {
+			case <-targetAllocatorIntervalTicker.C:
+				hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
+				if newErr != nil {
+					r.settings.Logger.Error(newErr.Error())
 					continue
 				}
 				savedHash = hash
+			case <-r.targetAllocatorStop:
+				targetAllocatorIntervalTicker.Stop()
+				r.settings.Logger.Info("Stopping target allocator")
+				return
 			}
-		}()
-	}
-
+		}
+	}()
 	return nil
 }
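Replacing the stored `*time.Ticker` with a `targetAllocatorStop` channel follows the usual Go pattern for a cancellable periodic loop: the goroutine owns the ticker, and a `select` races the tick against shutdown. A self-contained sketch of the same pattern, with the sync call reduced to a print:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	stop := make(chan struct{})
	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		for {
			select {
			case <-ticker.C:
				fmt.Println("sync") // periodic work, stands in for syncTargetAllocator
			case <-stop:
				ticker.Stop() // release ticker resources before exiting
				fmt.Println("stopping")
				return
			}
		}
	}()
	time.Sleep(350 * time.Millisecond)
	close(stop) // Shutdown closes targetAllocatorStop the same way
	time.Sleep(50 * time.Millisecond)
}
```

Closing the channel wakes the goroutine even when no tick is pending; with the old ticker-only loop, `Stop()` never unblocks a pending `<-ticker.C`, so the goroutine could linger after shutdown.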
@@ -115,13 +142,13 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
 // baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs.
 func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) {
 	r.settings.Logger.Debug("Syncing target allocator jobs")
-	jobObject, err := getJobResponse(allocConf.Endpoint)
+	scrapeConfigsResponse, err := r.getScrapeConfigsResponse(allocConf.Endpoint)
 	if err != nil {
 		r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
 		return 0, err
 	}
 
-	hash, err := hashstructure.Hash(jobObject, hashstructure.FormatV2, nil)
+	hash, err := hashstructure.Hash(scrapeConfigsResponse, hashstructure.FormatV2, nil)
 	if err != nil {
 		r.settings.Logger.Error("Failed to hash job list", zap.Error(err))
 		return 0, err
@@ -131,29 +158,29 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll
 		return hash, nil
 	}
 
-	cfg := *baseCfg
+	// Clear out the current configurations
+	baseCfg.ScrapeConfigs = []*config.ScrapeConfig{}
 
-	for _, linkJSON := range *jobObject {
+	for jobName, scrapeConfig := range scrapeConfigsResponse {
 		var httpSD promHTTP.SDConfig
 		if allocConf.HTTPSDConfig == nil {
-			httpSD = promHTTP.SDConfig{}
+			httpSD = promHTTP.SDConfig{
+				RefreshInterval: model.Duration(30 * time.Second),
+			}
 		} else {
 			httpSD = *allocConf.HTTPSDConfig
 		}
-
-		httpSD.URL = fmt.Sprintf("%s%s?collector_id=%s", allocConf.Endpoint, linkJSON.Link, allocConf.CollectorID)
-
-		scrapeCfg := &config.ScrapeConfig{
-			JobName: linkJSON.Link,
-			ServiceDiscoveryConfigs: discovery.Configs{
-				&httpSD,
-			},
+		escapedJob := url.QueryEscape(jobName)
+		httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID)
+		httpSD.HTTPClientConfig.FollowRedirects = false
+		scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
+			&httpSD,
 		}
-		cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeCfg)
+		baseCfg.ScrapeConfigs = append(baseCfg.ScrapeConfigs, scrapeConfig)
 	}
 
-	err = r.applyCfg(&cfg)
+	err = r.applyCfg(baseCfg)
 	if err != nil {
 		r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
 		return 0, err
@@ -162,26 +189,43 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll
 	return hash, nil
 }
 
-func getJobResponse(baseURL string) (*map[string]linkJSON, error) {
-	jobURLString := fmt.Sprintf("%s/jobs", baseURL)
-	_, err := url.Parse(jobURLString) // check if valid
+// instantiateShard replaces $(SHARD) placeholders in the returned configuration with the SHARD environment variable
+func (r *pReceiver) instantiateShard(body []byte) []byte {
+	shard, ok := os.LookupEnv("SHARD")
+	if !ok {
+		shard = "0"
+	}
+	return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard))
+}
+
+func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) {
+	scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL)
+	_, err := url.Parse(scrapeConfigsURL) // check if valid
 	if err != nil {
 		return nil, err
 	}
 
-	resp, err := http.Get(jobURLString) //nolint
+	resp, err := http.Get(scrapeConfigsURL) //nolint
 	if err != nil {
 		return nil, err
 	}
 
-	defer resp.Body.Close()
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
 
-	jobObject := &map[string]linkJSON{}
-	err = json.NewDecoder(resp.Body).Decode(jobObject)
+	jobToScrapeConfig := map[string]*config.ScrapeConfig{}
+	envReplacedBody := r.instantiateShard(body)
+	err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig)
+	if err != nil {
+		return nil, err
+	}
+	err = resp.Body.Close()
 	if err != nil {
 		return nil, err
 	}
 
-	return jobObject, nil
+	return jobToScrapeConfig, nil
 }
 
 func (r *pReceiver) applyCfg(cfg *config.Config) error {
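`getScrapeConfigsResponse` now pulls whole Prometheus scrape configs as YAML from `/scrape_configs` instead of assembling them from `/jobs` links, and `instantiateShard` substitutes the collector's shard into any `$(SHARD)` placeholders before unmarshalling. A small sketch of the substitution step in isolation (the sample YAML line is illustrative):

```go
package main

import (
	"bytes"
	"fmt"
	"os"
)

// instantiateShard mirrors the receiver's helper: replace every "$(SHARD)"
// placeholder with the SHARD environment variable, defaulting to "0".
func instantiateShard(body []byte) []byte {
	shard, ok := os.LookupEnv("SHARD")
	if !ok {
		shard = "0"
	}
	return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard))
}

func main() {
	raw := []byte("job_name: collector-$(SHARD)") // illustrative allocator response fragment
	os.Setenv("SHARD", "2")
	fmt.Println(string(instantiateShard(raw))) // job_name: collector-2
}
```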
@@ -192,6 +236,7 @@ func (r *pReceiver) applyCfg(cfg *config.Config) error {
 	discoveryCfg := make(map[string]discovery.Configs)
 	for _, scrapeConfig := range cfg.ScrapeConfigs {
 		discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
+		r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName))
 	}
 	if err := r.discoveryManager.ApplyConfig(discoveryCfg); err != nil {
 		return err
@@ -203,6 +248,7 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component
 	r.discoveryManager = discovery.NewManager(ctx, logger)
 
 	go func() {
+		r.settings.Logger.Info("Starting discovery manager")
 		if err := r.discoveryManager.Run(); err != nil {
 			r.settings.Logger.Error("Discovery manager failed", zap.Error(err))
 			host.ReportFatalError(err)
@@ -228,7 +274,11 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component
 		r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
 	)
 	r.scrapeManager = scrape.NewManager(&scrape.Options{PassMetadataInContext: true}, logger, store)
+
 	go func() {
+		// The scrape manager needs to wait for the configuration to be loaded before beginning
+		<-r.configLoaded
+		r.settings.Logger.Info("Starting scrape manager")
 		if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil {
 			r.settings.Logger.Error("Scrape manager failed", zap.Error(err))
 			host.ReportFatalError(err)
@@ -257,8 +307,6 @@ func gcInterval(cfg *config.Config) time.Duration {
 func (r *pReceiver) Shutdown(context.Context) error {
 	r.cancelFunc()
 	r.scrapeManager.Stop()
-	if r.targetAllocatorIntervalTicker != nil {
-		r.targetAllocatorIntervalTicker.Stop()
-	}
+	close(r.targetAllocatorStop)
 	return nil
 }
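The new `configLoaded` channel gates the scrape manager: its goroutine parks until `Start` has applied an initial configuration, and `sync.Once` keeps a repeated `Start` from closing the channel twice. A minimal sketch of the gate pattern, using simplified stand-in types (`gatedRunner` is illustrative, not the receiver's real type):

```go
package main

import (
	"fmt"
	"sync"
)

type gatedRunner struct {
	configLoaded   chan struct{}
	loadConfigOnce sync.Once
}

// run blocks until the first configuration has been applied, then starts work.
func (g *gatedRunner) run(done chan struct{}) {
	<-g.configLoaded // parked until the gate is opened
	fmt.Println("scrape loop starting")
	close(done)
}

func main() {
	g := &gatedRunner{configLoaded: make(chan struct{})}
	done := make(chan struct{})
	go g.run(done)
	// ... apply the initial configuration here ...
	g.loadConfigOnce.Do(func() { close(g.configLoaded) }) // open the gate exactly once
	<-done
}
```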
diff --git a/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go b/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go
index d320f0e15549..e4cf7ace13c6 100644
--- a/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go
+++ b/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go
@@ -29,7 +29,6 @@ import (
 	commonconfig "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
 	promConfig "github.com/prometheus/prometheus/config"
-	"github.com/prometheus/prometheus/discovery"
 	promHTTP "github.com/prometheus/prometheus/discovery/http"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component/componenttest"
@@ -173,10 +172,26 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) {
 			desc: "default",
 			responses: Responses{
 				responses: map[string][]mockTargetAllocatorResponseRaw{
-					"/jobs": {
-						mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{
-							"job1": {Link: "/jobs/job1/targets"},
-							"job2": {Link: "/jobs/job2/targets"},
+					"/scrape_configs": {
+						mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]interface{}{
+							"job1": {
+								"job_name":               "job1",
+								"scrape_interval":        "30s",
+								"scrape_timeout":         "30s",
+								"metrics_path":           "/metrics",
+								"scheme":                 "http",
+								"relabel_configs":        nil,
+								"metric_relabel_configs": nil,
+							},
+							"job2": {
+								"job_name":               "job2",
+								"scrape_interval":        "30s",
+								"scrape_timeout":         "30s",
+								"metrics_path":           "/metrics",
+								"scheme":                 "http",
+								"relabel_configs":        nil,
+								"metric_relabel_configs": nil,
+							},
 						}},
 					},
 					"/jobs/job1/targets": {
@@ -252,10 +267,26 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) {
 			desc: "update labels and targets",
 			responses: Responses{
 				responses: map[string][]mockTargetAllocatorResponseRaw{
-					"/jobs": {
-						mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{
-							"job1": {Link: "/jobs/job1/targets"},
-							"job2": {Link: "/jobs/job2/targets"},
+					"/scrape_configs": {
+						mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]interface{}{
+							"job1": {
+								"job_name":               "job1",
+								"scrape_interval":        "30s",
+								"scrape_timeout":         "30s",
+								"metrics_path":           "/metrics",
+								"scheme":                 "http",
+								"relabel_configs":        nil,
+								"metric_relabel_configs": nil,
+							},
+							"job2": {
+								"job_name":               "job2",
+								"scrape_interval":        "30s",
+								"scrape_timeout":         "30s",
+								"metrics_path":           "/metrics",
+								"scheme":                 "http",
+								"relabel_configs":        nil,
+								"metric_relabel_configs": nil,
+							},
 						}},
 					},
 					"/jobs/job1/targets": {
@@ -326,17 +357,49 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) {
 			desc: "update job list",
 			responses: Responses{
 				releaserMap: map[string]int{
-					"/jobs": 1,
+					"/scrape_configs": 1,
 				},
 				responses: map[string][]mockTargetAllocatorResponseRaw{
-					"/jobs": {
-						mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{
-							"job1": {Link: "/jobs/job1/targets"},
-							"job2": {Link: "/jobs/job2/targets"},
+					"/scrape_configs": {
+						mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]interface{}{
+							"job1": {
+								"job_name":               "job1",
+								"scrape_interval":        "30s",
+								"scrape_timeout":         "30s",
+								"metrics_path":           "/metrics",
+								"scheme":                 "http",
+								"relabel_configs":        nil,
+								"metric_relabel_configs": nil,
+							},
+							"job2": {
+								"job_name":               "job2",
+								"scrape_interval":        "30s",
+								"scrape_timeout":         "30s",
+								"metrics_path":           "/metrics",
+								"scheme":                 "http",
+								"relabel_configs":        nil,
+								"metric_relabel_configs": nil,
+							},
 						}},
-						mockTargetAllocatorResponseRaw{code: 200, data: map[string]linkJSON{
-							"job1": {Link: "/jobs/job1/targets"},
-							"job3": {Link: "/jobs/job3/targets"},
+						mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]interface{}{
+							"job1": {
+								"job_name":               "job1",
+								"scrape_interval":        "30s",
+								"scrape_timeout":         "30s",
+								"metrics_path":           "/metrics",
+								"scheme":                 "http",
+								"relabel_configs":        nil,
+								"metric_relabel_configs": nil,
+							},
+							"job3": {
+								"job_name":               "job3",
+								"scrape_interval":        "30s",
+								"scrape_timeout":         "30s",
+								"metrics_path":           "/metrics",
+								"scheme":                 "http",
+								"relabel_configs":        nil,
+								"metric_relabel_configs": nil,
+							},
 						}},
 					},
 					"/jobs/job1/targets": {
@@ -407,12 +470,12 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) {
 			desc: "endpoint is not reachable",
 			responses: Responses{
 				releaserMap: map[string]int{
-					"/jobs": 1, // we are too fast if we ignore the first wait a tick
+					"/scrape_configs": 1, // we respond too fast if we do not wait a tick first
 				},
 				responses: map[string][]mockTargetAllocatorResponseRaw{
-					"/jobs": {
-						mockTargetAllocatorResponseRaw{code: 404, data: map[string]linkJSON{}},
-						mockTargetAllocatorResponseRaw{code: 404, data: map[string]linkJSON{}},
+					"/scrape_configs": {
+						mockTargetAllocatorResponseRaw{code: 404, data: map[string]map[string]interface{}{}},
+						mockTargetAllocatorResponseRaw{code: 404, data: map[string]map[string]interface{}{}},
 					},
 				},
 			},
@@ -453,10 +516,8 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) {
 			providers := receiver.discoveryManager.Providers()
 			if tc.want.empty {
-				// if no base config is supplied and the job retrieval fails and therefor no configuration is available
-				// PrometheusSD adds a static provider as default
-				require.Len(t, providers, 1)
-				require.IsType(t, discovery.StaticConfig{}, providers[0].Config())
+				// if no base config is supplied and the job retrieval fails then no configuration should be found
+				require.Len(t, providers, 0)
 				return
 			}
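For experimenting with the new endpoint outside the test harness, a hypothetical, trimmed-down mock of the allocator's `/scrape_configs` response might look like the following; the job fields mirror the test fixtures above, and `newMockAllocator` is an assumed name, not part of the test suite:

```go
package main

import (
	"net/http"
	"net/http/httptest"

	"gopkg.in/yaml.v2"
)

// newMockAllocator serves a single job keyed by name, in the YAML shape the
// receiver now unmarshals into map[string]*config.ScrapeConfig.
func newMockAllocator() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/scrape_configs", func(w http.ResponseWriter, r *http.Request) {
		body, _ := yaml.Marshal(map[string]map[string]interface{}{
			"job1": {
				"job_name":        "job1",
				"scrape_interval": "30s",
				"metrics_path":    "/metrics",
				"scheme":          "http",
			},
		})
		w.Write(body)
	})
	return httptest.NewServer(mux)
}

func main() {
	server := newMockAllocator()
	defer server.Close()
	// The receiver would now be pointed at server.URL as its
	// target_allocator endpoint.
}
```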