From f24cea3529a0110c4ecd2c94f4fcf0e12aa3cbc1 Mon Sep 17 00:00:00 2001 From: Stanislav Khalash Date: Wed, 27 Mar 2024 10:38:47 +0100 Subject: [PATCH] feat: Trigger immediate reconciliation when self-monitor alert state changes (#912) --- .golangci.yaml | 2 + .../telemetry/metricpipeline_controller.go | 27 +++++-- controllers/telemetry/suite_test.go | 3 + .../telemetry/tracepipeline_controller.go | 27 +++++-- go.mod | 2 +- internal/reconciler/telemetry/reconciler.go | 13 ++-- internal/selfmonitor/config/config.go | 7 ++ internal/selfmonitor/config/config_builder.go | 23 +++--- .../selfmonitor/config/config_builder_test.go | 5 +- .../selfmonitor/config/testdata/config.yaml | 4 +- internal/selfmonitor/webhook/handler.go | 67 +++++++++++++++++ internal/selfmonitor/webhook/handler_test.go | 72 +++++++++++++++++++ main.go | 34 ++++++--- 13 files changed, 253 insertions(+), 33 deletions(-) create mode 100644 internal/selfmonitor/webhook/handler.go create mode 100644 internal/selfmonitor/webhook/handler_test.go diff --git a/.golangci.yaml b/.golangci.yaml index 8822b4ae6..b428437a8 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -58,6 +58,8 @@ linters-settings: alias: telemetrycontrollers - pkg: github.com/kyma-project/telemetry-manager/internal/otelcollector/config/metric/agent alias: configmetricagent + - pkg: github.com/kyma-project/telemetry-manager/internal/selfmonitor/webhook + alias: selfmonitorwebhook - pkg: github.com/kyma-project/telemetry-manager/internal/resources/common alias: commonresources - pkg: github.com/kyma-project/telemetry-manager/test/testkit/k8s diff --git a/controllers/telemetry/metricpipeline_controller.go b/controllers/telemetry/metricpipeline_controller.go index 8812b344e..17a72db85 100644 --- a/controllers/telemetry/metricpipeline_controller.go +++ b/controllers/telemetry/metricpipeline_controller.go @@ -29,9 +29,11 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" operatorv1alpha1 "github.com/kyma-project/telemetry-manager/apis/operator/v1alpha1" telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1" @@ -42,14 +44,15 @@ import ( // MetricPipelineController reconciles a MetricPipeline object type MetricPipelineController struct { client.Client - - reconciler *metricpipeline.Reconciler + reconcileTriggerChan <-chan event.GenericEvent + reconciler *metricpipeline.Reconciler } -func NewMetricPipelineController(client client.Client, reconciler *metricpipeline.Reconciler) *MetricPipelineController { +func NewMetricPipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, reconciler *metricpipeline.Reconciler) *MetricPipelineController { return &MetricPipelineController{ - Client: client, - reconciler: reconciler, + Client: client, + reconcileTriggerChan: reconcileTriggerChan, + reconciler: reconciler, } } @@ -61,6 +64,11 @@ func (r *MetricPipelineController) Reconcile(ctx context.Context, req ctrl.Reque func (r *MetricPipelineController) SetupWithManager(mgr ctrl.Manager) error { b := ctrl.NewControllerManagedBy(mgr).For(&telemetryv1alpha1.MetricPipeline{}) + b.WatchesRawSource( + &source.Channel{Source: r.reconcileTriggerChan}, + handler.EnqueueRequestsFromMapFunc(r.mapReconcileTriggerEvent), + ) + ownedResourceTypesToWatch := []client.Object{ &appsv1.Deployment{}, &appsv1.DaemonSet{}, @@ -123,6 +131,15 @@ func (r *MetricPipelineController) mapTelemetryChanges(ctx context.Context, obje return requests } +func (r *MetricPipelineController) mapReconcileTriggerEvent(ctx context.Context, _ client.Object) []reconcile.Request { + logf.FromContext(ctx).V(1).Info("Reconcile trigger event received") + requests, err := r.createRequestsForAllPipelines(ctx) + if err != nil { + logf.FromContext(ctx).Error(err, "Unable to create reconcile requests") + } + return requests +} + func (r *MetricPipelineController) createRequestsForAllPipelines(ctx context.Context) ([]reconcile.Request, error) { var pipelines telemetryv1alpha1.MetricPipelineList var requests []reconcile.Request diff --git a/controllers/telemetry/suite_test.go b/controllers/telemetry/suite_test.go index 1042d2141..52eb0a5e3 100644 --- a/controllers/telemetry/suite_test.go +++ b/controllers/telemetry/suite_test.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/log" logzap "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -149,6 +150,7 @@ var _ = BeforeSuite(func() { tracePipelineController := NewTracePipelineController( client, + make(chan event.GenericEvent), tracepipeline.NewReconciler(client, testTracePipelineReconcilerConfig, &k8sutils.DeploymentProber{Client: client}, false, nil, overridesHandler), ) err = tracePipelineController.SetupWithManager(mgr) @@ -156,6 +158,7 @@ var _ = BeforeSuite(func() { metricPipelineController := NewMetricPipelineController( client, + make(chan event.GenericEvent), metricpipeline.NewReconciler(client, testMetricPipelineReconcilerConfig, &k8sutils.DeploymentProber{Client: client}, &k8sutils.DaemonSetProber{Client: client}, false, nil, overridesHandler)) err = metricPipelineController.SetupWithManager(mgr) Expect(err).NotTo(HaveOccurred()) diff --git a/controllers/telemetry/tracepipeline_controller.go b/controllers/telemetry/tracepipeline_controller.go index f94482566..59b0a168f 100644 --- a/controllers/telemetry/tracepipeline_controller.go +++ b/controllers/telemetry/tracepipeline_controller.go @@ -28,9 +28,11 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" operatorv1alpha1 "github.com/kyma-project/telemetry-manager/apis/operator/v1alpha1" telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1" @@ -41,14 +43,15 @@ import ( // TracePipelineController reconciles a TracePipeline object type TracePipelineController struct { client.Client - - reconciler *tracepipeline.Reconciler + reconcileTriggerChan <-chan event.GenericEvent + reconciler *tracepipeline.Reconciler } -func NewTracePipelineController(client client.Client, reconciler *tracepipeline.Reconciler) *TracePipelineController { +func NewTracePipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, reconciler *tracepipeline.Reconciler) *TracePipelineController { return &TracePipelineController{ - Client: client, - reconciler: reconciler, + Client: client, + reconcileTriggerChan: reconcileTriggerChan, + reconciler: reconciler, } } @@ -59,6 +62,11 @@ func (r *TracePipelineController) Reconcile(ctx context.Context, req ctrl.Reques func (r *TracePipelineController) SetupWithManager(mgr ctrl.Manager) error { b := ctrl.NewControllerManagedBy(mgr).For(&telemetryv1alpha1.TracePipeline{}) + b.WatchesRawSource( + &source.Channel{Source: r.reconcileTriggerChan}, + handler.EnqueueRequestsFromMapFunc(r.mapReconcileTriggerEvent), + ) + ownedResourceTypesToWatch := []client.Object{ &appsv1.Deployment{}, &corev1.ConfigMap{}, @@ -102,6 +110,15 @@ func (r *TracePipelineController) mapTelemetryChanges(ctx context.Context, objec return requests } +func (r *TracePipelineController) mapReconcileTriggerEvent(ctx context.Context, _ client.Object) []reconcile.Request { + logf.FromContext(ctx).V(1).Info("Reconcile trigger event received") + requests, err := r.createRequestsForAllPipelines(ctx) + if err != nil { + logf.FromContext(ctx).Error(err, "Unable to create reconcile requests") + } + return requests +} + func (r *TracePipelineController) createRequestsForAllPipelines(ctx context.Context) ([]reconcile.Request, error) { var pipelines telemetryv1alpha1.TracePipelineList var requests []reconcile.Request diff --git a/go.mod b/go.mod index a656629ff..40e7aedf4 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/kyma-project/telemetry-manager go 1.21 require ( + github.com/go-logr/logr v1.4.1 github.com/go-logr/zapr v1.3.0 github.com/google/uuid v1.6.0 github.com/onsi/ginkgo/v2 v2.17.1 @@ -40,7 +41,6 @@ require ( github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.8.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.20.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect diff --git a/internal/reconciler/telemetry/reconciler.go b/internal/reconciler/telemetry/reconciler.go index fd0c0c6df..8395900d9 100644 --- a/internal/reconciler/telemetry/reconciler.go +++ b/internal/reconciler/telemetry/reconciler.go @@ -56,8 +56,10 @@ type WebhookConfig struct { } type SelfMonitorConfig struct { - Enabled bool - Config selfmonitor.Config + Enabled bool + Config selfmonitor.Config + WebhookURL string + WebhookScheme string } type healthCheckers struct { @@ -146,8 +148,11 @@ func (r *Reconciler) reconcileSelfMonitor(ctx context.Context, telemetry operato return nil } - scrapeNamespace := r.config.SelfMonitor.Config.Namespace - selfMonitorConfig := config.MakeConfig(scrapeNamespace) + selfMonitorConfig := config.MakeConfig(config.BuilderConfig{ + ScrapeNamespace: r.config.SelfMonitor.Config.Namespace, + WebhookURL: r.config.SelfMonitor.WebhookURL, + WebhookScheme: r.config.SelfMonitor.WebhookScheme, + }) selfMonitorConfigYAML, err := yaml.Marshal(selfMonitorConfig) if err != nil { return fmt.Errorf("failed to marshal selfmonitor config: %w", err) diff --git a/internal/selfmonitor/config/config.go b/internal/selfmonitor/config/config.go index 53e3d4140..1472b128d 100644 --- a/internal/selfmonitor/config/config.go +++ b/internal/selfmonitor/config/config.go @@ -21,13 +21,20 @@ type AlertingConfig struct { } type AlertManagerConfig struct { + Scheme string `yaml:"scheme,omitempty"` + PathPrefix string `yaml:"path_prefix,omitempty"` StaticConfigs []AlertManagerStaticConfig `yaml:"static_configs"` + TLSConfig TLSConfig `yaml:"tls_config,omitempty"` } type AlertManagerStaticConfig struct { Targets []string `yaml:"targets"` } +type TLSConfig struct { + InsecureSkipVerify bool `yaml:"insecure_skip_verify,omitempty"` +} + type ScrapeConfig struct { JobName string `yaml:"job_name"` SampleLimit int `yaml:"sample_limit,omitempty"` diff --git a/internal/selfmonitor/config/config_builder.go b/internal/selfmonitor/config/config_builder.go index 939bc092f..b28f92c17 100644 --- a/internal/selfmonitor/config/config_builder.go +++ b/internal/selfmonitor/config/config_builder.go @@ -1,18 +1,21 @@ package config import ( - "fmt" "time" - - "github.com/kyma-project/telemetry-manager/internal/selfmonitor/ports" ) -func MakeConfig(scrapeNamespace string) Config { +type BuilderConfig struct { + ScrapeNamespace string + WebhookURL string + WebhookScheme string +} + +func MakeConfig(builderCfg BuilderConfig) Config { promConfig := Config{} promConfig.GlobalConfig = makeGlobalConfig() - promConfig.AlertingConfig = makeAlertConfig() + promConfig.AlertingConfig = makeAlertConfig(builderCfg.WebhookURL, builderCfg.WebhookScheme) promConfig.RuleFiles = []string{"/etc/prometheus/alerting_rules.yml"} - promConfig.ScrapeConfigs = makeScrapeConfig(scrapeNamespace) + promConfig.ScrapeConfigs = makeScrapeConfig(builderCfg.ScrapeNamespace) return promConfig } @@ -23,12 +26,16 @@ func makeGlobalConfig() GlobalConfig { } } -func makeAlertConfig() AlertingConfig { +func makeAlertConfig(webhookURL, webhookScheme string) AlertingConfig { return AlertingConfig{ AlertManagers: []AlertManagerConfig{{ + Scheme: webhookScheme, StaticConfigs: []AlertManagerStaticConfig{{ - Targets: []string{fmt.Sprintf("localhost:%d", ports.AlertingPort)}, + Targets: []string{webhookURL}, }}, + TLSConfig: TLSConfig{ + InsecureSkipVerify: true, + }, }}, } } diff --git a/internal/selfmonitor/config/config_builder_test.go b/internal/selfmonitor/config/config_builder_test.go index 9527adfb8..5fe516a1c 100644 --- a/internal/selfmonitor/config/config_builder_test.go +++ b/internal/selfmonitor/config/config_builder_test.go @@ -10,7 +10,10 @@ import ( ) func TestMakeConfigMarshalling(t *testing.T) { - config := MakeConfig("kyma-system") + config := MakeConfig(BuilderConfig{ + ScrapeNamespace: "kyma-system", + WebhookURL: "http://webhook:9090", + }) monitorConfigYaml, err := yaml.Marshal(config) require.NoError(t, err) diff --git a/internal/selfmonitor/config/testdata/config.yaml b/internal/selfmonitor/config/testdata/config.yaml index a383537bb..2a2c55cb7 100644 --- a/internal/selfmonitor/config/testdata/config.yaml +++ b/internal/selfmonitor/config/testdata/config.yaml @@ -5,7 +5,9 @@ alerting: alertmanagers: - static_configs: - targets: - - localhost:9093 + - http://webhook:9090 + tls_config: + insecure_skip_verify: true rule_files: - /etc/prometheus/alerting_rules.yml scrape_configs: diff --git a/internal/selfmonitor/webhook/handler.go b/internal/selfmonitor/webhook/handler.go new file mode 100644 index 000000000..bb9d881a5 --- /dev/null +++ b/internal/selfmonitor/webhook/handler.go @@ -0,0 +1,67 @@ +package webhook + +import ( + "io" + "net/http" + + "github.com/go-logr/logr" + "sigs.k8s.io/controller-runtime/pkg/event" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +type Handler struct { + subscribers []chan<- event.GenericEvent + logger logr.Logger +} + +type Option = func(*Handler) + +func WithLogger(logger logr.Logger) Option { + return func(h *Handler) { + h.logger = logger + } +} + +func WithSubscriber(subscriber chan<- event.GenericEvent) Option { + return func(h *Handler) { + h.subscribers = append(h.subscribers, subscriber) + } +} + +func NewHandler(opts ...Option) *Handler { + h := &Handler{ + logger: logr.New(logf.NullLogSink{}), + } + + for _, opt := range opts { + opt(h) + } + + return h +} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + h.logger.Info("Invalid method", "method", r.Method) + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + req, err := io.ReadAll(r.Body) + if err != nil { + h.logger.Error(err, "Failed to read request body") + w.WriteHeader(http.StatusInternalServerError) + return + } + + defer r.Body.Close() + + h.logger.V(1).Info("Webhook called. Notifying the subscribers.", + "request", string(req)) + + for _, sub := range h.subscribers { + sub <- event.GenericEvent{} + } + + w.WriteHeader(http.StatusOK) +} diff --git a/internal/selfmonitor/webhook/handler_test.go b/internal/selfmonitor/webhook/handler_test.go new file mode 100644 index 000000000..f25fe61c7 --- /dev/null +++ b/internal/selfmonitor/webhook/handler_test.go @@ -0,0 +1,72 @@ +package webhook + +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/event" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +type errReader struct{} + +func (errReader) Read(p []byte) (n int, err error) { + return 0, assert.AnError +} + +func TestHandler(t *testing.T) { + tests := []struct { + name string + requestMethod string + requestBody io.Reader + expectedStatus int + expectEvent bool + }{ + { + name: "valid", + requestMethod: http.MethodPost, + expectedStatus: http.StatusOK, + expectEvent: true, + }, + { + name: "invalid method", + requestMethod: http.MethodGet, + expectedStatus: http.StatusMethodNotAllowed, + }, + { + name: "failed to read request body", + requestMethod: http.MethodPost, + requestBody: errReader{}, + expectedStatus: http.StatusInternalServerError, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ch := make(chan event.GenericEvent, 1) + + noopLogger := logr.New(logf.NullLogSink{}) + handler := NewHandler(WithSubscriber(ch), WithLogger(noopLogger)) + if tc.requestBody == nil { + tc.requestBody = bytes.NewBuffer([]byte(`{"key":"value"}`)) + } + + req, err := http.NewRequest(tc.requestMethod, "/", tc.requestBody) + require.NoError(t, err) + + rr := httptest.NewRecorder() + handler.ServeHTTP(rr, req) + + require.Equal(t, tc.expectedStatus, rr.Code) + if tc.expectEvent { + require.NotEmpty(t, ch) + } + }) + } +} diff --git a/main.go b/main.go index db4bacc17..26d4bb2f5 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ import ( "context" "errors" "flag" + "fmt" "os" "strings" "time" @@ -41,6 +42,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -63,6 +65,7 @@ import ( "github.com/kyma-project/telemetry-manager/internal/resources/otelcollector" "github.com/kyma-project/telemetry-manager/internal/resources/selfmonitor" "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" + selfmonitorwebhook "github.com/kyma-project/telemetry-manager/internal/selfmonitor/webhook" "github.com/kyma-project/telemetry-manager/internal/webhookcert" "github.com/kyma-project/telemetry-manager/webhook/dryrun" logparserwebhook "github.com/kyma-project/telemetry-manager/webhook/logparser" @@ -352,8 +355,12 @@ func main() { }) enableLoggingController(mgr) - enableTracingController(mgr) - enableMetricsController(mgr) + + tracingControllerReconcileTriggerChan := make(chan event.GenericEvent) + enableTracingController(mgr, tracingControllerReconcileTriggerChan) + + metricsControllerReconcileTriggerChan := make(chan event.GenericEvent) + enableMetricsController(mgr, metricsControllerReconcileTriggerChan) webhookConfig := createWebhookConfig() selfMonitorConfig := createSelfMonitoringConfig() @@ -375,6 +382,13 @@ func main() { enableWebhookServer(mgr, webhookConfig) } + if enableWebhook && enableSelfMonitor { + mgr.GetWebhookServer().Register("/api/v2/alerts", selfmonitorwebhook.NewHandler( + selfmonitorwebhook.WithSubscriber(tracingControllerReconcileTriggerChan), + selfmonitorwebhook.WithSubscriber(metricsControllerReconcileTriggerChan), + selfmonitorwebhook.WithLogger(ctrl.Log.WithName("self-monitor-webhook")))) + } + if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "Failed to run manager") os.Exit(1) @@ -407,7 +421,7 @@ func enableLoggingController(mgr manager.Manager) { } } -func enableTracingController(mgr manager.Manager) { +func enableTracingController(mgr manager.Manager, reconcileTriggerChan <-chan event.GenericEvent) { setupLog.Info("Starting with tracing controller") var err error var flowHealthProber *flowhealth.Prober @@ -417,13 +431,13 @@ func enableTracingController(mgr manager.Manager) { os.Exit(1) } - if err := createTracePipelineController(mgr.GetClient(), flowHealthProber).SetupWithManager(mgr); err != nil { + if err := createTracePipelineController(mgr.GetClient(), reconcileTriggerChan, flowHealthProber).SetupWithManager(mgr); err != nil { setupLog.Error(err, "Failed to create controller", "controller", "TracePipeline") os.Exit(1) } } -func enableMetricsController(mgr manager.Manager) { +func enableMetricsController(mgr manager.Manager, reconcileTriggerChan <-chan event.GenericEvent) { setupLog.Info("Starting with metrics controller") var err error var flowHealthProber *flowhealth.Prober @@ -433,7 +447,7 @@ func enableMetricsController(mgr manager.Manager) { os.Exit(1) } - if err := createMetricPipelineController(mgr.GetClient(), flowHealthProber).SetupWithManager(mgr); err != nil { + if err := createMetricPipelineController(mgr.GetClient(), reconcileTriggerChan, flowHealthProber).SetupWithManager(mgr); err != nil { setupLog.Error(err, "Failed to create controller", "controller", "MetricPipeline") os.Exit(1) } @@ -535,7 +549,7 @@ func createLogParserValidator(client client.Client) *logparserwebhook.Validating admission.NewDecoder(scheme)) } -func createTracePipelineController(client client.Client, flowHealthProber *flowhealth.Prober) *telemetrycontrollers.TracePipelineController { +func createTracePipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, flowHealthProber *flowhealth.Prober) *telemetrycontrollers.TracePipelineController { config := tracepipeline.Config{ Gateway: otelcollector.GatewayConfig{ Config: otelcollector.Config{ @@ -564,6 +578,7 @@ func createTracePipelineController(client client.Client, flowHealthProber *flowh return telemetrycontrollers.NewTracePipelineController( client, + reconcileTriggerChan, tracepipeline.NewReconciler( client, config, @@ -574,7 +589,7 @@ func createTracePipelineController(client client.Client, flowHealthProber *flowh ) } -func createMetricPipelineController(client client.Client, flowHealthProber *flowhealth.Prober) *telemetrycontrollers.MetricPipelineController { +func createMetricPipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, flowHealthProber *flowhealth.Prober) *telemetrycontrollers.MetricPipelineController { config := metricpipeline.Config{ Agent: otelcollector.AgentConfig{ Config: otelcollector.Config{ @@ -617,6 +632,7 @@ func createMetricPipelineController(client client.Client, flowHealthProber *flow return telemetrycontrollers.NewMetricPipelineController( client, + reconcileTriggerChan, metricpipeline.NewReconciler( client, config, @@ -643,6 +659,8 @@ func createSelfMonitoringConfig() telemetry.SelfMonitorConfig { MemoryRequest: resource.MustParse(selfMonitorMemoryRequest), }, }, + WebhookScheme: "https", + WebhookURL: fmt.Sprintf("%s.%s.svc", webhookServiceName, telemetryNamespace), } }