[target-allocator] Populate store assets (authorization information) for Prometheus CR watcher (open-telemetry#1710)

* Adjust Watcher interface

Add a context parameter to the LoadConfig method, in preparation for using the K8s client in the method.

Signed-off-by: Matej Gera <[email protected]>
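
For reference, a minimal sketch of the adjusted interface (the real definition lives in cmd/otel-allocator/watcher; the Watch and Close signatures are inferred from the rest of this diff, not quoted from the repo):

```go
import (
	"context"

	promconfig "github.com/prometheus/prometheus/config"
)

// Sketch only: LoadConfig now receives a context so implementations can
// issue Kubernetes API calls (e.g. to read Secrets) while loading config.
// Event is the event type defined in the same watcher package.
type Watcher interface {
	LoadConfig(ctx context.Context) (*promconfig.Config, error)
	Watch(upstreamEvents chan Event, upstreamErrors chan error) error
	Close() error
}
```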

* Enhance the Prometheus CR watcher struct

Add K8s client and logger, in preparation for obtaining service / pod monitor store assets.

Signed-off-by: Matej Gera <[email protected]>

* Add methods and logic to obtain store assets from service / pod monitors

Signed-off-by: Matej Gera <[email protected]>

* Add unit tests for Prometheus CR watcher

Signed-off-by: Matej Gera <[email protected]>

* Add changelog

Signed-off-by: Matej Gera <[email protected]>

* Add disclaimer about secrets in the README

Signed-off-by: Matej Gera <[email protected]>

* Fix store param and tests after branch update

Signed-off-by: Matej Gera <[email protected]>

* Fix botched merge

Signed-off-by: Matej Gera <[email protected]>

---------

Signed-off-by: Matej Gera <[email protected]>
Co-authored-by: Jacob Aronoff <[email protected]>
Co-authored-by: Tyler Helmuth <[email protected]>
Co-authored-by: Pavol Loffay <[email protected]>
4 people authored Jun 28, 2023
1 parent 8a6bacd commit 08cbc99
Showing 7 changed files with 500 additions and 28 deletions.
16 changes: 16 additions & 0 deletions .chloggen/1710-prometheus-cr-scrape-config-credentials.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern (e.g. operator, target allocator, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Populate credentials for Prometheus CR (service and pod monitor) scrape configs.

# One or more tracking issues related to the change
issues: [1669]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
6 changes: 6 additions & 0 deletions cmd/otel-allocator/README.md
@@ -138,6 +138,12 @@ the `targetAllocator:` part of the OpenTelemetryCollector CR.
**Note**: The Collector part of this same CR *also* has a serviceAccount key which only affects the collector and *not*
the TargetAllocator.

### Service / Pod monitor endpoint credentials

If your service or pod monitor endpoints require credentials or another supported form of authentication (bearer token, basic auth, OAuth2, etc.), you need to ensure that the collector has access to this information. Due to some limitations in how the endpoint configuration is handled, the target allocator currently does **not** support credentials provided via secrets. It is only possible to provide credentials in a file (for more details, see issue https://github.com/open-telemetry/opentelemetry-operator/issues/1669).

To ensure your endpoints can be scraped, your collector instance needs to have the relevant secret mounted as a file at the correct path.
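
A minimal sketch of what that can look like on the OpenTelemetryCollector CR, assuming a hypothetical secret named `scrape-secrets` (both the secret name and mount path here are illustrative; the mount path must match the file path referenced by your monitor's endpoint configuration, e.g. `bearerTokenFile`):

```yaml
apiVersion: opentelemetry.io/v1alpha1
kind: OpenTelemetryCollector
metadata:
  name: collector-with-ta
spec:
  mode: statefulset
  targetAllocator:
    enabled: true
    prometheusCR:
      enabled: true
  volumes:
    - name: scrape-secrets            # hypothetical Secret holding credential files
      secret:
        secretName: scrape-secrets
  volumeMounts:
    - name: scrape-secrets
      mountPath: /etc/prometheus/secrets/scrape-secrets   # path your endpoints reference
      readOnly: true
```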


# Design

4 changes: 2 additions & 2 deletions cmd/otel-allocator/main.go
@@ -107,7 +107,7 @@ func main() {
defer close(interrupts)

if *cliConf.PromCRWatcherConf.Enabled {
promWatcher, err = allocatorWatcher.NewPrometheusCRWatcher(cfg, cliConf)
promWatcher, err = allocatorWatcher.NewPrometheusCRWatcher(setupLog.WithName("prometheus-cr-watcher"), cfg, cliConf)
if err != nil {
setupLog.Error(err, "Can't start the prometheus watcher")
os.Exit(1)
@@ -193,7 +193,7 @@ func main() {
select {
case event := <-eventChan:
eventsMetric.WithLabelValues(event.Source.String()).Inc()
loadConfig, err := event.Watcher.LoadConfig()
loadConfig, err := event.Watcher.LoadConfig(ctx)
if err != nil {
setupLog.Error(err, "Unable to load configuration")
continue
3 changes: 2 additions & 1 deletion cmd/otel-allocator/watcher/file.go
@@ -15,6 +15,7 @@
package watcher

import (
"context"
"path/filepath"

"github.com/fsnotify/fsnotify"
@@ -48,7 +49,7 @@ func NewFileWatcher(logger logr.Logger, config config.CLIConfig) (*FileWatcher,
}, nil
}

func (f *FileWatcher) LoadConfig() (*promconfig.Config, error) {
func (f *FileWatcher) LoadConfig(_ context.Context) (*promconfig.Config, error) {
cfg, err := config.Load(f.configFilePath)
if err != nil {
f.logger.Error(err, "Unable to load configuration")
146 changes: 123 additions & 23 deletions cmd/otel-allocator/watcher/promOperator.go
@@ -15,9 +15,11 @@
package watcher

import (
"context"
"fmt"

"github.com/go-kit/log"
"github.com/go-logr/logr"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
promv1alpha1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1alpha1"
"github.com/prometheus-operator/prometheus-operator/pkg/assets"
@@ -29,34 +31,30 @@ import (
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

allocatorconfig "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"
)

func NewPrometheusCRWatcher(cfg allocatorconfig.Config, cliConfig allocatorconfig.CLIConfig) (*PrometheusCRWatcher, error) {
func NewPrometheusCRWatcher(logger logr.Logger, cfg allocatorconfig.Config, cliConfig allocatorconfig.CLIConfig) (*PrometheusCRWatcher, error) {
mClient, err := monitoringclient.NewForConfig(cliConfig.ClusterConfig)
if err != nil {
return nil, err
}

factory := informers.NewMonitoringInformerFactories(map[string]struct{}{v1.NamespaceAll: {}}, map[string]struct{}{}, mClient, allocatorconfig.DefaultResyncTime, nil) //TODO decide what strategy to use regarding namespaces

serviceMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName))
clientset, err := kubernetes.NewForConfig(cliConfig.ClusterConfig)
if err != nil {
return nil, err
}

podMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName))
factory := informers.NewMonitoringInformerFactories(map[string]struct{}{v1.NamespaceAll: {}}, map[string]struct{}{}, mClient, allocatorconfig.DefaultResyncTime, nil) //TODO decide what strategy to use regarding namespaces

monitoringInformers, err := getInformers(factory)
if err != nil {
return nil, err
}

monitoringInformers := map[string]*informers.ForResource{
monitoringv1.ServiceMonitorName: serviceMonitorInformers,
monitoringv1.PodMonitorName: podMonitorInformers,
}

// TODO: We should make these durations configurable
prom := &monitoringv1.Prometheus{
Spec: monitoringv1.PrometheusSpec{
@@ -76,7 +74,9 @@ func NewPrometheusCRWatcher(cfg allocatorconfig.Config, cliConfig allocatorconfi
podMonSelector := getSelector(cfg.PodMonitorSelector)

return &PrometheusCRWatcher{
logger: logger,
kubeMonitoringClient: mClient,
k8sClient: clientset,
informers: monitoringInformers,
stopChannel: make(chan struct{}),
configGenerator: generator,
@@ -87,7 +87,9 @@ func NewPrometheusCRWatcher(cfg allocatorconfig.Config, cliConfig allocatorconfi
}

type PrometheusCRWatcher struct {
kubeMonitoringClient *monitoringclient.Clientset
logger logr.Logger
kubeMonitoringClient monitoringclient.Interface
k8sClient kubernetes.Interface
informers map[string]*informers.ForResource
stopChannel chan struct{}
configGenerator *prometheus.ConfigGenerator
@@ -98,13 +100,30 @@ type PrometheusCRWatcher struct {
}

func getSelector(s map[string]string) labels.Selector {
sel := labels.NewSelector()
if s == nil {
return sel
return labels.NewSelector()
}
return labels.SelectorFromSet(s)
}

// getInformers returns a map of informers for the given resources.
func getInformers(factory informers.FactoriesForNamespaces) (map[string]*informers.ForResource, error) {
serviceMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName))
if err != nil {
return nil, err
}

podMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName))
if err != nil {
return nil, err
}

return map[string]*informers.ForResource{
monitoringv1.ServiceMonitorName: serviceMonitorInformers,
monitoringv1.PodMonitorName: podMonitorInformers,
}, nil
}

// Watch wrapped informers and wait for an initial sync.
func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors chan error) error {
event := Event{
@@ -143,12 +162,13 @@ func (w *PrometheusCRWatcher) Close() error {
return nil
}

func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Config, error) {
store := assets.NewStore(w.k8sClient.CoreV1(), w.k8sClient.CoreV1())
serviceMonitorInstances := make(map[string]*monitoringv1.ServiceMonitor)

smRetrieveErr := w.informers[monitoringv1.ServiceMonitorName].ListAll(w.serviceMonitorSelector, func(sm interface{}) {
monitor := sm.(*monitoringv1.ServiceMonitor)
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
w.addStoreAssetsForServiceMonitor(ctx, monitor.Name, monitor.Namespace, monitor.Spec.Endpoints, store)
serviceMonitorInstances[key] = monitor
})
if smRetrieveErr != nil {
@@ -159,19 +179,13 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
pmRetrieveErr := w.informers[monitoringv1.PodMonitorName].ListAll(w.podMonitorSelector, func(pm interface{}) {
monitor := pm.(*monitoringv1.PodMonitor)
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
w.addStoreAssetsForPodMonitor(ctx, monitor.Name, monitor.Namespace, monitor.Spec.PodMetricsEndpoints, store)
podMonitorInstances[key] = monitor
})
if pmRetrieveErr != nil {
return nil, pmRetrieveErr
}

store := assets.Store{
TLSAssets: nil,
TokenAssets: nil,
BasicAuthAssets: nil,
OAuth2Assets: nil,
SigV4Assets: nil,
}
generatedConfig, err := w.configGenerator.GenerateServerConfiguration(
"30s",
"",
@@ -184,7 +198,7 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
podMonitorInstances,
map[string]*monitoringv1.Probe{},
map[string]*promv1alpha1.ScrapeConfig{},
&store,
store,
nil,
nil,
nil,
@@ -211,3 +225,89 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
}
return promCfg, nil
}

// addStoreAssetsForServiceMonitor adds authentication / authorization related information to the assets store,
// based on the service monitor and endpoints specs.
// This code borrows from
// https://github.com/prometheus-operator/prometheus-operator/blob/06b5c4189f3f72737766d86103d049115c3aff48/pkg/prometheus/resource_selector.go#L73.
func (w *PrometheusCRWatcher) addStoreAssetsForServiceMonitor(
ctx context.Context,
smName, smNamespace string,
endps []monitoringv1.Endpoint,
store *assets.Store,
) {
var err error
for i, endp := range endps {
objKey := fmt.Sprintf("serviceMonitor/%s/%s/%d", smNamespace, smName, i)

if err = store.AddBearerToken(ctx, smNamespace, endp.BearerTokenSecret, objKey); err != nil {
break
}

if err = store.AddBasicAuth(ctx, smNamespace, endp.BasicAuth, objKey); err != nil {
break
}

if endp.TLSConfig != nil {
if err = store.AddTLSConfig(ctx, smNamespace, endp.TLSConfig); err != nil {
break
}
}

if err = store.AddOAuth2(ctx, smNamespace, endp.OAuth2, objKey); err != nil {
break
}

smAuthKey := fmt.Sprintf("serviceMonitor/auth/%s/%s/%d", smNamespace, smName, i)
if err = store.AddSafeAuthorizationCredentials(ctx, smNamespace, endp.Authorization, smAuthKey); err != nil {
break
}
}

if err != nil {
w.logger.Error(err, "Failed to obtain credentials for a ServiceMonitor", "serviceMonitor", smName)
}
}

// addStoreAssetsForPodMonitor adds authentication / authorization related information to the assets store,
// based on the pod monitor and pod metrics endpoints specs.
// This code borrows from
// https://github.com/prometheus-operator/prometheus-operator/blob/06b5c4189f3f72737766d86103d049115c3aff48/pkg/prometheus/resource_selector.go#L314.
func (w *PrometheusCRWatcher) addStoreAssetsForPodMonitor(
ctx context.Context,
pmName, pmNamespace string,
podMetricsEndps []monitoringv1.PodMetricsEndpoint,
store *assets.Store,
) {
var err error
for i, endp := range podMetricsEndps {
objKey := fmt.Sprintf("podMonitor/%s/%s/%d", pmNamespace, pmName, i)

if err = store.AddBearerToken(ctx, pmNamespace, endp.BearerTokenSecret, objKey); err != nil {
break
}

if err = store.AddBasicAuth(ctx, pmNamespace, endp.BasicAuth, objKey); err != nil {
break
}

if endp.TLSConfig != nil {
if err = store.AddSafeTLSConfig(ctx, pmNamespace, &endp.TLSConfig.SafeTLSConfig); err != nil {
break
}
}

if err = store.AddOAuth2(ctx, pmNamespace, endp.OAuth2, objKey); err != nil {
break
}

pmAuthKey := fmt.Sprintf("podMonitor/auth/%s/%s/%d", pmNamespace, pmName, i)
if err = store.AddSafeAuthorizationCredentials(ctx, pmNamespace, endp.Authorization, pmAuthKey); err != nil {
break
}
}

if err != nil {
w.logger.Error(err, "Failed to obtain credentials for a PodMonitor", "podMonitor", pmName)
}
}
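
To illustrate what these helpers accomplish, here is a test-style sketch in the spirit of the unit tests added in this PR (names are assumed, not copied from the PR's tests; it uses fake clients and only compiles inside the watcher package, since the struct fields are unexported):

```go
package watcher

import (
	"context"
	"testing"

	"github.com/go-logr/logr"
	monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
	"github.com/prometheus-operator/prometheus-operator/pkg/assets"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestAddStoreAssetsForServiceMonitorSketch(t *testing.T) {
	ctx := context.Background()
	// Seed a fake clientset with the Secret the endpoint references.
	k8sClient := fake.NewSimpleClientset(&v1.Secret{
		ObjectMeta: metav1.ObjectMeta{Name: "bearer", Namespace: "test"},
		Data:       map[string][]byte{"token": []byte("s3cr3t")},
	})
	w := &PrometheusCRWatcher{logger: logr.Discard(), k8sClient: k8sClient}
	store := assets.NewStore(k8sClient.CoreV1(), k8sClient.CoreV1())

	w.addStoreAssetsForServiceMonitor(ctx, "sm-test", "test",
		[]monitoringv1.Endpoint{{
			BearerTokenSecret: v1.SecretKeySelector{
				LocalObjectReference: v1.LocalObjectReference{Name: "bearer"},
				Key:                  "token",
			},
		}}, store)

	// The credential lands under the same key LoadConfig later hands to the
	// Prometheus config generator.
	if _, ok := store.TokenAssets["serviceMonitor/test/sm-test/0"]; !ok {
		t.Fatal("expected bearer token in assets store")
	}
}
```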
