Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the new scrape config endpoint for the prometheus receiver #14464

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: use the target allocator's new scrape_configs endpoint to dynamically pull scrape configuration.

# One or more tracking issues related to the change
issues:
- 14597

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (cfg *Config) Validate() error {
}

func (cfg *Config) validatePromConfig(promConfig *promconfig.Config) error {
if len(promConfig.ScrapeConfigs) == 0 {
return errors.New("no Prometheus scrape_configs")
if len(promConfig.ScrapeConfigs) == 0 && cfg.TargetAllocator == nil {
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
return errors.New("no Prometheus scrape_configs or target_allocator set")
}

// Reject features that Prometheus supports but that the receiver doesn't support:
Expand Down
158 changes: 103 additions & 55 deletions receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,28 @@
package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"regexp"
"sync"
"time"

"github.com/go-kit/log"
"github.com/mitchellh/hashstructure/v2"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
promHTTP "github.com/prometheus/prometheus/discovery/http"
"github.com/prometheus/prometheus/scrape"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
"gopkg.in/yaml.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
)
Expand All @@ -43,31 +48,31 @@ const (

// pReceiver is the type that provides Prometheus scraper/receiver functionality.
type pReceiver struct {
cfg *Config
consumer consumer.Metrics
cancelFunc context.CancelFunc

settings component.ReceiverCreateSettings
scrapeManager *scrape.Manager
discoveryManager *discovery.Manager
targetAllocatorIntervalTicker *time.Ticker
}

type linkJSON struct {
Link string `json:"_link"`
cfg *Config
consumer consumer.Metrics
cancelFunc context.CancelFunc
targetAllocatorStop chan struct{}
configLoaded chan struct{}
loadConfigOnce sync.Once

settings component.ReceiverCreateSettings
scrapeManager *scrape.Manager
discoveryManager *discovery.Manager
}

// New creates a new prometheus.Receiver reference.
func newPrometheusReceiver(set component.ReceiverCreateSettings, cfg *Config, next consumer.Metrics) *pReceiver {
pr := &pReceiver{
cfg: cfg,
consumer: next,
settings: set,
cfg: cfg,
consumer: next,
settings: set,
configLoaded: make(chan struct{}),
targetAllocatorStop: make(chan struct{}),
}
return pr
}

// Start is the method that starts Prometheus scraping and it
// Start is the method that starts Prometheus scraping. It
// is controlled by having previously defined a Configuration using perhaps New.
func (r *pReceiver) Start(_ context.Context, host component.Host) error {
discoveryCtx, cancel := context.WithCancel(context.Background())
Expand All @@ -92,36 +97,58 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {

allocConf := r.cfg.TargetAllocator
if allocConf != nil {
go func() {
// immediately sync jobs and not wait for the first tick
savedHash, _ := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval)
for {
<-r.targetAllocatorIntervalTicker.C
hash, err := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
if err != nil {
r.settings.Logger.Error(err.Error())
err = r.startTargetAllocator(allocConf, baseCfg)
if err != nil {
return err
}
}

r.loadConfigOnce.Do(func() {
close(r.configLoaded)
})

return nil
}

func (r *pReceiver) startTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) error {
r.settings.Logger.Info("Starting target allocator discovery")
// immediately sync jobs, not waiting for the first tick
savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
if err != nil {
return err
}
go func() {
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
targetAllocatorIntervalTicker := time.NewTicker(allocConf.Interval)
for {
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-targetAllocatorIntervalTicker.C:
hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
if newErr != nil {
r.settings.Logger.Error(newErr.Error())
continue
}
savedHash = hash
case <-r.targetAllocatorStop:
targetAllocatorIntervalTicker.Stop()
r.settings.Logger.Info("Stopping target allocator")
return
}
}()
}

}
}()
return nil
}

// syncTargetAllocator request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash.
// baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs.
func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) {
r.settings.Logger.Debug("Syncing target allocator jobs")
jobObject, err := getJobResponse(allocConf.Endpoint)
scrapeConfigsResponse, err := r.getScrapeConfigsResponse(allocConf.Endpoint)
if err != nil {
r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
return 0, err
}

hash, err := hashstructure.Hash(jobObject, hashstructure.FormatV2, nil)
hash, err := hashstructure.Hash(scrapeConfigsResponse, hashstructure.FormatV2, nil)
if err != nil {
r.settings.Logger.Error("Failed to hash job list", zap.Error(err))
return 0, err
Expand All @@ -131,29 +158,29 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll
return hash, nil
}

cfg := *baseCfg
// Clear out the current configurations
baseCfg.ScrapeConfigs = []*config.ScrapeConfig{}

for _, linkJSON := range *jobObject {
for jobName, scrapeConfig := range scrapeConfigsResponse {
var httpSD promHTTP.SDConfig
if allocConf.HTTPSDConfig == nil {
httpSD = promHTTP.SDConfig{}
httpSD = promHTTP.SDConfig{
RefreshInterval: model.Duration(30 * time.Second),
}
} else {
httpSD = *allocConf.HTTPSDConfig
}

httpSD.URL = fmt.Sprintf("%s%s?collector_id=%s", allocConf.Endpoint, linkJSON.Link, allocConf.CollectorID)

scrapeCfg := &config.ScrapeConfig{
JobName: linkJSON.Link,
ServiceDiscoveryConfigs: discovery.Configs{
&httpSD,
},
escapedJob := url.QueryEscape(jobName)
httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID)
httpSD.HTTPClientConfig.FollowRedirects = false
scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
&httpSD,
}

cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeCfg)
baseCfg.ScrapeConfigs = append(baseCfg.ScrapeConfigs, scrapeConfig)
}

err = r.applyCfg(&cfg)
err = r.applyCfg(baseCfg)
if err != nil {
r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
return 0, err
Expand All @@ -162,26 +189,43 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll
return hash, nil
}

func getJobResponse(baseURL string) (*map[string]linkJSON, error) {
jobURLString := fmt.Sprintf("%s/jobs", baseURL)
_, err := url.Parse(jobURLString) // check if valid
// instantiateShard substitutes every $(SHARD) placeholder in body with the
// value of the SHARD environment variable. When SHARD is not set at all, the
// placeholder is replaced with "0"; an explicitly empty SHARD value is kept
// as-is (LookupEnv distinguishes unset from empty).
func (r *pReceiver) instantiateShard(body []byte) []byte {
	const placeholder = "$(SHARD)"
	replacement := "0"
	if v, ok := os.LookupEnv("SHARD"); ok {
		replacement = v
	}
	return bytes.ReplaceAll(body, []byte(placeholder), []byte(replacement))
}

func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) {
scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL)
_, err := url.Parse(scrapeConfigsURL) // check if valid
if err != nil {
return nil, err
}

resp, err := http.Get(jobURLString) //nolint
resp, err := http.Get(scrapeConfigsURL) //nolint
if err != nil {
return nil, err
}

defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

jobObject := &map[string]linkJSON{}
err = json.NewDecoder(resp.Body).Decode(jobObject)
jobToScrapeConfig := map[string]*config.ScrapeConfig{}
envReplacedBody := r.instantiateShard(body)
err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig)
if err != nil {
return nil, err
}
err = resp.Body.Close()
if err != nil {
return nil, err
}
return jobObject, nil
return jobToScrapeConfig, nil
}

func (r *pReceiver) applyCfg(cfg *config.Config) error {
Expand All @@ -192,6 +236,7 @@ func (r *pReceiver) applyCfg(cfg *config.Config) error {
discoveryCfg := make(map[string]discovery.Configs)
for _, scrapeConfig := range cfg.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName))
}
if err := r.discoveryManager.ApplyConfig(discoveryCfg); err != nil {
return err
Expand All @@ -203,6 +248,7 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component
r.discoveryManager = discovery.NewManager(ctx, logger)

go func() {
r.settings.Logger.Info("Starting discovery manager")
if err := r.discoveryManager.Run(); err != nil {
r.settings.Logger.Error("Discovery manager failed", zap.Error(err))
host.ReportFatalError(err)
Expand All @@ -228,7 +274,11 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component
r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
)
r.scrapeManager = scrape.NewManager(&scrape.Options{PassMetadataInContext: true}, logger, store)

go func() {
// The scrape manager needs to wait for the configuration to be loaded before beginning
<-r.configLoaded
r.settings.Logger.Info("Starting scrape manager")
if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil {
r.settings.Logger.Error("Scrape manager failed", zap.Error(err))
host.ReportFatalError(err)
Expand Down Expand Up @@ -257,8 +307,6 @@ func gcInterval(cfg *config.Config) time.Duration {
// Shutdown stops the receiver: it cancels the discovery context, stops the
// scrape manager, and signals the target-allocator sync goroutine (if any)
// to exit by closing targetAllocatorStop. The goroutine started in
// startTargetAllocator owns its interval ticker and stops it itself when it
// observes the closed channel, so no ticker handling is needed here.
// NOTE(review): the ticker field previously stopped here was removed from
// pReceiver (see the struct definition), so the old nil-check is gone.
func (r *pReceiver) Shutdown(context.Context) error {
	r.cancelFunc()
	r.scrapeManager.Stop()
	close(r.targetAllocatorStop)
	return nil
}
Loading