diff --git a/controllers/telemetry/metricpipeline_controller.go b/controllers/telemetry/metricpipeline_controller.go
index 8b0ae9397..5ab83f5d3 100644
--- a/controllers/telemetry/metricpipeline_controller.go
+++ b/controllers/telemetry/metricpipeline_controller.go
@@ -19,7 +19,10 @@ package telemetry
 import (
 	"context"
 	"fmt"
-
+	operatorv1alpha1 "github.com/kyma-project/telemetry-manager/apis/operator/v1alpha1"
+	telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
+	"github.com/kyma-project/telemetry-manager/internal/predicate"
+	"github.com/kyma-project/telemetry-manager/internal/reconciler/metricpipeline"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	networkingv1 "k8s.io/api/networking/v1"
@@ -29,27 +32,26 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
-
-	operatorv1alpha1 "github.com/kyma-project/telemetry-manager/apis/operator/v1alpha1"
-	telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
-	"github.com/kyma-project/telemetry-manager/internal/predicate"
-	"github.com/kyma-project/telemetry-manager/internal/reconciler/metricpipeline"
+	"sigs.k8s.io/controller-runtime/pkg/source"
 )
 
 // MetricPipelineReconciler reconciles a MetricPipeline object
 type MetricPipelineReconciler struct {
 	client.Client
-	reconciler *metricpipeline.Reconciler
+	reconciler           *metricpipeline.Reconciler
+	reconcileTriggerChan chan event.GenericEvent
 }
 
-func NewMetricPipelineReconciler(client client.Client, reconciler *metricpipeline.Reconciler) *MetricPipelineReconciler {
+func NewMetricPipelineReconciler(client client.Client, reconciler *metricpipeline.Reconciler, reconcileTriggerChan chan event.GenericEvent) *MetricPipelineReconciler {
 	return &MetricPipelineReconciler{
-		Client:     client,
-		reconciler: reconciler,
+		Client:               client,
+		reconciler:           reconciler,
+		reconcileTriggerChan: reconcileTriggerChan,
 	}
 }
 
@@ -92,9 +94,21 @@ func (r *MetricPipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
 			&operatorv1alpha1.Telemetry{},
 			handler.EnqueueRequestsFromMapFunc(r.mapTelemetryChanges),
 			builder.WithPredicates(predicate.CreateOrUpdateOrDelete()),
+		).WatchesRawSource(
+			&source.Channel{Source: r.reconcileTriggerChan},
+			handler.EnqueueRequestsFromMapFunc(r.mapPrometheusAlertEvent),
 		).Complete(r)
 }
 
+func (r *MetricPipelineReconciler) mapPrometheusAlertEvent(ctx context.Context, _ client.Object) []reconcile.Request {
+	logf.FromContext(ctx).Info("Handling Prometheus alert event")
+	requests, err := r.createRequestsForAllPipelines(ctx)
+	if err != nil {
+		logf.FromContext(ctx).Error(err, "Unable to create reconcile requests")
+	}
+	return requests
+}
+
 func (r *MetricPipelineReconciler) mapCRDChanges(ctx context.Context, object client.Object) []reconcile.Request {
 	_, ok := object.(*apiextensionsv1.CustomResourceDefinition)
 	if !ok {
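With `WatchesRawSource`, a reconciliation of every MetricPipeline can now be triggered without any Kubernetes object changing. A minimal producer-side sketch (the non-blocking send is an assumption of the sketch, not part of this change):

```go
package main

import "sigs.k8s.io/controller-runtime/pkg/event"

// notifyPipelines pushes an empty GenericEvent into the trigger channel;
// controller-runtime then calls mapPrometheusAlertEvent, which enqueues one
// reconcile.Request per MetricPipeline. The event payload itself is ignored.
func notifyPipelines(ch chan<- event.GenericEvent) {
	select {
	case ch <- event.GenericEvent{}:
	default: // buffer full: a trigger is already pending, dropping is safe here
	}
}
```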
diff --git a/internal/conditions/conditions.go b/internal/conditions/conditions.go
index 6255ebc77..a3aab168c 100644
--- a/internal/conditions/conditions.go
+++ b/internal/conditions/conditions.go
@@ -1,9 +1,16 @@
 package conditions
 
+import (
+	"strings"
+
+	"github.com/kyma-project/telemetry-manager/internal/prometheus"
+)
+
 const (
 	TypeMetricGatewayHealthy   = "GatewayHealthy"
 	TypeMetricAgentHealthy     = "AgentHealthy"
 	TypeConfigurationGenerated = "ConfigurationGenerated"
+	TypeMetricFlowHealthy      = "MetricFlowHealthy"
 )
 
 const (
@@ -26,6 +33,11 @@ const (
 
 	ReasonTraceGatewayDeploymentNotReady = "TraceGatewayDeploymentNotReady"
 	ReasonTraceGatewayDeploymentReady    = "TraceGatewayDeploymentReady"
+
+	ReasonMetricFlowHealthy      = "MetricFlowHealthy"
+	ReasonExporterDroppedMetrics = "PipelineDropsMetrics"
+
+	ReasonUnknown = "Unknown"
 )
 
 var message = map[string]string{
@@ -45,6 +57,14 @@ var message = map[string]string{
 
 	ReasonTraceGatewayDeploymentNotReady: "Trace gateway Deployment is not ready",
 	ReasonTraceGatewayDeploymentReady:    "Trace gateway Deployment is ready",
+
+	ReasonExporterDroppedMetrics: "Pipeline pipelineName is dropping Metrics",
+	ReasonUnknown:                "Cannot determine the pipeline state",
+}
+
+var alertMap = map[string]string{
+	"ExporterDroppedMetrics": ReasonExporterDroppedMetrics,
+	"Unknown":                ReasonUnknown,
 }
 
 // CommonMessageFor returns a human-readable message corresponding to a given reason.
@@ -55,3 +75,23 @@ func CommonMessageFor(reason string) string {
 	}
 	return ""
 }
+
+// FetchReasonFromAlert maps a Prometheus alert name to a condition reason.
+func FetchReasonFromAlert(alert prometheus.Alerts) string {
+	if reason, found := alertMap[alert.Name]; found {
+		return reason
+	}
+	return ""
+}
+
+// MessageForAlerts returns the message for an alert's reason, with the
+// pipelineName placeholder replaced by the affected pipeline.
+func MessageForAlerts(alert prometheus.Alerts) string {
+	reason := FetchReasonFromAlert(alert)
+	if reason == "" {
+		return ""
+	}
+	if reasonMsg, found := message[reason]; found {
+		return strings.Replace(reasonMsg, "pipelineName", alert.PipelineInfo, 1)
+	}
+	return ""
+}
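For illustration, an `ExporterDroppedMetrics` alert for a hypothetical exporter named `otlp/backend` resolves through the mapping above as follows:

```go
package main

import (
	"fmt"

	"github.com/kyma-project/telemetry-manager/internal/conditions"
	"github.com/kyma-project/telemetry-manager/internal/prometheus"
)

func main() {
	// Hypothetical alert; "otlp/backend" stands in for a real exporter name.
	alert := prometheus.Alerts{
		Name:         "ExporterDroppedMetrics",
		Severity:     "critical",
		PipelineInfo: "otlp/backend",
	}
	fmt.Println(conditions.FetchReasonFromAlert(alert)) // PipelineDropsMetrics
	fmt.Println(conditions.MessageForAlerts(alert))     // Pipeline otlp/backend is dropping Metrics
}
```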
diff --git a/internal/prometheus/alerts.go b/internal/prometheus/alerts.go
new file mode 100644
index 000000000..d18c1ba66
--- /dev/null
+++ b/internal/prometheus/alerts.go
@@ -0,0 +1,150 @@
+package prometheus
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/prometheus/client_golang/api"
+	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+const prometheusAPIURL = "http://prometheus-server.default:80"
+
+// Alerts describes a single Prometheus alert that is relevant for a pipeline.
+type Alerts struct {
+	Name         string
+	Severity     string
+	PipelineInfo string
+}
+
+// NewAlerts returns an empty alert, meaning no alert is firing.
+func NewAlerts() Alerts {
+	return Alerts{}
+}
+
+// SetUnknownAlert returns the sentinel alert used when the alert state cannot
+// be determined, for example because Prometheus is unreachable.
+func SetUnknownAlert() Alerts {
+	return Alerts{
+		Name:         "Unknown",
+		Severity:     "Unknown",
+		PipelineInfo: "Unknown",
+	}
+}
+
+// QueryAlerts fetches the currently firing alerts from Prometheus and selects
+// the one that should drive the pipeline condition.
+func QueryAlerts(ctx context.Context, currentAlert Alerts) (Alerts, error) {
+	client, err := api.NewClient(api.Config{
+		Address: prometheusAPIURL,
+	})
+	if err != nil {
+		return Alerts{}, fmt.Errorf("failed to create Prometheus client: %w", err)
+	}
+
+	v1api := promv1.NewAPI(client)
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
+	start := time.Now()
+	alerts, err := v1api.Alerts(ctx)
+	if err != nil {
+		return Alerts{}, fmt.Errorf("failed to query Prometheus alerts: %w", err)
+	}
+
+	logf.FromContext(ctx).Info("Prometheus alert query succeeded",
+		"elapsed_ms", time.Since(start).Milliseconds(),
+		"alerts", alerts)
+
+	return fetchAlert(alerts, currentAlert), nil
+}
+
+func fetchAlert(alerts promv1.AlertsResult, currentAlert Alerts) Alerts {
+	if len(alerts.Alerts) == 0 {
+		return Alerts{}
+	}
+	firingAlerts := fetchFiringAlerts(alerts.Alerts)
+	// If the current alert is critical and still firing, keep it instead of
+	// switching to another alert.
+	if currentCriticalAlertIsStillFiring(currentAlert, firingAlerts) {
+		return currentAlert
+	}
+	alert := fetchCriticalAlerts(firingAlerts)
+	if alert.Name != "" {
+		return alert
+	}
+	return fetchNonCriticalAlerts(firingAlerts)
+}
+
+func currentCriticalAlertIsStillFiring(currentAlert Alerts, firingAlerts []promv1.Alert) bool {
+	return currentAlert.Name != "" &&
+		currentAlert.Severity == "critical" &&
+		firingAlertsContainsAlert(currentAlert.Name, firingAlerts)
+}
+
+func firingAlertsContainsAlert(alertName string, alerts []promv1.Alert) bool {
+	for _, alert := range alerts {
+		if string(alert.Labels["alertname"]) == alertName {
+			return true
+		}
+	}
+	return false
+}
+
+func fetchFiringAlerts(alerts []promv1.Alert) []promv1.Alert {
+	var firingAlerts []promv1.Alert
+	for _, alert := range alerts {
+		if alert.State == promv1.AlertStateFiring {
+			firingAlerts = append(firingAlerts, alert)
+		}
+	}
+	return firingAlerts
+}
+
+func fetchCriticalAlerts(alerts []promv1.Alert) Alerts {
+	for _, alert := range alerts {
+		if string(alert.Labels["severity"]) == "critical" {
+			return Alerts{
+				Name:         string(alert.Labels["alertname"]),
+				Severity:     string(alert.Labels["severity"]),
+				PipelineInfo: FetchPipelineInfo(alert),
+			}
+		}
+	}
+	return Alerts{}
+}
+
+func fetchNonCriticalAlerts(alerts []promv1.Alert) Alerts {
+	if len(alerts) == 0 {
+		return Alerts{}
+	}
+	return Alerts{
+		Name:         string(alerts[0].Labels["alertname"]),
+		Severity:     string(alerts[0].Labels["severity"]),
+		PipelineInfo: FetchPipelineInfo(alerts[0]),
+	}
+}
+
+// FetchPipelineInfo extracts the pipeline identifier from the alert labels:
+// the exporter label for exporter alerts, the receiver label for receiver alerts.
+func FetchPipelineInfo(alert promv1.Alert) string {
+	switch string(alert.Labels["alertname"]) {
+	case "ExporterDroppedMetrics", "ExporterDroppedSpans", "ExporterDroppedLogs":
+		return string(alert.Labels["exporter"])
+	case "ReceiverDroppedMetrics", "ReceiverDroppedSpans", "ReceiverDroppedLogs":
+		return string(alert.Labels["receiver"])
+	}
+	return ""
+}
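The precedence implemented in `fetchAlert` (keep a still-firing critical current alert, else the first firing critical alert, else the first firing alert of any severity) operates on `promv1` data shaped as below; the label values are illustrative:

```go
package main

import (
	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
)

// exampleAlert shows the fields fetchAlert actually reads.
var exampleAlert = promv1.Alert{
	Labels: model.LabelSet{
		"alertname": "ExporterDroppedMetrics", // matched against alertMap and alert names
		"severity":  "critical",               // drives fetchCriticalAlerts
		"exporter":  "otlp/backend",           // surfaced as PipelineInfo
	},
	State: promv1.AlertStateFiring, // only firing alerts are considered
}
```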
"github.com/kyma-project/telemetry-manager/internal/overrides" + "github.com/kyma-project/telemetry-manager/internal/prometheus" "github.com/kyma-project/telemetry-manager/internal/resources/otelcollector" "github.com/kyma-project/telemetry-manager/internal/secretref" + "gopkg.in/yaml.v3" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" ) const defaultReplicaCount int32 = 2 @@ -48,6 +47,7 @@ type Reconciler struct { agentProber DaemonSetProber overridesHandler *overrides.Handler istioStatusChecker istiostatus.Checker + currentAlert prometheus.Alerts } func NewReconciler(client client.Client, config Config, gatewayProber DeploymentProber, agentProber DaemonSetProber, overridesHandler *overrides.Handler) *Reconciler { @@ -58,6 +58,7 @@ func NewReconciler(client client.Client, config Config, gatewayProber Deployment agentProber: agentProber, overridesHandler: overridesHandler, istioStatusChecker: istiostatus.NewChecker(client), + currentAlert: prometheus.NewAlerts(), } } @@ -86,8 +87,15 @@ func (r *Reconciler) doReconcile(ctx context.Context, pipeline *telemetryv1alpha var err error lockAcquired := true + err, alert := prometheus.QueryAlerts(ctx, r.currentAlert) + if err != nil { + logf.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to query Prometheus: %w", err)) + alert = prometheus.SetUnknownAlert() + } + r.currentAlert = alert + defer func() { - if statusErr := r.updateStatus(ctx, pipeline.Name, lockAcquired); statusErr != nil { + if statusErr := r.updateStatus(ctx, pipeline.Name, lockAcquired, r.currentAlert); statusErr != nil { if err != nil { err = fmt.Errorf("failed while updating status: %v: %v", statusErr, err) } else { diff --git a/internal/reconciler/metricpipeline/status.go b/internal/reconciler/metricpipeline/status.go index fec4e4174..6f0380124 100644 --- a/internal/reconciler/metricpipeline/status.go +++ b/internal/reconciler/metricpipeline/status.go @@ -3,7 +3,7 @@ package metricpipeline import ( "context" "fmt" - + "github.com/kyma-project/telemetry-manager/internal/prometheus" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -15,7 +15,7 @@ import ( "github.com/kyma-project/telemetry-manager/internal/secretref" ) -func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string, withinPipelineCountLimit bool) error { +func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string, withinPipelineCountLimit bool, alert prometheus.Alerts) error { var pipeline telemetryv1alpha1.MetricPipeline if err := r.Get(ctx, types.NamespacedName{Name: pipelineName}, &pipeline); err != nil { if apierrors.IsNotFound(err) { @@ -34,10 +34,12 @@ func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string, with r.setAgentHealthyCondition(ctx, &pipeline) r.setGatewayHealthyCondition(ctx, &pipeline) r.setGatewayConfigGeneratedCondition(ctx, &pipeline, withinPipelineCountLimit) + r.setMetricFlowHealthCondition(&pipeline, alert) if err := r.Status().Update(ctx, &pipeline); err != nil { return fmt.Errorf("failed to update MetricPipeline status: %w", err) } + fmt.Printf("Fetching conditions after update: %v\n", &pipeline.Status.Conditions) return nil } @@ -100,6 +102,22 @@ func (r *Reconciler) setGatewayConfigGeneratedCondition(ctx context.Context, pip meta.SetStatusCondition(&pipeline.Status.Conditions, newCondition(conditions.TypeConfigurationGenerated, reason, 
diff --git a/internal/reconciler/metricpipeline/status.go b/internal/reconciler/metricpipeline/status.go
index fec4e4174..6f0380124 100644
--- a/internal/reconciler/metricpipeline/status.go
+++ b/internal/reconciler/metricpipeline/status.go
@@ -3,7 +3,7 @@ package metricpipeline
 import (
 	"context"
 	"fmt"
-
+	"github.com/kyma-project/telemetry-manager/internal/prometheus"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -15,7 +15,7 @@ import (
 	"github.com/kyma-project/telemetry-manager/internal/secretref"
 )
 
-func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string, withinPipelineCountLimit bool) error {
+func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string, withinPipelineCountLimit bool, alert prometheus.Alerts) error {
 	var pipeline telemetryv1alpha1.MetricPipeline
 	if err := r.Get(ctx, types.NamespacedName{Name: pipelineName}, &pipeline); err != nil {
 		if apierrors.IsNotFound(err) {
@@ -34,10 +34,12 @@ func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string, with
 	r.setAgentHealthyCondition(ctx, &pipeline)
 	r.setGatewayHealthyCondition(ctx, &pipeline)
 	r.setGatewayConfigGeneratedCondition(ctx, &pipeline, withinPipelineCountLimit)
+	r.setMetricFlowHealthCondition(&pipeline, alert)
 
 	if err := r.Status().Update(ctx, &pipeline); err != nil {
 		return fmt.Errorf("failed to update MetricPipeline status: %w", err)
 	}
 	return nil
 }
 
@@ -100,6 +102,22 @@ func (r *Reconciler) setGatewayConfigGeneratedCondition(ctx context.Context, pip
 	meta.SetStatusCondition(&pipeline.Status.Conditions, newCondition(conditions.TypeConfigurationGenerated, reason,
 		status, pipeline.Generation))
 }
 
+func (r *Reconciler) setMetricFlowHealthCondition(pipeline *telemetryv1alpha1.MetricPipeline, alert prometheus.Alerts) {
+	status := metav1.ConditionTrue
+	reason := conditions.ReasonMetricFlowHealthy
+
+	if alert.Name != "" {
+		status = metav1.ConditionFalse
+		reason = conditions.FetchReasonFromAlert(alert)
+		if reason == "" {
+			// Guard against alert names missing from alertMap: an empty reason
+			// would be rejected when the status is updated.
+			reason = conditions.ReasonUnknown
+		}
+	}
+	msg := conditions.MessageForAlerts(alert)
+
+	meta.SetStatusCondition(&pipeline.Status.Conditions, newConditionForAlerts(conditions.TypeMetricFlowHealthy, reason, msg, status, pipeline.Generation))
+}
+
 func newCondition(condType, reason string, status metav1.ConditionStatus, generation int64) metav1.Condition {
 	return metav1.Condition{
 		Type:               condType,
@@ -109,3 +127,13 @@ func newCondition(condType, reason string, status metav1.ConditionStatus, genera
 		ObservedGeneration: generation,
 	}
 }
+
+func newConditionForAlerts(condType, reason, msg string, status metav1.ConditionStatus, generation int64) metav1.Condition {
+	return metav1.Condition{
+		Type:               condType,
+		Status:             status,
+		Reason:             reason,
+		Message:            msg,
+		ObservedGeneration: generation,
+	}
+}
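For the hypothetical dropping-metrics case above, the condition written to the pipeline status would look roughly like this (pipeline name and generation are illustrative):

```go
package main

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// cond mirrors what setMetricFlowHealthCondition produces for a firing
// ExporterDroppedMetrics alert on the hypothetical exporter "otlp/backend".
var cond = metav1.Condition{
	Type:               "MetricFlowHealthy",
	Status:             metav1.ConditionFalse,
	Reason:             "PipelineDropsMetrics",
	Message:            "Pipeline otlp/backend is dropping Metrics",
	ObservedGeneration: 1, // pipeline.Generation
}
```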
diff --git a/main.go b/main.go
index 09a7f7482..279194205 100644
--- a/main.go
+++ b/main.go
@@ -20,8 +20,11 @@ import (
 	"context"
 	"errors"
 	"flag"
+	"io"
+	networkingv1 "k8s.io/api/networking/v1"
 	"net/http"
 	"os"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 	"strings"
 	"sync"
 	"time"
@@ -32,7 +35,6 @@ import (
 	istiosecurityclientv1beta "istio.io/client-go/pkg/apis/security/v1beta1"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
-	networkingv1 "k8s.io/api/networking/v1"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/fields"
@@ -123,6 +125,7 @@ var (
 	metricGatewayDynamicCPURequest    string
 	metricGatewayMemoryRequest        string
 	metricGatewayDynamicMemoryRequest string
+	reconcileTriggerChan              chan event.GenericEvent
 
 	enableWebhook bool
 	mutex         sync.Mutex
@@ -295,6 +298,38 @@ func main() {
 		}
 	}()
 
+	reconcileTriggerChan = make(chan event.GenericEvent, 1024)
+	go func() {
+		handler := func(w http.ResponseWriter, r *http.Request) {
+			defer r.Body.Close()
+			if _, readErr := io.ReadAll(r.Body); readErr != nil {
+				http.Error(w, "Error reading request body", http.StatusInternalServerError)
+				return
+			}
+
+			// TODO: add more context about which objects have to be reconciled
+			reconcileTriggerChan <- event.GenericEvent{}
+			w.WriteHeader(http.StatusOK)
+		}
+
+		mux := http.NewServeMux()
+		mux.HandleFunc("/api/v2/alerts", handler)
+
+		server := &http.Server{
+			Addr:              ":9090",
+			ReadHeaderTimeout: 10 * time.Second,
+			Handler:           mux,
+		}
+
+		if serverErr := server.ListenAndServe(); serverErr != nil {
+			mutex.Lock()
+			setupLog.Error(serverErr, "Cannot start alert webhook server")
+			mutex.Unlock()
+		}
+	}()
+
 	syncPeriod := 1 * time.Minute
 	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
 		Scheme: scheme,
@@ -583,7 +618,8 @@ func createMetricPipelineReconciler(client client.Client) *telemetrycontrollers.
 	return telemetrycontrollers.NewMetricPipelineReconciler(
 		client,
-		metricpipeline.NewReconciler(client, config, &k8sutils.DeploymentProber{Client: client}, &k8sutils.DaemonSetProber{Client: client}, overridesHandler))
+		metricpipeline.NewReconciler(client, config, &k8sutils.DeploymentProber{Client: client}, &k8sutils.DaemonSetProber{Client: client}, overridesHandler),
+		reconcileTriggerChan)
 }
 
 func createDryRunConfig() dryrun.Config {
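The `/api/v2/alerts` path mirrors the Alertmanager API, so an Alertmanager webhook receiver can point straight at the manager. Since the handler discards the request body and only uses the call as a trigger, the endpoint can be exercised with any POST; a smoke-test sketch, assuming the manager is reachable locally on port 9090:

```go
package main

import (
	"net/http"
	"strings"
)

func main() {
	// Simulate an Alertmanager webhook delivery; the payload is ignored by the
	// handler, so its shape only matters for documentation purposes.
	resp, err := http.Post("http://localhost:9090/api/v2/alerts", "application/json",
		strings.NewReader(`[{"labels":{"alertname":"ExporterDroppedMetrics"}}]`))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close() // expect 200 OK
}
```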