From 05d90fdc0318cf82971556ffc91af12e70f57d25 Mon Sep 17 00:00:00 2001 From: Stanislav Khalash Date: Thu, 4 Apr 2024 17:13:46 +0200 Subject: [PATCH] chore: Optimize reconciliation triggered by the self-monitor webhook (#935) --- .../telemetry/metricpipeline_controller.go | 11 +- .../telemetry/tracepipeline_controller.go | 11 +- .../mocks/flow_health_prober.go | 7 +- .../reconciler/metricpipeline/reconciler.go | 4 +- internal/reconciler/metricpipeline/status.go | 4 +- .../reconciler/metricpipeline/status_test.go | 20 +-- internal/reconciler/telemetry/reconciler.go | 4 +- .../tracepipeline/mocks/flow_health_prober.go | 6 +- .../reconciler/tracepipeline/reconciler.go | 4 +- internal/reconciler/tracepipeline/status.go | 4 +- .../reconciler/tracepipeline/status_test.go | 20 +-- .../expr_builder.go | 4 +- .../selfmonitor/alertrules/pipeline_type.go | 8 ++ .../{flowhealth => alertrules}/rules.go | 74 +++++----- .../{flowhealth => alertrules}/rules_test.go | 2 +- .../mocks/alert_getter.go | 4 +- .../{flowhealth => prober}/prober.go | 50 +++---- .../{flowhealth => prober}/prober_test.go | 9 +- internal/selfmonitor/webhook/handler.go | 125 +++++++++++++++-- internal/selfmonitor/webhook/handler_test.go | 129 ++++++++++++++++-- main.go | 20 +-- 21 files changed, 350 insertions(+), 170 deletions(-) rename internal/selfmonitor/{flowhealth => alertrules}/expr_builder.go (92%) create mode 100644 internal/selfmonitor/alertrules/pipeline_type.go rename internal/selfmonitor/{flowhealth => alertrules}/rules.go (63%) rename internal/selfmonitor/{flowhealth => alertrules}/rules_test.go (99%) rename internal/selfmonitor/{flowhealth => prober}/mocks/alert_getter.go (95%) rename internal/selfmonitor/{flowhealth => prober}/prober.go (74%) rename internal/selfmonitor/{flowhealth => prober}/prober_test.go (97%) diff --git a/controllers/telemetry/metricpipeline_controller.go b/controllers/telemetry/metricpipeline_controller.go index 17a72db85..e6223b155 100644 --- 
a/controllers/telemetry/metricpipeline_controller.go +++ b/controllers/telemetry/metricpipeline_controller.go @@ -66,7 +66,7 @@ func (r *MetricPipelineController) SetupWithManager(mgr ctrl.Manager) error { b.WatchesRawSource( &source.Channel{Source: r.reconcileTriggerChan}, - handler.EnqueueRequestsFromMapFunc(r.mapReconcileTriggerEvent), + &handler.EnqueueRequestForObject{}, ) ownedResourceTypesToWatch := []client.Object{ @@ -131,15 +131,6 @@ func (r *MetricPipelineController) mapTelemetryChanges(ctx context.Context, obje return requests } -func (r *MetricPipelineController) mapReconcileTriggerEvent(ctx context.Context, _ client.Object) []reconcile.Request { - logf.FromContext(ctx).V(1).Info("Reconcile trigger event received") - requests, err := r.createRequestsForAllPipelines(ctx) - if err != nil { - logf.FromContext(ctx).Error(err, "Unable to create reconcile requests") - } - return requests -} - func (r *MetricPipelineController) createRequestsForAllPipelines(ctx context.Context) ([]reconcile.Request, error) { var pipelines telemetryv1alpha1.MetricPipelineList var requests []reconcile.Request diff --git a/controllers/telemetry/tracepipeline_controller.go b/controllers/telemetry/tracepipeline_controller.go index 59b0a168f..4968392e8 100644 --- a/controllers/telemetry/tracepipeline_controller.go +++ b/controllers/telemetry/tracepipeline_controller.go @@ -64,7 +64,7 @@ func (r *TracePipelineController) SetupWithManager(mgr ctrl.Manager) error { b.WatchesRawSource( &source.Channel{Source: r.reconcileTriggerChan}, - handler.EnqueueRequestsFromMapFunc(r.mapReconcileTriggerEvent), + &handler.EnqueueRequestForObject{}, ) ownedResourceTypesToWatch := []client.Object{ @@ -110,15 +110,6 @@ func (r *TracePipelineController) mapTelemetryChanges(ctx context.Context, objec return requests } -func (r *TracePipelineController) mapReconcileTriggerEvent(ctx context.Context, _ client.Object) []reconcile.Request { - logf.FromContext(ctx).V(1).Info("Reconcile trigger event 
received") - requests, err := r.createRequestsForAllPipelines(ctx) - if err != nil { - logf.FromContext(ctx).Error(err, "Unable to create reconcile requests") - } - return requests -} - func (r *TracePipelineController) createRequestsForAllPipelines(ctx context.Context) ([]reconcile.Request, error) { var pipelines telemetryv1alpha1.TracePipelineList var requests []reconcile.Request diff --git a/internal/reconciler/metricpipeline/mocks/flow_health_prober.go b/internal/reconciler/metricpipeline/mocks/flow_health_prober.go index f725c1808..47a31a605 100644 --- a/internal/reconciler/metricpipeline/mocks/flow_health_prober.go +++ b/internal/reconciler/metricpipeline/mocks/flow_health_prober.go @@ -3,11 +3,10 @@ package mocks import ( - context "context" + "context" - flowhealth "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" - - mock "github.com/stretchr/testify/mock" + flowhealth "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober" + "github.com/stretchr/testify/mock" ) // FlowHealthProber is an autogenerated mock type for the FlowHealthProber type diff --git a/internal/reconciler/metricpipeline/reconciler.go b/internal/reconciler/metricpipeline/reconciler.go index 42e3508bf..016127a68 100644 --- a/internal/reconciler/metricpipeline/reconciler.go +++ b/internal/reconciler/metricpipeline/reconciler.go @@ -20,7 +20,7 @@ import ( "github.com/kyma-project/telemetry-manager/internal/overrides" "github.com/kyma-project/telemetry-manager/internal/resources/otelcollector" "github.com/kyma-project/telemetry-manager/internal/secretref" - "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober" ) const defaultReplicaCount int32 = 2 @@ -44,7 +44,7 @@ type DaemonSetProber interface { //go:generate mockery --name FlowHealthProber --filename flow_health_prober.go type FlowHealthProber interface { - Probe(ctx context.Context, pipelineName string) 
(flowhealth.ProbeResult, error) + Probe(ctx context.Context, pipelineName string) (prober.ProbeResult, error) } type Reconciler struct { diff --git a/internal/reconciler/metricpipeline/status.go b/internal/reconciler/metricpipeline/status.go index 4af020a23..2c8e4c246 100644 --- a/internal/reconciler/metricpipeline/status.go +++ b/internal/reconciler/metricpipeline/status.go @@ -13,7 +13,7 @@ import ( telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1" "github.com/kyma-project/telemetry-manager/internal/conditions" "github.com/kyma-project/telemetry-manager/internal/secretref" - "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober" ) func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string, withinPipelineCountLimit bool) error { @@ -159,7 +159,7 @@ func (r *Reconciler) setFlowHealthCondition(ctx context.Context, pipeline *telem meta.SetStatusCondition(&pipeline.Status.Conditions, condition) } -func flowHealthReasonFor(probeResult flowhealth.ProbeResult) string { +func flowHealthReasonFor(probeResult prober.ProbeResult) string { if probeResult.AllDataDropped { return conditions.ReasonAllDataDropped } diff --git a/internal/reconciler/metricpipeline/status_test.go b/internal/reconciler/metricpipeline/status_test.go index 8024ee135..6d4b4ca49 100644 --- a/internal/reconciler/metricpipeline/status_test.go +++ b/internal/reconciler/metricpipeline/status_test.go @@ -20,7 +20,7 @@ import ( "github.com/kyma-project/telemetry-manager/internal/conditions" "github.com/kyma-project/telemetry-manager/internal/reconciler/metricpipeline/mocks" "github.com/kyma-project/telemetry-manager/internal/resources/otelcollector" - "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober" "github.com/kyma-project/telemetry-manager/internal/testutils" ) @@ 
-315,7 +315,7 @@ func TestUpdateStatus(t *testing.T) { t.Run("flow healthy", func(t *testing.T) { tests := []struct { name string - probe flowhealth.ProbeResult + probe prober.ProbeResult probeErr error expectedStatus metav1.ConditionStatus expectedReason string @@ -328,7 +328,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "healthy", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ Healthy: true, }, expectedStatus: metav1.ConditionTrue, @@ -336,7 +336,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "throttling", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ Throttling: true, }, expectedStatus: metav1.ConditionFalse, @@ -344,7 +344,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "buffer filling up", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ QueueAlmostFull: true, }, expectedStatus: metav1.ConditionFalse, @@ -352,7 +352,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "buffer filling up shadows other problems", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ QueueAlmostFull: true, Throttling: true, }, @@ -361,7 +361,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "some data dropped", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ SomeDataDropped: true, }, expectedStatus: metav1.ConditionFalse, @@ -369,7 +369,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "some data dropped shadows other problems", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ SomeDataDropped: true, Throttling: true, }, @@ -378,7 +378,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "all data dropped", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ AllDataDropped: true, }, expectedStatus: metav1.ConditionFalse, @@ -386,7 +386,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "all data dropped shadows other problems", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ AllDataDropped: true, Throttling: true, }, diff --git 
a/internal/reconciler/telemetry/reconciler.go b/internal/reconciler/telemetry/reconciler.go index 8395900d9..19be35a96 100644 --- a/internal/reconciler/telemetry/reconciler.go +++ b/internal/reconciler/telemetry/reconciler.go @@ -23,8 +23,8 @@ import ( "github.com/kyma-project/telemetry-manager/internal/k8sutils" "github.com/kyma-project/telemetry-manager/internal/overrides" "github.com/kyma-project/telemetry-manager/internal/resources/selfmonitor" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules" "github.com/kyma-project/telemetry-manager/internal/selfmonitor/config" - "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" "github.com/kyma-project/telemetry-manager/internal/webhookcert" ) @@ -158,7 +158,7 @@ func (r *Reconciler) reconcileSelfMonitor(ctx context.Context, telemetry operato return fmt.Errorf("failed to marshal selfmonitor config: %w", err) } - rules := flowhealth.MakeRules() + rules := alertrules.MakeRules() rulesYAML, err := yaml.Marshal(rules) if err != nil { return fmt.Errorf("failed to marshal rules: %w", err) diff --git a/internal/reconciler/tracepipeline/mocks/flow_health_prober.go b/internal/reconciler/tracepipeline/mocks/flow_health_prober.go index f2783a764..47a31a605 100644 --- a/internal/reconciler/tracepipeline/mocks/flow_health_prober.go +++ b/internal/reconciler/tracepipeline/mocks/flow_health_prober.go @@ -3,10 +3,10 @@ package mocks import ( - context "context" + "context" - flowhealth "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" - mock "github.com/stretchr/testify/mock" + flowhealth "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober" + "github.com/stretchr/testify/mock" ) // FlowHealthProber is an autogenerated mock type for the FlowHealthProber type diff --git a/internal/reconciler/tracepipeline/reconciler.go b/internal/reconciler/tracepipeline/reconciler.go index f5eb3a1cc..2e6da6a74 100644 --- 
a/internal/reconciler/tracepipeline/reconciler.go +++ b/internal/reconciler/tracepipeline/reconciler.go @@ -35,7 +35,7 @@ import ( "github.com/kyma-project/telemetry-manager/internal/overrides" "github.com/kyma-project/telemetry-manager/internal/resources/otelcollector" "github.com/kyma-project/telemetry-manager/internal/secretref" - "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober" ) const defaultReplicaCount int32 = 2 @@ -53,7 +53,7 @@ type DeploymentProber interface { //go:generate mockery --name FlowHealthProber --filename flow_health_prober.go type FlowHealthProber interface { - Probe(ctx context.Context, pipelineName string) (flowhealth.ProbeResult, error) + Probe(ctx context.Context, pipelineName string) (prober.ProbeResult, error) } type Reconciler struct { diff --git a/internal/reconciler/tracepipeline/status.go b/internal/reconciler/tracepipeline/status.go index fe4382692..fb7043425 100644 --- a/internal/reconciler/tracepipeline/status.go +++ b/internal/reconciler/tracepipeline/status.go @@ -13,7 +13,7 @@ import ( telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1" "github.com/kyma-project/telemetry-manager/internal/conditions" "github.com/kyma-project/telemetry-manager/internal/secretref" - "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober" ) func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string, withinPipelineCountLimit bool) error { @@ -133,7 +133,7 @@ func (r *Reconciler) setFlowHealthCondition(ctx context.Context, pipeline *telem meta.SetStatusCondition(&pipeline.Status.Conditions, condition) } -func flowHealthReasonFor(probeResult flowhealth.ProbeResult) string { +func flowHealthReasonFor(probeResult prober.ProbeResult) string { if probeResult.AllDataDropped { return conditions.ReasonAllDataDropped } 
diff --git a/internal/reconciler/tracepipeline/status_test.go b/internal/reconciler/tracepipeline/status_test.go index 53f6304f4..86e39c448 100644 --- a/internal/reconciler/tracepipeline/status_test.go +++ b/internal/reconciler/tracepipeline/status_test.go @@ -19,7 +19,7 @@ import ( "github.com/kyma-project/telemetry-manager/internal/conditions" "github.com/kyma-project/telemetry-manager/internal/reconciler/tracepipeline/mocks" "github.com/kyma-project/telemetry-manager/internal/resources/otelcollector" - "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober" "github.com/kyma-project/telemetry-manager/internal/testutils" ) @@ -349,7 +349,7 @@ func TestUpdateStatus(t *testing.T) { t.Run("flow healthy", func(t *testing.T) { tests := []struct { name string - probe flowhealth.ProbeResult + probe prober.ProbeResult probeErr error expectedStatus metav1.ConditionStatus expectedReason string @@ -362,7 +362,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "healthy", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ Healthy: true, }, expectedStatus: metav1.ConditionTrue, @@ -370,7 +370,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "throttling", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ Throttling: true, }, expectedStatus: metav1.ConditionFalse, @@ -378,7 +378,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "buffer filling up", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ QueueAlmostFull: true, }, expectedStatus: metav1.ConditionFalse, @@ -386,7 +386,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "buffer filling up shadows other problems", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ QueueAlmostFull: true, Throttling: true, }, @@ -395,7 +395,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "some data dropped", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ SomeDataDropped: 
true, }, expectedStatus: metav1.ConditionFalse, @@ -403,7 +403,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "some data dropped shadows other problems", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ SomeDataDropped: true, Throttling: true, }, @@ -412,7 +412,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "all data dropped", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ AllDataDropped: true, }, expectedStatus: metav1.ConditionFalse, @@ -420,7 +420,7 @@ func TestUpdateStatus(t *testing.T) { }, { name: "all data dropped shadows other problems", - probe: flowhealth.ProbeResult{ + probe: prober.ProbeResult{ AllDataDropped: true, Throttling: true, }, diff --git a/internal/selfmonitor/flowhealth/expr_builder.go b/internal/selfmonitor/alertrules/expr_builder.go similarity index 92% rename from internal/selfmonitor/flowhealth/expr_builder.go rename to internal/selfmonitor/alertrules/expr_builder.go index 07b7ffff2..e6792cc4a 100644 --- a/internal/selfmonitor/flowhealth/expr_builder.go +++ b/internal/selfmonitor/alertrules/expr_builder.go @@ -1,4 +1,4 @@ -package flowhealth +package alertrules import ( "fmt" @@ -16,7 +16,7 @@ type labelSelector func(string) string func selectService(serviceName string) labelSelector { return func(metric string) string { - return fmt.Sprintf("%s{%s=\"%s\"}", metric, serviceLabelKey, serviceName) + return fmt.Sprintf("%s{%s=\"%s\"}", metric, LabelService, serviceName) } } diff --git a/internal/selfmonitor/alertrules/pipeline_type.go b/internal/selfmonitor/alertrules/pipeline_type.go new file mode 100644 index 000000000..d65c0998b --- /dev/null +++ b/internal/selfmonitor/alertrules/pipeline_type.go @@ -0,0 +1,8 @@ +package alertrules + +type PipelineType string + +const ( + MetricPipeline PipelineType = "MetricPipeline" + TracePipeline PipelineType = "TracePipeline" +) diff --git a/internal/selfmonitor/flowhealth/rules.go b/internal/selfmonitor/alertrules/rules.go similarity index 63% rename from 
internal/selfmonitor/flowhealth/rules.go rename to internal/selfmonitor/alertrules/rules.go index 06bad9908..80b2fd0cf 100644 --- a/internal/selfmonitor/flowhealth/rules.go +++ b/internal/selfmonitor/alertrules/rules.go @@ -1,4 +1,4 @@ -package flowhealth +package alertrules import ( "fmt" @@ -26,8 +26,8 @@ type Rule struct { } func MakeRules() RuleGroups { - metricRuleBuilder := newRuleBuilder(FlowTypeMetrics) - traceRuleBuilder := newRuleBuilder(FlowTypeTraces) + metricRuleBuilder := newRuleBuilder(MetricPipeline) + traceRuleBuilder := newRuleBuilder(TracePipeline) ruleBuilders := []ruleBuilder{metricRuleBuilder, traceRuleBuilder} var rules []Rule @@ -46,48 +46,44 @@ func MakeRules() RuleGroups { } const ( - serviceLabelKey = "service" - exporterLabelKey = "exporter" - receiverLabelKey = "receiver" - - alertNameExporterSentData = "ExporterSentData" - alertNameExporterDroppedData = "ExporterDroppedData" - alertNameExporterQueueAlmostFull = "ExporterQueueAlmostFull" - alertNameExporterEnqueueFailed = "ExporterEnqueueFailed" - alertNameReceiverRefusedData = "ReceiverRefusedData" + LabelService = "service" + LabelExporter = "exporter" + LabelReceiver = "receiver" + + RuleNameGatewayExporterSentData = "GatewayExporterSentData" + RuleNameGatewayExporterDroppedData = "GatewayExporterDroppedData" + RuleNameGatewayExporterQueueAlmostFull = "GatewayExporterQueueAlmostFull" + RuleNameGatewayExporterEnqueueFailed = "GatewayExporterEnqueueFailed" + RuleNameGatewayReceiverRefusedData = "GatewayReceiverRefusedData" ) -type ruleBuilder struct { - serviceName string - dataType string - nameDecorator ruleNameDecorator -} - -type ruleNameDecorator func(string) string +func RuleNamePrefix(t PipelineType) string { + if t == TracePipeline { + return "Trace" + } -var traceRuleNameDecorator = func(name string) string { - return "TraceGateway" + name + return "Metric" } -var metricRuleNameDecorator = func(name string) string { - return "MetricGateway" + name +type ruleBuilder struct { + 
serviceName string + dataType string + namePrefix string } -func newRuleBuilder(t FlowType) ruleBuilder { +func newRuleBuilder(t PipelineType) ruleBuilder { serviceName := "telemetry-metric-gateway-metrics" dataType := "metric_points" - nameDecorator := metricRuleNameDecorator - if t == FlowTypeTraces { + if t == TracePipeline { serviceName = "telemetry-trace-collector-metrics" dataType = "spans" - nameDecorator = traceRuleNameDecorator } return ruleBuilder{ - dataType: dataType, - serviceName: serviceName, - nameDecorator: nameDecorator, + dataType: dataType, + serviceName: serviceName, + namePrefix: RuleNamePrefix(t), } } @@ -104,9 +100,9 @@ func (rb ruleBuilder) rules() []Rule { func (rb ruleBuilder) exporterSentRule() Rule { metric := fmt.Sprintf("otelcol_exporter_sent_%s", rb.dataType) return Rule{ - Alert: rb.nameDecorator(alertNameExporterSentData), + Alert: rb.namePrefix + RuleNameGatewayExporterSentData, Expr: rate(metric, selectService(rb.serviceName)). - sumBy(exporterLabelKey). + sumBy(LabelExporter). greaterThan(0). build(), } @@ -115,9 +111,9 @@ func (rb ruleBuilder) exporterSentRule() Rule { func (rb ruleBuilder) exporterDroppedRule() Rule { metric := fmt.Sprintf("otelcol_exporter_send_failed_%s", rb.dataType) return Rule{ - Alert: rb.nameDecorator(alertNameExporterDroppedData), + Alert: rb.namePrefix + RuleNameGatewayExporterDroppedData, Expr: rate(metric, selectService(rb.serviceName)). - sumBy(exporterLabelKey). + sumBy(LabelExporter). greaterThan(0). build(), } @@ -125,7 +121,7 @@ func (rb ruleBuilder) exporterDroppedRule() Rule { func (rb ruleBuilder) exporterQueueAlmostFullRule() Rule { return Rule{ - Alert: rb.nameDecorator(alertNameExporterQueueAlmostFull), + Alert: rb.namePrefix + RuleNameGatewayExporterQueueAlmostFull, Expr: div("otelcol_exporter_queue_size", "otelcol_exporter_queue_capacity", selectService(rb.serviceName)). greaterThan(0.8). 
build(), @@ -135,9 +131,9 @@ func (rb ruleBuilder) exporterQueueAlmostFullRule() Rule { func (rb ruleBuilder) exporterEnqueueFailedRule() Rule { metric := fmt.Sprintf("otelcol_exporter_enqueue_failed_%s", rb.dataType) return Rule{ - Alert: rb.nameDecorator(alertNameExporterEnqueueFailed), + Alert: rb.namePrefix + RuleNameGatewayExporterEnqueueFailed, Expr: rate(metric, selectService(rb.serviceName)). - sumBy(exporterLabelKey). + sumBy(LabelExporter). greaterThan(0). build(), } @@ -146,9 +142,9 @@ func (rb ruleBuilder) exporterEnqueueFailedRule() Rule { func (rb ruleBuilder) receiverRefusedRule() Rule { metric := fmt.Sprintf("otelcol_receiver_refused_%s", rb.dataType) return Rule{ - Alert: rb.nameDecorator(alertNameReceiverRefusedData), + Alert: rb.namePrefix + RuleNameGatewayReceiverRefusedData, Expr: rate(metric, selectService(rb.serviceName)). - sumBy(receiverLabelKey). + sumBy(LabelReceiver). greaterThan(0). build(), } diff --git a/internal/selfmonitor/flowhealth/rules_test.go b/internal/selfmonitor/alertrules/rules_test.go similarity index 99% rename from internal/selfmonitor/flowhealth/rules_test.go rename to internal/selfmonitor/alertrules/rules_test.go index 6914077e1..be7e51d70 100644 --- a/internal/selfmonitor/flowhealth/rules_test.go +++ b/internal/selfmonitor/alertrules/rules_test.go @@ -1,4 +1,4 @@ -package flowhealth +package alertrules import ( "testing" diff --git a/internal/selfmonitor/flowhealth/mocks/alert_getter.go b/internal/selfmonitor/prober/mocks/alert_getter.go similarity index 95% rename from internal/selfmonitor/flowhealth/mocks/alert_getter.go rename to internal/selfmonitor/prober/mocks/alert_getter.go index 664bc5ad4..fd2d050bc 100644 --- a/internal/selfmonitor/flowhealth/mocks/alert_getter.go +++ b/internal/selfmonitor/prober/mocks/alert_getter.go @@ -3,9 +3,9 @@ package mocks import ( - context "context" + "context" - mock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/mock" v1 
"github.com/prometheus/client_golang/api/prometheus/v1" ) diff --git a/internal/selfmonitor/flowhealth/prober.go b/internal/selfmonitor/prober/prober.go similarity index 74% rename from internal/selfmonitor/flowhealth/prober.go rename to internal/selfmonitor/prober/prober.go index 82d6e296d..5bfd36228 100644 --- a/internal/selfmonitor/flowhealth/prober.go +++ b/internal/selfmonitor/prober/prober.go @@ -1,4 +1,4 @@ -package flowhealth +package prober import ( "context" @@ -12,6 +12,7 @@ import ( telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1" "github.com/kyma-project/telemetry-manager/internal/otelcollector/config/otlpexporter" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules" "github.com/kyma-project/telemetry-manager/internal/selfmonitor/ports" ) @@ -24,20 +25,13 @@ type alertGetter interface { Alerts(ctx context.Context) (promv1.AlertsResult, error) } -type FlowType string - -const ( - FlowTypeTraces FlowType = "traces" - FlowTypeMetrics FlowType = "metrics" -) - type Prober struct { clientTimeout time.Duration getter alertGetter - nameDecorator ruleNameDecorator + pipelineType alertrules.PipelineType } -func NewProber(flowType FlowType, selfMonitorName types.NamespacedName) (*Prober, error) { +func NewProber(pipelineType alertrules.PipelineType, selfMonitorName types.NamespacedName) (*Prober, error) { client, err := api.NewClient(api.Config{ Address: fmt.Sprintf("http://%s.%s:%d", selfMonitorName.Name, selfMonitorName.Namespace, ports.PrometheusPort), }) @@ -45,15 +39,10 @@ func NewProber(flowType FlowType, selfMonitorName types.NamespacedName) (*Prober return nil, fmt.Errorf("failed to create Prometheus client: %w", err) } - nameDecorator := metricRuleNameDecorator - if flowType == FlowTypeTraces { - nameDecorator = traceRuleNameDecorator - } - return &Prober{ getter: promv1.NewAPI(client), clientTimeout: clientTimeout, - nameDecorator: nameDecorator, + pipelineType: pipelineType, }, nil } @@ 
-81,34 +70,34 @@ func (p *Prober) Probe(ctx context.Context, pipelineName string) (ProbeResult, e } func (p *Prober) allDataDropped(alerts []promv1.Alert, pipelineName string) bool { - exporterSentFiring := p.hasFiringAlertForPipeline(alerts, alertNameExporterSentData, pipelineName) - exporterDroppedFiring := p.hasFiringAlertForPipeline(alerts, alertNameExporterDroppedData, pipelineName) - exporterEnqueueFailedFiring := p.hasFiringAlertForPipeline(alerts, alertNameExporterEnqueueFailed, pipelineName) + exporterSentFiring := p.hasFiringAlertForPipeline(alerts, alertrules.RuleNameGatewayExporterSentData, pipelineName) + exporterDroppedFiring := p.hasFiringAlertForPipeline(alerts, alertrules.RuleNameGatewayExporterDroppedData, pipelineName) + exporterEnqueueFailedFiring := p.hasFiringAlertForPipeline(alerts, alertrules.RuleNameGatewayExporterEnqueueFailed, pipelineName) return !exporterSentFiring && (exporterDroppedFiring || exporterEnqueueFailedFiring) } func (p *Prober) someDataDropped(alerts []promv1.Alert, pipelineName string) bool { - exporterSentFiring := p.hasFiringAlertForPipeline(alerts, alertNameExporterSentData, pipelineName) - exporterDroppedFiring := p.hasFiringAlertForPipeline(alerts, alertNameExporterDroppedData, pipelineName) - exporterEnqueueFailedFiring := p.hasFiringAlertForPipeline(alerts, alertNameExporterEnqueueFailed, pipelineName) + exporterSentFiring := p.hasFiringAlertForPipeline(alerts, alertrules.RuleNameGatewayExporterSentData, pipelineName) + exporterDroppedFiring := p.hasFiringAlertForPipeline(alerts, alertrules.RuleNameGatewayExporterDroppedData, pipelineName) + exporterEnqueueFailedFiring := p.hasFiringAlertForPipeline(alerts, alertrules.RuleNameGatewayExporterEnqueueFailed, pipelineName) return exporterSentFiring && (exporterDroppedFiring || exporterEnqueueFailedFiring) } func (p *Prober) queueAlmostFull(alerts []promv1.Alert, pipelineName string) bool { - return p.hasFiringAlertForPipeline(alerts, alertNameExporterQueueAlmostFull, 
pipelineName) + return p.hasFiringAlertForPipeline(alerts, alertrules.RuleNameGatewayExporterQueueAlmostFull, pipelineName) } func (p *Prober) throttling(alerts []promv1.Alert) bool { - return p.hasFiringAlert(alerts, alertNameReceiverRefusedData) + return p.hasFiringAlert(alerts, alertrules.RuleNameGatewayReceiverRefusedData) } func (p *Prober) healthy(alerts []promv1.Alert, pipelineName string) bool { - return !(p.hasFiringAlertForPipeline(alerts, alertNameExporterDroppedData, pipelineName) || - p.hasFiringAlertForPipeline(alerts, alertNameExporterQueueAlmostFull, pipelineName) || - p.hasFiringAlertForPipeline(alerts, alertNameExporterEnqueueFailed, pipelineName) || - p.hasFiringAlert(alerts, alertNameReceiverRefusedData)) + return !(p.hasFiringAlertForPipeline(alerts, alertrules.RuleNameGatewayExporterDroppedData, pipelineName) || + p.hasFiringAlertForPipeline(alerts, alertrules.RuleNameGatewayExporterQueueAlmostFull, pipelineName) || + p.hasFiringAlertForPipeline(alerts, alertrules.RuleNameGatewayExporterEnqueueFailed, pipelineName) || + p.hasFiringAlert(alerts, alertrules.RuleNameGatewayReceiverRefusedData)) } func (p *Prober) retrieveAlerts(ctx context.Context) ([]promv1.Alert, error) { @@ -146,11 +135,12 @@ func (p *Prober) hasFiringAlertForPipeline(alerts []promv1.Alert, alertName, pip func (p *Prober) matchesAlertName(alert promv1.Alert, alertName string) bool { v, ok := alert.Labels[model.AlertNameLabel] - return ok && string(v) == p.nameDecorator(alertName) + expectedFullName := alertrules.RuleNamePrefix(p.pipelineType) + alertName + return ok && string(v) == expectedFullName } func matchesPipeline(alert promv1.Alert, pipelineName string) bool { - labelValue, ok := alert.Labels[model.LabelName("exporter")] + labelValue, ok := alert.Labels[model.LabelName(alertrules.LabelExporter)] if !ok { return false } diff --git a/internal/selfmonitor/flowhealth/prober_test.go b/internal/selfmonitor/prober/prober_test.go similarity index 97% rename from 
internal/selfmonitor/flowhealth/prober_test.go rename to internal/selfmonitor/prober/prober_test.go index 202d3eee4..71d30dfc2 100644 --- a/internal/selfmonitor/flowhealth/prober_test.go +++ b/internal/selfmonitor/prober/prober_test.go @@ -1,4 +1,4 @@ -package flowhealth +package prober import ( "context" @@ -10,7 +10,8 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth/mocks" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober/mocks" ) func TestProber(t *testing.T) { @@ -266,8 +267,8 @@ func TestProber(t *testing.T) { } sut := Prober{ - getter: alertGetterMock, - nameDecorator: traceRuleNameDecorator, + getter: alertGetterMock, + pipelineType: alertrules.TracePipeline, } result, err := sut.Probe(context.Background(), tc.pipelineName) diff --git a/internal/selfmonitor/webhook/handler.go b/internal/selfmonitor/webhook/handler.go index bb9d881a5..789f21148 100644 --- a/internal/selfmonitor/webhook/handler.go +++ b/internal/selfmonitor/webhook/handler.go @@ -1,36 +1,48 @@ package webhook import ( + "context" + "encoding/json" "io" "net/http" + "strings" "github.com/go-logr/logr" + "github.com/prometheus/common/model" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/log" + + telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1" + "github.com/kyma-project/telemetry-manager/internal/otelcollector/config/otlpexporter" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules" ) type Handler struct { - subscribers []chan<- event.GenericEvent + c client.Reader + subscribers map[alertrules.PipelineType]chan<- event.GenericEvent logger logr.Logger } type Option = func(*Handler) -func WithLogger(logger logr.Logger) Option { +func 
WithSubscriber(subscriber chan<- event.GenericEvent, pipelineType alertrules.PipelineType) Option { return func(h *Handler) { - h.logger = logger + h.subscribers[pipelineType] = subscriber } } -func WithSubscriber(subscriber chan<- event.GenericEvent) Option { +func WithLogger(logger logr.Logger) Option { return func(h *Handler) { - h.subscribers = append(h.subscribers, subscriber) + h.logger = logger } } -func NewHandler(opts ...Option) *Handler { +func NewHandler(c client.Reader, opts ...Option) *Handler { h := &Handler{ - logger: logr.New(logf.NullLogSink{}), + c: c, + logger: logr.New(logf.NullLogSink{}), + subscribers: make(map[alertrules.PipelineType]chan<- event.GenericEvent), } for _, opt := range opts { @@ -40,6 +52,10 @@ func NewHandler(opts ...Option) *Handler { return h } +type Alert struct { + Labels map[string]string `json:"labels"` +} + func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { h.logger.Info("Invalid method", "method", r.Method) @@ -47,21 +63,108 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - req, err := io.ReadAll(r.Body) + alertsYAML, err := io.ReadAll(r.Body) if err != nil { h.logger.Error(err, "Failed to read request body") w.WriteHeader(http.StatusInternalServerError) return } + var alerts []Alert + if unmarshallErr := json.Unmarshal(alertsYAML, &alerts); unmarshallErr != nil { + h.logger.Error(err, "Failed to unmarshal request body") + w.WriteHeader(http.StatusBadRequest) + return + } + defer r.Body.Close() + metricPipelineEvents := h.toMetricPipelineReconcileEvents(r.Context(), alerts) + tracePipelineEvents := h.toTracePipelineReconcileEvents(r.Context(), alerts) h.logger.V(1).Info("Webhook called. 
Notifying the subscribers.", - "request", string(req)) + "request", alerts, + "metricPipelines", retrieveNames(metricPipelineEvents), + "tracePipelines", retrieveNames(tracePipelineEvents), + ) - for _, sub := range h.subscribers { - sub <- event.GenericEvent{} + for _, ev := range metricPipelineEvents { + h.subscribers[alertrules.MetricPipeline] <- ev + } + + for _, ev := range tracePipelineEvents { + h.subscribers[alertrules.TracePipeline] <- ev } w.WriteHeader(http.StatusOK) } + +func (h *Handler) toMetricPipelineReconcileEvents(ctx context.Context, alerts []Alert) []event.GenericEvent { + var events []event.GenericEvent + var metricPipelines telemetryv1alpha1.MetricPipelineList + if err := h.c.List(ctx, &metricPipelines); err != nil { + return events + } + + for i := range metricPipelines.Items { + if shouldReconcile(&metricPipelines.Items[i], alertrules.MetricPipeline, alerts) { + events = append(events, event.GenericEvent{Object: &metricPipelines.Items[i]}) + } + } + + return events +} + +func (h *Handler) toTracePipelineReconcileEvents(ctx context.Context, alerts []Alert) []event.GenericEvent { + var events []event.GenericEvent + var tracePipelines telemetryv1alpha1.TracePipelineList + if err := h.c.List(ctx, &tracePipelines); err != nil { + return events + } + + for i := range tracePipelines.Items { + if shouldReconcile(&tracePipelines.Items[i], alertrules.TracePipeline, alerts) { + events = append(events, event.GenericEvent{Object: &tracePipelines.Items[i]}) + } + } + + return events +} + +func shouldReconcile(pipeline client.Object, pipelineType alertrules.PipelineType, alerts []Alert) bool { + for _, alert := range alerts { + expectedPrefix := alertrules.RuleNamePrefix(pipelineType) + if !strings.HasPrefix(alert.Labels[model.AlertNameLabel], expectedPrefix) { + continue + } + + if matchesAllPipelines(alert.Labels) || matchesPipeline(alert.Labels, pipeline.GetName()) { + return true + } + } + + return false +} + +func matchesAllPipelines(labels 
map[string]string) bool { + if _, ok := labels[alertrules.LabelExporter]; !ok { + return true + } + return false +} + +func matchesPipeline(labels map[string]string, pipelineName string) bool { + exportedID, ok := labels[alertrules.LabelExporter] + if !ok { + return false + } + + return otlpexporter.ExporterID(telemetryv1alpha1.OtlpProtocolHTTP, pipelineName) == exportedID || otlpexporter.ExporterID(telemetryv1alpha1.OtlpProtocolGRPC, pipelineName) == exportedID +} + +func retrieveNames(events []event.GenericEvent) []string { + var names []string + for _, ev := range events { + names = append(names, ev.Object.GetName()) + } + return names +} diff --git a/internal/selfmonitor/webhook/handler_test.go b/internal/selfmonitor/webhook/handler_test.go index f25fe61c7..693bd5d43 100644 --- a/internal/selfmonitor/webhook/handler_test.go +++ b/internal/selfmonitor/webhook/handler_test.go @@ -10,8 +10,17 @@ import ( "github.com/go-logr/logr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/log" + + telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules" + "github.com/kyma-project/telemetry-manager/internal/testutils" ) type errReader struct{} @@ -22,17 +31,73 @@ func (errReader) Read(p []byte) (n int, err error) { func TestHandler(t *testing.T) { tests := []struct { - name string - requestMethod string - requestBody io.Reader - expectedStatus int - expectEvent bool + name string + requestMethod string + requestBody io.Reader + resources []client.Object + expectedStatus int + metricPipelinesToReconcile []string + tracePipelinesToReconcile []string }{ { - 
name: "valid", - requestMethod: http.MethodPost, + name: "alert matches metric pipeline with same name", + requestMethod: http.MethodPost, + requestBody: bytes.NewBuffer([]byte(`[{"labels":{"alertname":"MetricGatewayExporterDroppedData","exporter":"otlp/cls"}}]`)), + resources: []client.Object{ + ptr.To(testutils.NewMetricPipelineBuilder().WithName("cls").Build()), + }, + expectedStatus: http.StatusOK, + metricPipelinesToReconcile: []string{"cls"}, + }, + { + name: "alert matches trace pipeline with same name", + requestMethod: http.MethodPost, + requestBody: bytes.NewBuffer([]byte(`[{"labels":{"alertname":"TraceGatewayExporterDroppedData","exporter":"otlp/cls"}}]`)), + resources: []client.Object{ + ptr.To(testutils.NewTracePipelineBuilder().WithName("cls").Build()), + }, + expectedStatus: http.StatusOK, + tracePipelinesToReconcile: []string{"cls"}, + }, + { + name: "alert does not match pipeline with other name", + requestMethod: http.MethodPost, + requestBody: bytes.NewBuffer([]byte(`[{"labels":{"alertname":"MetricGatewayExporterDroppedData","exporter":"otlp/dynatrace"}}]`)), + resources: []client.Object{ + ptr.To(testutils.NewMetricPipelineBuilder().WithName("cls").Build()), + }, expectedStatus: http.StatusOK, - expectEvent: true, + }, + { + name: "alert does not match pipeline of other type", + requestMethod: http.MethodPost, + requestBody: bytes.NewBuffer([]byte(`[{"labels":{"alertname":"MetricGatewayExporterDroppedData","exporter":"otlp/cls"}}]`)), + resources: []client.Object{ + ptr.To(testutils.NewTracePipelineBuilder().WithName("cls").Build()), + }, + expectedStatus: http.StatusOK, + }, + { + name: "alert matches all metric pipelines", + requestMethod: http.MethodPost, + requestBody: bytes.NewBuffer([]byte(`[{"labels":{"alertname":"MetricGatewayReceiverRefusedData"}}]`)), + resources: []client.Object{ + ptr.To(testutils.NewMetricPipelineBuilder().WithName("cls").Build()), + ptr.To(testutils.NewMetricPipelineBuilder().WithName("dynatrace").Build()), + }, + 
expectedStatus: http.StatusOK, + metricPipelinesToReconcile: []string{"cls", "dynatrace"}, + }, + { + name: "alert matches all trace pipelines", + requestMethod: http.MethodPost, + requestBody: bytes.NewBuffer([]byte(`[{"labels":{"alertname":"TraceGatewayReceiverRefusedData"}}]`)), + resources: []client.Object{ + ptr.To(testutils.NewTracePipelineBuilder().WithName("cls").Build()), + ptr.To(testutils.NewTracePipelineBuilder().WithName("dynatrace").Build()), + }, + expectedStatus: http.StatusOK, + tracePipelinesToReconcile: []string{"cls", "dynatrace"}, }, { name: "invalid method", @@ -45,17 +110,29 @@ func TestHandler(t *testing.T) { requestBody: errReader{}, expectedStatus: http.StatusInternalServerError, }, + { + name: "failed to unmarshal request body", + requestMethod: http.MethodPost, + requestBody: bytes.NewBuffer([]byte(`{"labels":{"alertname":"TraceGatewayReceiverRefusedData"}}`)), + expectedStatus: http.StatusBadRequest, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ch := make(chan event.GenericEvent, 1) + metricPipelineEvents := make(chan event.GenericEvent, 1024) + tracePipelineEvents := make(chan event.GenericEvent, 1024) noopLogger := logr.New(logf.NullLogSink{}) - handler := NewHandler(WithSubscriber(ch), WithLogger(noopLogger)) - if tc.requestBody == nil { - tc.requestBody = bytes.NewBuffer([]byte(`{"key":"value"}`)) - } + + scheme := runtime.NewScheme() + _ = clientgoscheme.AddToScheme(scheme) + _ = telemetryv1alpha1.AddToScheme(scheme) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(tc.resources...).Build() + handler := NewHandler(fakeClient, + WithSubscriber(metricPipelineEvents, alertrules.MetricPipeline), + WithSubscriber(tracePipelineEvents, alertrules.TracePipeline), + WithLogger(noopLogger)) req, err := http.NewRequest(tc.requestMethod, "/", tc.requestBody) require.NoError(t, err) @@ -64,9 +141,31 @@ func TestHandler(t *testing.T) { handler.ServeHTTP(rr, req) require.Equal(t, tc.expectedStatus, 
rr.Code) - if tc.expectEvent { - require.NotEmpty(t, ch) + if tc.metricPipelinesToReconcile != nil { + require.NotEmpty(t, metricPipelineEvents) + require.ElementsMatch(t, tc.metricPipelinesToReconcile, readAllNamesFromChannel(metricPipelineEvents)) + } else { + require.Empty(t, metricPipelineEvents) + } + + if tc.tracePipelinesToReconcile != nil { + require.NotEmpty(t, tracePipelineEvents) + require.ElementsMatch(t, tc.tracePipelinesToReconcile, readAllNamesFromChannel(tracePipelineEvents)) + } else { + require.Empty(t, tracePipelineEvents) } }) } } + +func readAllNamesFromChannel(ch <-chan event.GenericEvent) []string { + var names []string + for { + select { + case event := <-ch: + names = append(names, event.Object.GetName()) + default: + return names + } + } +} diff --git a/main.go b/main.go index a7a1387bc..ef77ded37 100644 --- a/main.go +++ b/main.go @@ -64,7 +64,8 @@ import ( "github.com/kyma-project/telemetry-manager/internal/resources/fluentbit" "github.com/kyma-project/telemetry-manager/internal/resources/otelcollector" "github.com/kyma-project/telemetry-manager/internal/resources/selfmonitor" - "github.com/kyma-project/telemetry-manager/internal/selfmonitor/flowhealth" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules" + "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober" selfmonitorwebhook "github.com/kyma-project/telemetry-manager/internal/selfmonitor/webhook" "github.com/kyma-project/telemetry-manager/internal/webhookcert" "github.com/kyma-project/telemetry-manager/webhook/dryrun" @@ -384,8 +385,9 @@ func main() { if enableWebhook && enableSelfMonitor { mgr.GetWebhookServer().Register("/api/v2/alerts", selfmonitorwebhook.NewHandler( - selfmonitorwebhook.WithSubscriber(tracingControllerReconcileTriggerChan), - selfmonitorwebhook.WithSubscriber(metricsControllerReconcileTriggerChan), + mgr.GetClient(), + selfmonitorwebhook.WithSubscriber(tracingControllerReconcileTriggerChan, alertrules.TracePipeline), + 
selfmonitorwebhook.WithSubscriber(metricsControllerReconcileTriggerChan, alertrules.MetricPipeline), selfmonitorwebhook.WithLogger(ctrl.Log.WithName("self-monitor-webhook")))) } @@ -424,8 +426,8 @@ func enableLoggingController(mgr manager.Manager) { func enableTracingController(mgr manager.Manager, reconcileTriggerChan <-chan event.GenericEvent) { setupLog.Info("Starting with tracing controller") var err error - var flowHealthProber *flowhealth.Prober - if flowHealthProber, err = flowhealth.NewProber(flowhealth.FlowTypeTraces, + var flowHealthProber *prober.Prober + if flowHealthProber, err = prober.NewProber(alertrules.TracePipeline, types.NamespacedName{Name: selfMonitorName, Namespace: telemetryNamespace}); err != nil { setupLog.Error(err, "Failed to create flow health prober") os.Exit(1) @@ -440,8 +442,8 @@ func enableTracingController(mgr manager.Manager, reconcileTriggerChan <-chan ev func enableMetricsController(mgr manager.Manager, reconcileTriggerChan <-chan event.GenericEvent) { setupLog.Info("Starting with metrics controller") var err error - var flowHealthProber *flowhealth.Prober - if flowHealthProber, err = flowhealth.NewProber(flowhealth.FlowTypeMetrics, + var flowHealthProber *prober.Prober + if flowHealthProber, err = prober.NewProber(alertrules.MetricPipeline, types.NamespacedName{Name: selfMonitorName, Namespace: telemetryNamespace}); err != nil { setupLog.Error(err, "Failed to create flow health prober") os.Exit(1) @@ -549,7 +551,7 @@ func createLogParserValidator(client client.Client) *logparserwebhook.Validating admission.NewDecoder(scheme)) } -func createTracePipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, flowHealthProber *flowhealth.Prober) *telemetrycontrollers.TracePipelineController { +func createTracePipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, flowHealthProber *prober.Prober) *telemetrycontrollers.TracePipelineController { config := 
tracepipeline.Config{ Gateway: otelcollector.GatewayConfig{ Config: otelcollector.Config{ @@ -589,7 +591,7 @@ func createTracePipelineController(client client.Client, reconcileTriggerChan <- ) } -func createMetricPipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, flowHealthProber *flowhealth.Prober) *telemetrycontrollers.MetricPipelineController { +func createMetricPipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, flowHealthProber *prober.Prober) *telemetrycontrollers.MetricPipelineController { config := metricpipeline.Config{ Agent: otelcollector.AgentConfig{ Config: otelcollector.Config{