Skip to content

Commit

Permalink
Use the new scrape config endpoint for the prometheus receiver (open-…
Browse files Browse the repository at this point in the history
…telemetry#14464)

 This fixes issue 1106 in the operator which relates to the collector failing to get scrape config jobs from the target allocator. Before this is merged this pr needs to be merged which adds this new endpoint.
  • Loading branch information
jaronoff97 authored and shalper2 committed Dec 6, 2022
1 parent 9d2adb7 commit 5bb8f1d
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 82 deletions.
17 changes: 17 additions & 0 deletions .chloggen/prometheus_receiver_target_allocator_scrape_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: changes to use the new scrape_configs endpoint in the target allocator to dynamically pull scrape configuration.

# One or more tracking issues related to the change
issues:
- 14597

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ func (cfg *Config) Validate() error {
}

func (cfg *Config) validatePromConfig(promConfig *promconfig.Config) error {
if len(promConfig.ScrapeConfigs) == 0 {
return errors.New("no Prometheus scrape_configs")
if len(promConfig.ScrapeConfigs) == 0 && cfg.TargetAllocator == nil {
return errors.New("no Prometheus scrape_configs or target_allocator set")
}

// Reject features that Prometheus supports but that the receiver doesn't support:
Expand Down
158 changes: 103 additions & 55 deletions receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,28 @@
package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"regexp"
"sync"
"time"

"github.com/go-kit/log"
"github.com/mitchellh/hashstructure/v2"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
promHTTP "github.com/prometheus/prometheus/discovery/http"
"github.com/prometheus/prometheus/scrape"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
"gopkg.in/yaml.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
)
Expand All @@ -43,31 +48,31 @@ const (

// pReceiver is the type that provides Prometheus scraper/receiver functionality.
type pReceiver struct {
cfg *Config
consumer consumer.Metrics
cancelFunc context.CancelFunc

settings component.ReceiverCreateSettings
scrapeManager *scrape.Manager
discoveryManager *discovery.Manager
targetAllocatorIntervalTicker *time.Ticker
}

type linkJSON struct {
Link string `json:"_link"`
cfg *Config
consumer consumer.Metrics
cancelFunc context.CancelFunc
targetAllocatorStop chan struct{}
configLoaded chan struct{}
loadConfigOnce sync.Once

settings component.ReceiverCreateSettings
scrapeManager *scrape.Manager
discoveryManager *discovery.Manager
}

// New creates a new prometheus.Receiver reference.
func newPrometheusReceiver(set component.ReceiverCreateSettings, cfg *Config, next consumer.Metrics) *pReceiver {
pr := &pReceiver{
cfg: cfg,
consumer: next,
settings: set,
cfg: cfg,
consumer: next,
settings: set,
configLoaded: make(chan struct{}),
targetAllocatorStop: make(chan struct{}),
}
return pr
}

// Start is the method that starts Prometheus scraping and it
// Start is the method that starts Prometheus scraping. It
// is controlled by having previously defined a Configuration using perhaps New.
func (r *pReceiver) Start(_ context.Context, host component.Host) error {
discoveryCtx, cancel := context.WithCancel(context.Background())
Expand All @@ -92,36 +97,58 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {

allocConf := r.cfg.TargetAllocator
if allocConf != nil {
go func() {
// immediately sync jobs and not wait for the first tick
savedHash, _ := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval)
for {
<-r.targetAllocatorIntervalTicker.C
hash, err := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
if err != nil {
r.settings.Logger.Error(err.Error())
err = r.startTargetAllocator(allocConf, baseCfg)
if err != nil {
return err
}
}

r.loadConfigOnce.Do(func() {
close(r.configLoaded)
})

return nil
}

func (r *pReceiver) startTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) error {
r.settings.Logger.Info("Starting target allocator discovery")
// immediately sync jobs, not waiting for the first tick
savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
if err != nil {
return err
}
go func() {
targetAllocatorIntervalTicker := time.NewTicker(allocConf.Interval)
for {
select {
case <-targetAllocatorIntervalTicker.C:
hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
if newErr != nil {
r.settings.Logger.Error(newErr.Error())
continue
}
savedHash = hash
case <-r.targetAllocatorStop:
targetAllocatorIntervalTicker.Stop()
r.settings.Logger.Info("Stopping target allocator")
return
}
}()
}

}
}()
return nil
}

// syncTargetAllocator request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash.
// baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs.
func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) {
r.settings.Logger.Debug("Syncing target allocator jobs")
jobObject, err := getJobResponse(allocConf.Endpoint)
scrapeConfigsResponse, err := r.getScrapeConfigsResponse(allocConf.Endpoint)
if err != nil {
r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
return 0, err
}

hash, err := hashstructure.Hash(jobObject, hashstructure.FormatV2, nil)
hash, err := hashstructure.Hash(scrapeConfigsResponse, hashstructure.FormatV2, nil)
if err != nil {
r.settings.Logger.Error("Failed to hash job list", zap.Error(err))
return 0, err
Expand All @@ -131,29 +158,29 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll
return hash, nil
}

cfg := *baseCfg
// Clear out the current configurations
baseCfg.ScrapeConfigs = []*config.ScrapeConfig{}

for _, linkJSON := range *jobObject {
for jobName, scrapeConfig := range scrapeConfigsResponse {
var httpSD promHTTP.SDConfig
if allocConf.HTTPSDConfig == nil {
httpSD = promHTTP.SDConfig{}
httpSD = promHTTP.SDConfig{
RefreshInterval: model.Duration(30 * time.Second),
}
} else {
httpSD = *allocConf.HTTPSDConfig
}

httpSD.URL = fmt.Sprintf("%s%s?collector_id=%s", allocConf.Endpoint, linkJSON.Link, allocConf.CollectorID)

scrapeCfg := &config.ScrapeConfig{
JobName: linkJSON.Link,
ServiceDiscoveryConfigs: discovery.Configs{
&httpSD,
},
escapedJob := url.QueryEscape(jobName)
httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID)
httpSD.HTTPClientConfig.FollowRedirects = false
scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
&httpSD,
}

cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeCfg)
baseCfg.ScrapeConfigs = append(baseCfg.ScrapeConfigs, scrapeConfig)
}

err = r.applyCfg(&cfg)
err = r.applyCfg(baseCfg)
if err != nil {
r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
return 0, err
Expand All @@ -162,26 +189,43 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll
return hash, nil
}

func getJobResponse(baseURL string) (*map[string]linkJSON, error) {
jobURLString := fmt.Sprintf("%s/jobs", baseURL)
_, err := url.Parse(jobURLString) // check if valid
// instantiateShard inserts the SHARD environment variable in the returned configuration
func (r *pReceiver) instantiateShard(body []byte) []byte {
shard, ok := os.LookupEnv("SHARD")
if !ok {
shard = "0"
}
return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard))
}

func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) {
scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL)
_, err := url.Parse(scrapeConfigsURL) // check if valid
if err != nil {
return nil, err
}

resp, err := http.Get(jobURLString) //nolint
resp, err := http.Get(scrapeConfigsURL) //nolint
if err != nil {
return nil, err
}

defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

jobObject := &map[string]linkJSON{}
err = json.NewDecoder(resp.Body).Decode(jobObject)
jobToScrapeConfig := map[string]*config.ScrapeConfig{}
envReplacedBody := r.instantiateShard(body)
err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig)
if err != nil {
return nil, err
}
err = resp.Body.Close()
if err != nil {
return nil, err
}
return jobObject, nil
return jobToScrapeConfig, nil
}

func (r *pReceiver) applyCfg(cfg *config.Config) error {
Expand All @@ -192,6 +236,7 @@ func (r *pReceiver) applyCfg(cfg *config.Config) error {
discoveryCfg := make(map[string]discovery.Configs)
for _, scrapeConfig := range cfg.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName))
}
if err := r.discoveryManager.ApplyConfig(discoveryCfg); err != nil {
return err
Expand All @@ -203,6 +248,7 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component
r.discoveryManager = discovery.NewManager(ctx, logger)

go func() {
r.settings.Logger.Info("Starting discovery manager")
if err := r.discoveryManager.Run(); err != nil {
r.settings.Logger.Error("Discovery manager failed", zap.Error(err))
host.ReportFatalError(err)
Expand All @@ -228,7 +274,11 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component
r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
)
r.scrapeManager = scrape.NewManager(&scrape.Options{PassMetadataInContext: true}, logger, store)

go func() {
// The scrape manager needs to wait for the configuration to be loaded before beginning
<-r.configLoaded
r.settings.Logger.Info("Starting scrape manager")
if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil {
r.settings.Logger.Error("Scrape manager failed", zap.Error(err))
host.ReportFatalError(err)
Expand Down Expand Up @@ -257,8 +307,6 @@ func gcInterval(cfg *config.Config) time.Duration {
func (r *pReceiver) Shutdown(context.Context) error {
r.cancelFunc()
r.scrapeManager.Stop()
if r.targetAllocatorIntervalTicker != nil {
r.targetAllocatorIntervalTicker.Stop()
}
close(r.targetAllocatorStop)
return nil
}
Loading

0 comments on commit 5bb8f1d

Please sign in to comment.