Skip to content

Commit

Permalink
feat: Trigger immediate reconciliation when self-monitor alert state …
Browse files Browse the repository at this point in the history
…changes (#912)
  • Loading branch information
Stanislav Khalash authored Mar 27, 2024
1 parent befcd10 commit f24cea3
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 33 deletions.
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ linters-settings:
alias: telemetrycontrollers
- pkg: github.com/kyma-project/telemetry-manager/internal/otelcollector/config/metric/agent
alias: configmetricagent
- pkg: github.com/kyma-project/telemetry-manager/internal/selfmonitor/webhook
alias: selfmonitorwebhook
- pkg: github.com/kyma-project/telemetry-manager/internal/resources/common
alias: commonresources
- pkg: github.com/kyma-project/telemetry-manager/test/testkit/k8s
Expand Down
27 changes: 22 additions & 5 deletions controllers/telemetry/metricpipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

operatorv1alpha1 "github.com/kyma-project/telemetry-manager/apis/operator/v1alpha1"
telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
Expand All @@ -42,14 +44,15 @@ import (
// MetricPipelineController reconciles a MetricPipeline object
type MetricPipelineController struct {
client.Client

reconciler *metricpipeline.Reconciler
reconcileTriggerChan <-chan event.GenericEvent
reconciler *metricpipeline.Reconciler
}

func NewMetricPipelineController(client client.Client, reconciler *metricpipeline.Reconciler) *MetricPipelineController {
func NewMetricPipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, reconciler *metricpipeline.Reconciler) *MetricPipelineController {
return &MetricPipelineController{
Client: client,
reconciler: reconciler,
Client: client,
reconcileTriggerChan: reconcileTriggerChan,
reconciler: reconciler,
}
}

Expand All @@ -61,6 +64,11 @@ func (r *MetricPipelineController) Reconcile(ctx context.Context, req ctrl.Reque
func (r *MetricPipelineController) SetupWithManager(mgr ctrl.Manager) error {
b := ctrl.NewControllerManagedBy(mgr).For(&telemetryv1alpha1.MetricPipeline{})

b.WatchesRawSource(
&source.Channel{Source: r.reconcileTriggerChan},
handler.EnqueueRequestsFromMapFunc(r.mapReconcileTriggerEvent),
)

ownedResourceTypesToWatch := []client.Object{
&appsv1.Deployment{},
&appsv1.DaemonSet{},
Expand Down Expand Up @@ -123,6 +131,15 @@ func (r *MetricPipelineController) mapTelemetryChanges(ctx context.Context, obje
return requests
}

func (r *MetricPipelineController) mapReconcileTriggerEvent(ctx context.Context, _ client.Object) []reconcile.Request {
logf.FromContext(ctx).V(1).Info("Reconcile trigger event received")
requests, err := r.createRequestsForAllPipelines(ctx)
if err != nil {
logf.FromContext(ctx).Error(err, "Unable to create reconcile requests")
}
return requests
}

func (r *MetricPipelineController) createRequestsForAllPipelines(ctx context.Context) ([]reconcile.Request, error) {
var pipelines telemetryv1alpha1.MetricPipelineList
var requests []reconcile.Request
Expand Down
3 changes: 3 additions & 0 deletions controllers/telemetry/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/event"
logf "sigs.k8s.io/controller-runtime/pkg/log"
logzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
Expand Down Expand Up @@ -149,13 +150,15 @@ var _ = BeforeSuite(func() {

tracePipelineController := NewTracePipelineController(
client,
make(chan event.GenericEvent),
tracepipeline.NewReconciler(client, testTracePipelineReconcilerConfig, &k8sutils.DeploymentProber{Client: client}, false, nil, overridesHandler),
)
err = tracePipelineController.SetupWithManager(mgr)
Expect(err).ToNot(HaveOccurred())

metricPipelineController := NewMetricPipelineController(
client,
make(chan event.GenericEvent),
metricpipeline.NewReconciler(client, testMetricPipelineReconcilerConfig, &k8sutils.DeploymentProber{Client: client}, &k8sutils.DaemonSetProber{Client: client}, false, nil, overridesHandler))
err = metricPipelineController.SetupWithManager(mgr)
Expect(err).NotTo(HaveOccurred())
Expand Down
27 changes: 22 additions & 5 deletions controllers/telemetry/tracepipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

operatorv1alpha1 "github.com/kyma-project/telemetry-manager/apis/operator/v1alpha1"
telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
Expand All @@ -41,14 +43,15 @@ import (
// TracePipelineController reconciles a TracePipeline object
type TracePipelineController struct {
client.Client

reconciler *tracepipeline.Reconciler
reconcileTriggerChan <-chan event.GenericEvent
reconciler *tracepipeline.Reconciler
}

func NewTracePipelineController(client client.Client, reconciler *tracepipeline.Reconciler) *TracePipelineController {
func NewTracePipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, reconciler *tracepipeline.Reconciler) *TracePipelineController {
return &TracePipelineController{
Client: client,
reconciler: reconciler,
Client: client,
reconcileTriggerChan: reconcileTriggerChan,
reconciler: reconciler,
}
}

Expand All @@ -59,6 +62,11 @@ func (r *TracePipelineController) Reconcile(ctx context.Context, req ctrl.Reques
func (r *TracePipelineController) SetupWithManager(mgr ctrl.Manager) error {
b := ctrl.NewControllerManagedBy(mgr).For(&telemetryv1alpha1.TracePipeline{})

b.WatchesRawSource(
&source.Channel{Source: r.reconcileTriggerChan},
handler.EnqueueRequestsFromMapFunc(r.mapReconcileTriggerEvent),
)

ownedResourceTypesToWatch := []client.Object{
&appsv1.Deployment{},
&corev1.ConfigMap{},
Expand Down Expand Up @@ -102,6 +110,15 @@ func (r *TracePipelineController) mapTelemetryChanges(ctx context.Context, objec
return requests
}

func (r *TracePipelineController) mapReconcileTriggerEvent(ctx context.Context, _ client.Object) []reconcile.Request {
logf.FromContext(ctx).V(1).Info("Reconcile trigger event received")
requests, err := r.createRequestsForAllPipelines(ctx)
if err != nil {
logf.FromContext(ctx).Error(err, "Unable to create reconcile requests")
}
return requests
}

func (r *TracePipelineController) createRequestsForAllPipelines(ctx context.Context) ([]reconcile.Request, error) {
var pipelines telemetryv1alpha1.TracePipelineList
var requests []reconcile.Request
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kyma-project/telemetry-manager
go 1.21

require (
github.com/go-logr/logr v1.4.1
github.com/go-logr/zapr v1.3.0
github.com/google/uuid v1.6.0
github.com/onsi/ginkgo/v2 v2.17.1
Expand Down Expand Up @@ -40,7 +41,6 @@ require (
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.8.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down
13 changes: 9 additions & 4 deletions internal/reconciler/telemetry/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ type WebhookConfig struct {
}

type SelfMonitorConfig struct {
Enabled bool
Config selfmonitor.Config
Enabled bool
Config selfmonitor.Config
WebhookURL string
WebhookScheme string
}

type healthCheckers struct {
Expand Down Expand Up @@ -146,8 +148,11 @@ func (r *Reconciler) reconcileSelfMonitor(ctx context.Context, telemetry operato
return nil
}

scrapeNamespace := r.config.SelfMonitor.Config.Namespace
selfMonitorConfig := config.MakeConfig(scrapeNamespace)
selfMonitorConfig := config.MakeConfig(config.BuilderConfig{
ScrapeNamespace: r.config.SelfMonitor.Config.Namespace,
WebhookURL: r.config.SelfMonitor.WebhookURL,
WebhookScheme: r.config.SelfMonitor.WebhookScheme,
})
selfMonitorConfigYAML, err := yaml.Marshal(selfMonitorConfig)
if err != nil {
return fmt.Errorf("failed to marshal selfmonitor config: %w", err)
Expand Down
7 changes: 7 additions & 0 deletions internal/selfmonitor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@ type AlertingConfig struct {
}

type AlertManagerConfig struct {
Scheme string `yaml:"scheme,omitempty"`
PathPrefix string `yaml:"path_prefix,omitempty"`
StaticConfigs []AlertManagerStaticConfig `yaml:"static_configs"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
}

type AlertManagerStaticConfig struct {
Targets []string `yaml:"targets"`
}

type TLSConfig struct {
InsecureSkipVerify bool `yaml:"insecure_skip_verify,omitempty"`
}

type ScrapeConfig struct {
JobName string `yaml:"job_name"`
SampleLimit int `yaml:"sample_limit,omitempty"`
Expand Down
23 changes: 15 additions & 8 deletions internal/selfmonitor/config/config_builder.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package config

import (
"fmt"
"time"

"github.com/kyma-project/telemetry-manager/internal/selfmonitor/ports"
)

func MakeConfig(scrapeNamespace string) Config {
type BuilderConfig struct {
ScrapeNamespace string
WebhookURL string
WebhookScheme string
}

func MakeConfig(builderCfg BuilderConfig) Config {
promConfig := Config{}
promConfig.GlobalConfig = makeGlobalConfig()
promConfig.AlertingConfig = makeAlertConfig()
promConfig.AlertingConfig = makeAlertConfig(builderCfg.WebhookURL, builderCfg.WebhookScheme)
promConfig.RuleFiles = []string{"/etc/prometheus/alerting_rules.yml"}
promConfig.ScrapeConfigs = makeScrapeConfig(scrapeNamespace)
promConfig.ScrapeConfigs = makeScrapeConfig(builderCfg.ScrapeNamespace)
return promConfig
}

Expand All @@ -23,12 +26,16 @@ func makeGlobalConfig() GlobalConfig {
}
}

func makeAlertConfig() AlertingConfig {
func makeAlertConfig(webhookURL, webhookScheme string) AlertingConfig {
return AlertingConfig{
AlertManagers: []AlertManagerConfig{{
Scheme: webhookScheme,
StaticConfigs: []AlertManagerStaticConfig{{
Targets: []string{fmt.Sprintf("localhost:%d", ports.AlertingPort)},
Targets: []string{webhookURL},
}},
TLSConfig: TLSConfig{
InsecureSkipVerify: true,
},
}},
}
}
Expand Down
5 changes: 4 additions & 1 deletion internal/selfmonitor/config/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
)

func TestMakeConfigMarshalling(t *testing.T) {
config := MakeConfig("kyma-system")
config := MakeConfig(BuilderConfig{
ScrapeNamespace: "kyma-system",
WebhookURL: "http://webhook:9090",
})
monitorConfigYaml, err := yaml.Marshal(config)
require.NoError(t, err)

Expand Down
4 changes: 3 additions & 1 deletion internal/selfmonitor/config/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ alerting:
alertmanagers:
- static_configs:
- targets:
- localhost:9093
- http://webhook:9090
tls_config:
insecure_skip_verify: true
rule_files:
- /etc/prometheus/alerting_rules.yml
scrape_configs:
Expand Down
67 changes: 67 additions & 0 deletions internal/selfmonitor/webhook/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package webhook

import (
"io"
"net/http"

"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/event"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

type Handler struct {
subscribers []chan<- event.GenericEvent
logger logr.Logger
}

type Option = func(*Handler)

func WithLogger(logger logr.Logger) Option {
return func(h *Handler) {
h.logger = logger
}
}

func WithSubscriber(subscriber chan<- event.GenericEvent) Option {
return func(h *Handler) {
h.subscribers = append(h.subscribers, subscriber)
}
}

func NewHandler(opts ...Option) *Handler {
h := &Handler{
logger: logr.New(logf.NullLogSink{}),
}

for _, opt := range opts {
opt(h)
}

return h
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
h.logger.Info("Invalid method", "method", r.Method)
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

req, err := io.ReadAll(r.Body)
if err != nil {
h.logger.Error(err, "Failed to read request body")
w.WriteHeader(http.StatusInternalServerError)
return
}

defer r.Body.Close()

h.logger.V(1).Info("Webhook called. Notifying the subscribers.",
"request", string(req))

for _, sub := range h.subscribers {
sub <- event.GenericEvent{}
}

w.WriteHeader(http.StatusOK)
}
Loading

0 comments on commit f24cea3

Please sign in to comment.