Skip to content

Commit

Permalink
Adds methods and logic to obtain store assets from service / pod moni…
Browse files Browse the repository at this point in the history
…tors

Signed-off-by: Matej Gera <[email protected]>
  • Loading branch information
matej-g committed May 4, 2023
1 parent cb0d5a6 commit bb4bd44
Showing 1 changed file with 104 additions and 14 deletions.
118 changes: 104 additions & 14 deletions cmd/otel-allocator/watcher/promOperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package watcher

import (
"context"
"fmt"

allocatorconfig "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"

"github.com/go-kit/log"
"github.com/go-logr/logr"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/prometheus-operator/prometheus-operator/pkg/assets"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
Expand All @@ -30,6 +32,7 @@ import (
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

Expand All @@ -51,11 +54,6 @@ func NewPrometheusCRWatcher(logger logr.Logger, cfg allocatorconfig.Config, cliC
return nil, err
}

monitoringInformers := map[string]*informers.ForResource{
monitoringv1.ServiceMonitorName: serviceMonitorInformers,
monitoringv1.PodMonitorName: podMonitorInformers,
}

generator, err := prometheus.NewConfigGenerator(log.NewNopLogger(), &monitoringv1.Prometheus{}, true) // TODO replace Nop?
if err != nil {
return nil, err
Expand Down Expand Up @@ -155,11 +153,12 @@ func (w *PrometheusCRWatcher) Close() error {
}

func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Config, error) {
store := assets.NewStore(w.k8sClient.CoreV1(), w.k8sClient.CoreV1())
serviceMonitorInstances := make(map[string]*monitoringv1.ServiceMonitor)

smRetrieveErr := w.informers[monitoringv1.ServiceMonitorName].ListAll(w.serviceMonitorSelector, func(sm interface{}) {
monitor := sm.(*monitoringv1.ServiceMonitor)
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
w.addStoreAssetsForServiceMonitor(ctx, monitor.Name, monitor.Namespace, monitor.Spec.Endpoints, store)
serviceMonitorInstances[key] = monitor
})
if smRetrieveErr != nil {
Expand All @@ -170,19 +169,13 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
pmRetrieveErr := w.informers[monitoringv1.PodMonitorName].ListAll(w.podMonitorSelector, func(pm interface{}) {
monitor := pm.(*monitoringv1.PodMonitor)
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
w.addStoreAssetsForPodMonitor(ctx, monitor.Name, monitor.Namespace, monitor.Spec.PodMetricsEndpoints, store)
podMonitorInstances[key] = monitor
})
if pmRetrieveErr != nil {
return nil, pmRetrieveErr
}

store := assets.Store{
TLSAssets: nil,
TokenAssets: nil,
BasicAuthAssets: nil,
OAuth2Assets: nil,
SigV4Assets: nil,
}
// TODO: We should make these durations configurable
prom := &monitoringv1.Prometheus{
Spec: monitoringv1.PrometheusSpec{
Expand All @@ -192,7 +185,18 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
},
},
}
generatedConfig, err := w.configGenerator.Generate(prom, serviceMonitorInstances, podMonitorInstances, map[string]*monitoringv1.Probe{}, &store, nil, nil, nil, []string{})

generatedConfig, err := w.configGenerator.Generate(
prom,
serviceMonitorInstances,
podMonitorInstances,
map[string]*monitoringv1.Probe{},
store,
nil,
nil,
nil,
[]string{},
)
if err != nil {
return nil, err
}
Expand All @@ -215,3 +219,89 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
}
return promCfg, nil
}

// addStoreAssetsForServiceMonitor adds authentication / authorization related information to the assets store,
// based on the service monitor and endpoints specs.
// This code borrows from
// https://github.com/prometheus-operator/prometheus-operator/blob/06b5c4189f3f72737766d86103d049115c3aff48/pkg/prometheus/resource_selector.go#L73.
func (w *PrometheusCRWatcher) addStoreAssetsForServiceMonitor(
ctx context.Context,
smName, smNamespace string,
endps []monitoringv1.Endpoint,
store *assets.Store,
) {
var err error
for i, endp := range endps {
objKey := fmt.Sprintf("serviceMonitor/%s/%s/%d", smNamespace, smName, i)

if err = store.AddBearerToken(ctx, smNamespace, endp.BearerTokenSecret, objKey); err != nil {
break
}

if err = store.AddBasicAuth(ctx, smNamespace, endp.BasicAuth, objKey); err != nil {
break
}

if endp.TLSConfig != nil {
if err = store.AddTLSConfig(ctx, smNamespace, endp.TLSConfig); err != nil {
break
}
}

if err = store.AddOAuth2(ctx, smNamespace, endp.OAuth2, objKey); err != nil {
break
}

smAuthKey := fmt.Sprintf("serviceMonitor/auth/%s/%s/%d", smNamespace, smName, i)
if err = store.AddSafeAuthorizationCredentials(ctx, smNamespace, endp.Authorization, smAuthKey); err != nil {
break
}
}

if err != nil {
w.logger.Error(err, "Failed to obtain credentials for a ServiceMonitor", "serviceMonitor", smName)
}
}

// addStoreAssetsForServiceMonitor adds authentication / authorization related information to the assets store,
// based on the service monitor and pod metrics endpoints specs.
// This code borrows from
// https://github.com/prometheus-operator/prometheus-operator/blob/06b5c4189f3f72737766d86103d049115c3aff48/pkg/prometheus/resource_selector.go#L314.
func (w *PrometheusCRWatcher) addStoreAssetsForPodMonitor(
ctx context.Context,
pmName, pmNamespace string,
podMetricsEndps []monitoringv1.PodMetricsEndpoint,
store *assets.Store,
) {
var err error
for i, endp := range podMetricsEndps {
objKey := fmt.Sprintf("podMonitor/%s/%s/%d", pmNamespace, pmName, i)

if err = store.AddBearerToken(ctx, pmNamespace, endp.BearerTokenSecret, objKey); err != nil {
break
}

if err = store.AddBasicAuth(ctx, pmNamespace, endp.BasicAuth, objKey); err != nil {
break
}

if endp.TLSConfig != nil {
if err = store.AddSafeTLSConfig(ctx, pmNamespace, &endp.TLSConfig.SafeTLSConfig); err != nil {
break
}
}

if err = store.AddOAuth2(ctx, pmNamespace, endp.OAuth2, objKey); err != nil {
break
}

smAuthKey := fmt.Sprintf("podMonitor/auth/%s/%s/%d", pmNamespace, pmName, i)
if err = store.AddSafeAuthorizationCredentials(ctx, pmNamespace, endp.Authorization, smAuthKey); err != nil {
break
}
}

if err != nil {
w.logger.Error(err, "Failed to obtain credentials for a PodMonitor", "podMonitor", pmName)
}
}

0 comments on commit bb4bd44

Please sign in to comment.