
Add scrape configs endpoint #1124

Merged
2 changes: 1 addition & 1 deletion apis/v1alpha1/zz_generated.deepcopy.go


35 changes: 35 additions & 0 deletions cmd/otel-allocator/README.md
@@ -12,7 +12,42 @@ This configuration will be resolved to target configurations and then split acro
TargetAllocators expose the results as [HTTP_SD endpoints](https://prometheus.io/docs/prometheus/latest/http_sd/)
split by collector.

Currently, the Target Allocator handles the sharding of targets. The operator sets the `$SHARD` variable to 0 so that
collectors keep the targets generated by a Prometheus CRD. Combining Prometheus sharding with Target Allocator sharding
is currently not recommended and may lead to unpredictable results.
[See this thread for more information](https://github.com/open-telemetry/opentelemetry-operator/pull/1124#discussion_r984683577)

#### Endpoints
`/scrape_configs`:

```json
{
  "job1": {
    "follow_redirects": true,
    "honor_timestamps": true,
    "job_name": "job1",
    "metric_relabel_configs": [],
    "metrics_path": "/metrics",
    "scheme": "http",
    "scrape_interval": "1m",
    "scrape_timeout": "10s",
    "static_configs": []
  },
  "job2": {
    "follow_redirects": true,
    "honor_timestamps": true,
    "job_name": "job2",
    "metric_relabel_configs": [],
    "metrics_path": "/metrics",
    "relabel_configs": [],
    "scheme": "http",
    "scrape_interval": "1m",
    "scrape_timeout": "10s",
    "kubernetes_sd_configs": []
  }
}
```
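
A minimal Go sketch of a consumer for this endpoint, assuming the target allocator is reachable at `localhost:8080` (the address is illustrative and depends on how the allocator service is exposed):

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Fetch the scrape configurations; the host/port below is an assumption.
	resp, err := http.Get("http://localhost:8080/scrape_configs")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The body is a JSON object keyed by job name, as shown above.
	var configs map[string]map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&configs); err != nil {
		panic(err)
	}
	for job := range configs {
		fmt.Println("scrape config found for job:", job)
	}
}
```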

`/jobs`:

10 changes: 10 additions & 0 deletions cmd/otel-allocator/discovery/discovery.go
@@ -61,6 +61,16 @@ func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options
	}
}

func (m *Manager) GetScrapeConfigs() map[string]*config.ScrapeConfig {
	jobToScrapeConfig := map[string]*config.ScrapeConfig{}
	for _, c := range m.configsMap {
		for _, scrapeConfig := range c.ScrapeConfigs {
			jobToScrapeConfig[scrapeConfig.JobName] = scrapeConfig
		}
	}
	return jobToScrapeConfig
}

func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.Config) error {
	m.configsMap[source] = cfg

8 changes: 4 additions & 4 deletions cmd/otel-allocator/go.mod
@@ -3,13 +3,17 @@ module github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator
go 1.19

require (
github.com/buraksezer/consistent v0.9.0
github.com/cespare/xxhash/v2 v2.1.2
github.com/fsnotify/fsnotify v1.5.1
github.com/ghodss/yaml v1.0.0
github.com/go-kit/log v0.2.0
github.com/go-logr/logr v1.2.0
github.com/gorilla/mux v1.8.0
github.com/prometheus-operator/prometheus-operator v0.53.1
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.53.1
github.com/prometheus-operator/prometheus-operator/pkg/client v0.53.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.32.1
github.com/prometheus/prometheus v1.8.2-0.20211214150951-52c693a63be1
github.com/spf13/pflag v1.0.5
@@ -42,8 +46,6 @@ require (
github.com/aws/aws-sdk-go v1.44.41 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/buraksezer/consistent v0.9.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe // indirect
github.com/containerd/containerd v1.5.7 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@@ -59,7 +61,6 @@
github.com/envoyproxy/protoc-gen-validate v0.6.2 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/fatih/color v1.12.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/zapr v1.2.0 // indirect
github.com/go-openapi/analysis v0.20.0 // indirect
@@ -123,7 +124,6 @@
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus-community/prom-label-proxy v0.4.1-0.20211215142838-1eac0933d512 // indirect
github.com/prometheus/alertmanager v0.23.1-0.20210914172521-e35efbddb66a // indirect
github.com/prometheus/client_golang v1.11.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common/sigv4 v0.1.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
41 changes: 33 additions & 8 deletions cmd/otel-allocator/main.go
@@ -24,12 +24,14 @@ import (
	"syscall"
	"time"

	"github.com/ghodss/yaml"
	gokitlog "github.com/go-kit/log"
	"github.com/go-logr/logr"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	yaml2 "gopkg.in/yaml.v2"
	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
	ctrl "sigs.k8s.io/controller-runtime"

@@ -166,6 +168,7 @@ func newServer(log logr.Logger, allocator allocation.Allocator, discoveryManager
	}
	router := mux.NewRouter().UseEncodedPath()
	router.Use(s.PrometheusMiddleware)
	router.HandleFunc("/scrape_configs", s.ScrapeConfigsHandler).Methods("GET")
	router.HandleFunc("/jobs", s.JobHandler).Methods("GET")
	router.HandleFunc("/jobs/{job_id}/targets", s.TargetsHandler).Methods("GET")
	router.Path("/metrics").Handler(promhttp.Handler())
@@ -204,12 +207,33 @@ func (s *server) Shutdown(ctx context.Context) error {
	return s.server.Shutdown(ctx)
}

// ScrapeConfigsHandler returns the available scrape configuration discovered by the target allocator.
// The target allocator first marshals these configurations such that the underlying prometheus marshaling is used.
// After that, the YAML is converted into a JSON format for consumers to use.
func (s *server) ScrapeConfigsHandler(w http.ResponseWriter, r *http.Request) {
	configs := s.discoveryManager.GetScrapeConfigs()
	configBytes, err := yaml2.Marshal(configs)
	if err != nil {
		s.errorHandler(w, err)
	}
	jsonConfig, err := yaml.YAMLToJSON(configBytes)
	if err != nil {
		s.errorHandler(w, err)
	}
	// We don't use the jsonHandler method because we don't want our bytes to be re-encoded
	w.Header().Set("Content-Type", "application/json")
	_, err = w.Write(jsonConfig)
	if err != nil {
		s.errorHandler(w, err)
	}
}

func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) {
displayData := make(map[string]allocation.LinkJSON)
for _, v := range s.allocator.TargetItems() {
displayData[v.JobName] = allocation.LinkJSON{Link: v.Link.Link}
}
jsonHandler(s.logger, w, displayData)
s.jsonHandler(w, displayData)
}

// PrometheusMiddleware implements mux.MiddlewareFunc.
@@ -233,33 +257,34 @@ func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
jobId, err := url.QueryUnescape(params["job_id"])
if err != nil {
errorHandler(w)
s.errorHandler(w, err)
return
}

if len(q) == 0 {
displayData := allocation.GetAllTargetsByJob(jobId, compareMap, s.allocator)
jsonHandler(s.logger, w, displayData)
s.jsonHandler(w, displayData)

} else {
tgs := allocation.GetAllTargetsByCollectorAndJob(q[0], jobId, compareMap, s.allocator)
// Displays empty list if nothing matches
if len(tgs) == 0 {
jsonHandler(s.logger, w, []interface{}{})
s.jsonHandler(w, []interface{}{})
return
}
jsonHandler(s.logger, w, tgs)
s.jsonHandler(w, tgs)
}
}

func errorHandler(w http.ResponseWriter) {
func (s *server) errorHandler(w http.ResponseWriter, err error) {
w.WriteHeader(500)
s.jsonHandler(w, err)
}

func jsonHandler(logger logr.Logger, w http.ResponseWriter, data interface{}) {
func (s *server) jsonHandler(w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(data)
if err != nil {
logger.Error(err, "failed to encode data for http response")
s.logger.Error(err, "failed to encode data for http response")
}
}
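
The handler above marshals to YAML before converting to JSON because Prometheus' `ScrapeConfig` type is tagged for YAML rather than JSON. A minimal, self-contained sketch of that round trip, using a hypothetical stand-in struct rather than the real Prometheus type:

```go
package main

import (
	"fmt"

	"github.com/ghodss/yaml"
	yaml2 "gopkg.in/yaml.v2"
)

// scrapeConfig is a hypothetical stand-in for Prometheus' config.ScrapeConfig,
// reduced to two YAML-tagged fields for illustration.
type scrapeConfig struct {
	JobName     string `yaml:"job_name"`
	MetricsPath string `yaml:"metrics_path"`
}

func main() {
	configs := map[string]scrapeConfig{
		"job1": {JobName: "job1", MetricsPath: "/metrics"},
	}
	// Marshal with yaml.v2 so the yaml struct tags are honored...
	yamlBytes, err := yaml2.Marshal(configs)
	if err != nil {
		panic(err)
	}
	// ...then convert the YAML document to JSON for HTTP consumers.
	jsonBytes, err := yaml.YAMLToJSON(yamlBytes)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(jsonBytes)) // {"job1":{"job_name":"job1","metrics_path":"/metrics"}}
}
```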
5 changes: 5 additions & 0 deletions cmd/otel-allocator/watcher/file.go
@@ -26,6 +26,7 @@ import (
type FileWatcher struct {
	configFilePath string
	watcher        *fsnotify.Watcher
	closer         chan bool
}

func newConfigMapWatcher(logger logr.Logger, config config.CLIConfig) (FileWatcher, error) {
@@ -38,6 +39,7 @@ func newConfigMapWatcher(logger logr.Logger, config config.CLIConfig) (FileWatch
	return FileWatcher{
		configFilePath: *config.ConfigFilePath,
		watcher:        fileWatcher,
		closer:         make(chan bool),
	}, nil
}

@@ -51,6 +53,8 @@ func (f *FileWatcher) Start(upstreamEvents chan Event, upstreamErrors chan error
	go func() {
		for {
			select {
			case <-f.closer:
				return
			case fileEvent := <-f.watcher.Events:
				if fileEvent.Op == fsnotify.Create {
					upstreamEvents <- Event{
@@ -66,5 +70,6 @@
}

func (f *FileWatcher) Close() error {
	f.closer <- true
	return f.watcher.Close()
}
12 changes: 12 additions & 0 deletions pkg/collector/container.go
@@ -78,6 +78,18 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem
},
})

	if otelcol.Spec.TargetAllocator.Enabled {
		// We need to add a SHARD here so the collector is able to keep targets after the hashmod operation which is
		// added by default by the Prometheus operator's config generator.
		// All collector instances use SHARD == 0 as they only receive targets
		// allocated to them and should not use the Prometheus hashmod-based
		// allocation.
		envVars = append(envVars, corev1.EnvVar{
			Name:  "SHARD",
			Value: "0",
		})
	}

	var livenessProbe *corev1.Probe
	if config, err := adapters.ConfigFromString(otelcol.Spec.Config); err == nil {
		if probe, err := adapters.ConfigToContainerProbe(config); err == nil {
1 change: 1 addition & 0 deletions pkg/collector/reconcile/service.go
@@ -235,6 +235,7 @@ func expectedServices(ctx context.Context, params Params, expected []corev1.Serv
			updated.ObjectMeta.Labels[k] = v
		}
		updated.Spec.Ports = desired.Spec.Ports
		updated.Spec.Selector = desired.Spec.Selector

		patch := client.MergeFrom(existing)

16 changes: 16 additions & 0 deletions pkg/collector/reconcile/service_test.go
@@ -165,7 +165,23 @@ func TestExpectedServices(t *testing.T) {
		assert.True(t, exists)
		assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID)
		assert.Contains(t, actual.Spec.Ports, extraPorts)
	})
	t.Run("should update service on version change", func(t *testing.T) {
		serviceInstance := service("test-collector", params().Instance.Spec.Ports)
		createObjectIfNotExists(t, "test-collector", &serviceInstance)

		newService := service("test-collector", params().Instance.Spec.Ports)
		newService.Spec.Selector["app.kubernetes.io/version"] = "Newest"
		err := expectedServices(context.Background(), params(), []v1.Service{newService})
		assert.NoError(t, err)

		actual := v1.Service{}
		exists, err := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collector"})

		assert.NoError(t, err)
		assert.True(t, exists)
		assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID)
		assert.Equal(t, "Newest", actual.Spec.Selector["app.kubernetes.io/version"])
	})
}
