diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index e41120b723..f2b8397930 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -539,4 +539,4 @@ func (in *ScaleSubresourceStatus) DeepCopy() *ScaleSubresourceStatus { out := new(ScaleSubresourceStatus) in.DeepCopyInto(out) return out -} \ No newline at end of file +} diff --git a/cmd/otel-allocator/README.md b/cmd/otel-allocator/README.md index bff32f383b..f4b4d53fda 100644 --- a/cmd/otel-allocator/README.md +++ b/cmd/otel-allocator/README.md @@ -12,7 +12,42 @@ This configuration will be resolved to target configurations and then split acro TargetAllocators expose the results as [HTTP_SD endpoints](https://prometheus.io/docs/prometheus/latest/http_sd/) split by collector. +Currently, the Target Allocator handles the sharding of targets. The operator sets the `$SHARD` variable to 0 to allow +collectors to keep targets generated by a Prometheus CRD. Using Prometheus sharding and target allocator sharding is not +recommended currently and may lead to unknown results. +[See this thread for more information](https://github.com/open-telemetry/opentelemetry-operator/pull/1124#discussion_r984683577) + #### Endpoints +`/scrape_configs`: + +```json +{ + "job1": { + "follow_redirects": true, + "honor_timestamps": true, + "job_name": "job1", + "metric_relabel_configs": [], + "metrics_path": "/metrics", + "scheme": "http", + "scrape_interval": "1m", + "scrape_timeout": "10s", + "static_configs": [] + }, + "job2": { + "follow_redirects": true, + "honor_timestamps": true, + "job_name": "job2", + "metric_relabel_configs": [], + "metrics_path": "/metrics", + "relabel_configs": [], + "scheme": "http", + "scrape_interval": "1m", + "scrape_timeout": "10s", + "kubernetes_sd_configs": [] + } +} +``` + `/jobs`: ```json diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go index b6e45f9915..9cdadde887 100644 --- a/cmd/otel-allocator/discovery/discovery.go +++ b/cmd/otel-allocator/discovery/discovery.go @@ -61,6 +61,16 @@ func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options } } +func (m *Manager) GetScrapeConfigs() map[string]*config.ScrapeConfig { + jobToScrapeConfig := map[string]*config.ScrapeConfig{} + for _, c := range m.configsMap { + for _, scrapeConfig := range c.ScrapeConfigs { + jobToScrapeConfig[scrapeConfig.JobName] = scrapeConfig + } + } + return jobToScrapeConfig +} + func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.Config) error { m.configsMap[source] = cfg diff --git a/cmd/otel-allocator/go.mod b/cmd/otel-allocator/go.mod index 93fafa28ef..83d2ee1b68 100644 --- a/cmd/otel-allocator/go.mod +++ b/cmd/otel-allocator/go.mod @@ -3,13 +3,17 @@ module github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator go 1.19 require ( + github.com/buraksezer/consistent v0.9.0 + github.com/cespare/xxhash/v2 v2.1.2 github.com/fsnotify/fsnotify v1.5.1 + github.com/ghodss/yaml v1.0.0 github.com/go-kit/log v0.2.0 github.com/go-logr/logr v1.2.0 github.com/gorilla/mux v1.8.0 github.com/prometheus-operator/prometheus-operator v0.53.1 github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.53.1 github.com/prometheus-operator/prometheus-operator/pkg/client v0.53.1 + github.com/prometheus/client_golang v1.11.0 github.com/prometheus/common v0.32.1 github.com/prometheus/prometheus v1.8.2-0.20211214150951-52c693a63be1 github.com/spf13/pflag v1.0.5 @@ -42,8 +46,6 @@ require ( github.com/aws/aws-sdk-go v1.44.41 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect - github.com/buraksezer/consistent v0.9.0 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe // indirect github.com/containerd/containerd v1.5.7 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -59,7 +61,6 @@ require ( github.com/envoyproxy/protoc-gen-validate v0.6.2 // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/fatih/color v1.12.0 // indirect - github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logr/zapr v1.2.0 // indirect github.com/go-openapi/analysis v0.20.0 // indirect @@ -123,7 +124,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus-community/prom-label-proxy v0.4.1-0.20211215142838-1eac0933d512 // indirect github.com/prometheus/alertmanager v0.23.1-0.20210914172521-e35efbddb66a // indirect - github.com/prometheus/client_golang v1.11.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common/sigv4 v0.1.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index 7c830322ea..85e1ca4c45 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -24,12 +24,14 @@ import ( "syscall" "time" + "github.com/ghodss/yaml" gokitlog "github.com/go-kit/log" "github.com/go-logr/logr" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" + yaml2 "gopkg.in/yaml.v2" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" @@ -166,6 +168,7 @@ func newServer(log logr.Logger, allocator allocation.Allocator, discoveryManager } router := mux.NewRouter().UseEncodedPath() router.Use(s.PrometheusMiddleware) + router.HandleFunc("/scrape_configs", s.ScrapeConfigsHandler).Methods("GET") router.HandleFunc("/jobs", s.JobHandler).Methods("GET") router.HandleFunc("/jobs/{job_id}/targets", s.TargetsHandler).Methods("GET") router.Path("/metrics").Handler(promhttp.Handler()) @@ -204,12 +207,33 @@ func (s *server) Shutdown(ctx context.Context) error { return s.server.Shutdown(ctx) } +// ScrapeConfigsHandler returns the available scrape configuration discovered by the target allocator. +// The target allocator first marshals these configurations such that the underlying prometheus marshaling is used. +// After that, the YAML is converted in to a JSON format for consumers to use. +func (s *server) ScrapeConfigsHandler(w http.ResponseWriter, r *http.Request) { + configs := s.discoveryManager.GetScrapeConfigs() + configBytes, err := yaml2.Marshal(configs) + if err != nil { + s.errorHandler(w, err) + } + jsonConfig, err := yaml.YAMLToJSON(configBytes) + if err != nil { + s.errorHandler(w, err) + } + // We don't use the jsonHandler method because we don't want our bytes to be re-encoded + w.Header().Set("Content-Type", "application/json") + _, err = w.Write(jsonConfig) + if err != nil { + s.errorHandler(w, err) + } +} + func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) { displayData := make(map[string]allocation.LinkJSON) for _, v := range s.allocator.TargetItems() { displayData[v.JobName] = allocation.LinkJSON{Link: v.Link.Link} } - jsonHandler(s.logger, w, displayData) + s.jsonHandler(w, displayData) } // PrometheusMiddleware implements mux.MiddlewareFunc. @@ -233,33 +257,34 @@ func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) jobId, err := url.QueryUnescape(params["job_id"]) if err != nil { - errorHandler(w) + s.errorHandler(w, err) return } if len(q) == 0 { displayData := allocation.GetAllTargetsByJob(jobId, compareMap, s.allocator) - jsonHandler(s.logger, w, displayData) + s.jsonHandler(w, displayData) } else { tgs := allocation.GetAllTargetsByCollectorAndJob(q[0], jobId, compareMap, s.allocator) // Displays empty list if nothing matches if len(tgs) == 0 { - jsonHandler(s.logger, w, []interface{}{}) + s.jsonHandler(w, []interface{}{}) return } - jsonHandler(s.logger, w, tgs) + s.jsonHandler(w, tgs) } } -func errorHandler(w http.ResponseWriter) { +func (s *server) errorHandler(w http.ResponseWriter, err error) { w.WriteHeader(500) + s.jsonHandler(w, err) } -func jsonHandler(logger logr.Logger, w http.ResponseWriter, data interface{}) { +func (s *server) jsonHandler(w http.ResponseWriter, data interface{}) { w.Header().Set("Content-Type", "application/json") err := json.NewEncoder(w).Encode(data) if err != nil { - logger.Error(err, "failed to encode data for http response") + s.logger.Error(err, "failed to encode data for http response") } } diff --git a/cmd/otel-allocator/watcher/file.go b/cmd/otel-allocator/watcher/file.go index a2a7f9fd92..069b9ac483 100644 --- a/cmd/otel-allocator/watcher/file.go +++ b/cmd/otel-allocator/watcher/file.go @@ -26,6 +26,7 @@ import ( type FileWatcher struct { configFilePath string watcher *fsnotify.Watcher + closer chan bool } func newConfigMapWatcher(logger logr.Logger, config config.CLIConfig) (FileWatcher, error) { @@ -38,6 +39,7 @@ func newConfigMapWatcher(logger logr.Logger, config config.CLIConfig) (FileWatch return FileWatcher{ configFilePath: *config.ConfigFilePath, watcher: fileWatcher, + closer: make(chan bool), }, nil } @@ -51,6 +53,8 @@ func (f *FileWatcher) Start(upstreamEvents chan Event, upstreamErrors chan error go func() { for { select { + case <-f.closer: + return case fileEvent := <-f.watcher.Events: if fileEvent.Op == fsnotify.Create { upstreamEvents <- Event{ @@ -66,5 +70,6 @@ func (f *FileWatcher) Start(upstreamEvents chan Event, upstreamErrors chan error } func (f *FileWatcher) Close() error { + f.closer <- true return f.watcher.Close() } diff --git a/pkg/collector/container.go b/pkg/collector/container.go index 5c0548feb4..e61a6b6384 100644 --- a/pkg/collector/container.go +++ b/pkg/collector/container.go @@ -78,6 +78,18 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem }, }) + if otelcol.Spec.TargetAllocator.Enabled { + // We need to add a SHARD here so the collector is able to keep targets after the hashmod operation which is + // added by default by the Prometheus operator's config generator. + // All collector instances use SHARD == 0 as they only receive targets + // allocated to them and should not use the Prometheus hashmod-based + // allocation. + envVars = append(envVars, corev1.EnvVar{ + Name: "SHARD", + Value: "0", + }) + } + var livenessProbe *corev1.Probe if config, err := adapters.ConfigFromString(otelcol.Spec.Config); err == nil { if probe, err := adapters.ConfigToContainerProbe(config); err == nil { diff --git a/pkg/collector/reconcile/service.go b/pkg/collector/reconcile/service.go index 1fbb6337c6..896c99a356 100644 --- a/pkg/collector/reconcile/service.go +++ b/pkg/collector/reconcile/service.go @@ -235,6 +235,7 @@ func expectedServices(ctx context.Context, params Params, expected []corev1.Serv updated.ObjectMeta.Labels[k] = v } updated.Spec.Ports = desired.Spec.Ports + updated.Spec.Selector = desired.Spec.Selector patch := client.MergeFrom(existing) diff --git a/pkg/collector/reconcile/service_test.go b/pkg/collector/reconcile/service_test.go index 9334c7d53e..f200a166bc 100644 --- a/pkg/collector/reconcile/service_test.go +++ b/pkg/collector/reconcile/service_test.go @@ -165,7 +165,23 @@ func TestExpectedServices(t *testing.T) { assert.True(t, exists) assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID) assert.Contains(t, actual.Spec.Ports, extraPorts) + }) + t.Run("should update service on version change", func(t *testing.T) { + serviceInstance := service("test-collector", params().Instance.Spec.Ports) + createObjectIfNotExists(t, "test-collector", &serviceInstance) + newService := service("test-collector", params().Instance.Spec.Ports) + newService.Spec.Selector["app.kubernetes.io/version"] = "Newest" + err := expectedServices(context.Background(), params(), []v1.Service{newService}) + assert.NoError(t, err) + + actual := v1.Service{} + exists, err := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collector"}) + + assert.NoError(t, err) + assert.True(t, exists) + assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID) + assert.Equal(t, "Newest", actual.Spec.Selector["app.kubernetes.io/version"]) }) }