Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Set metricpipeline status based on alerts #753

34 changes: 24 additions & 10 deletions controllers/telemetry/metricpipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package telemetry
import (
"context"
"fmt"

operatorv1alpha1 "github.com/kyma-project/telemetry-manager/apis/operator/v1alpha1"
telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
"github.com/kyma-project/telemetry-manager/internal/predicate"
"github.com/kyma-project/telemetry-manager/internal/reconciler/metricpipeline"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
Expand All @@ -29,27 +32,26 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

operatorv1alpha1 "github.com/kyma-project/telemetry-manager/apis/operator/v1alpha1"
telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
"github.com/kyma-project/telemetry-manager/internal/predicate"
"github.com/kyma-project/telemetry-manager/internal/reconciler/metricpipeline"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// MetricPipelineReconciler reconciles a MetricPipeline object.
type MetricPipelineReconciler struct {
	client.Client

	reconciler *metricpipeline.Reconciler
	// reconcileTriggerChan delivers external events (Prometheus alert
	// changes) that force a reconciliation of all metric pipelines.
	reconcileTriggerChan chan event.GenericEvent
}

func NewMetricPipelineReconciler(client client.Client, reconciler *metricpipeline.Reconciler) *MetricPipelineReconciler {
func NewMetricPipelineReconciler(client client.Client, reconciler *metricpipeline.Reconciler, reconcileTriggerChan chan event.GenericEvent) *MetricPipelineReconciler {
return &MetricPipelineReconciler{
Client: client,
reconciler: reconciler,
Client: client,
reconciler: reconciler,
reconcileTriggerChan: reconcileTriggerChan,
}
}

Expand Down Expand Up @@ -92,9 +94,21 @@ func (r *MetricPipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
&operatorv1alpha1.Telemetry{},
handler.EnqueueRequestsFromMapFunc(r.mapTelemetryChanges),
builder.WithPredicates(predicate.CreateOrUpdateOrDelete()),
).WatchesRawSource(
&source.Channel{Source: r.reconcileTriggerChan},
handler.EnqueueRequestsFromMapFunc(r.mapPrometheusAlertEvent),
).Complete(r)
}

// mapPrometheusAlertEvent translates an incoming Prometheus alert trigger
// event into reconcile requests for every MetricPipeline in the cluster.
// On failure to build the requests it logs the error and returns whatever
// was produced (possibly nil).
func (r *MetricPipelineReconciler) mapPrometheusAlertEvent(ctx context.Context, _ client.Object) []reconcile.Request {
	log := logf.FromContext(ctx)
	log.Info("Handling Prometheus alert event")

	requests, err := r.createRequestsForAllPipelines(ctx)
	if err != nil {
		log.Error(err, "Unable to create reconcile requests")
	}

	return requests
}

func (r *MetricPipelineReconciler) mapCRDChanges(ctx context.Context, object client.Object) []reconcile.Request {
_, ok := object.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
Expand Down
40 changes: 40 additions & 0 deletions internal/conditions/conditions.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package conditions

import (
"fmt"
"github.com/kyma-project/telemetry-manager/internal/prometheus"
"strings"
)

const (
TypeMetricGatewayHealthy = "GatewayHealthy"
TypeMetricAgentHealthy = "AgentHealthy"
TypeConfigurationGenerated = "ConfigurationGenerated"
TypeMetricFlowHealthy = "MetricFlowHealthy"
)

const (
Expand All @@ -26,6 +33,11 @@ const (

ReasonTraceGatewayDeploymentNotReady = "TraceGatewayDeploymentNotReady"
ReasonTraceGatewayDeploymentReady = "TraceGatewayDeploymentReady"

ReasonMetricFlowHealthy = "MetricFlowHealthy"
ReasonExporterDroppedMetrics = "PipelineDropsMetrics"

ReasonUnknown = "ReasonUnknown"
)

var message = map[string]string{
Expand All @@ -45,6 +57,14 @@ var message = map[string]string{

ReasonTraceGatewayDeploymentNotReady: "Trace gateway Deployment is not ready",
ReasonTraceGatewayDeploymentReady: "Trace gateway Deployment is ready",

ReasonExporterDroppedMetrics: "Pipeline pipelineName is dropping Metrics",
ReasonUnknown: "Cannot determine the pipeline state",
}

var alertMap = map[string]string{
"ExporterDroppedMetrics": ReasonExporterDroppedMetrics,
"Unknown": ReasonUnknown,
}

// CommonMessageFor returns a human-readable message corresponding to a given reason.
Expand All @@ -55,3 +75,23 @@ func CommonMessageFor(reason string) string {
}
return ""
}

// FetchReasonFromAlert maps a Prometheus alert name to its condition reason
// via alertMap. It returns an empty string for alerts with no mapping.
func FetchReasonFromAlert(alert prometheus.Alerts) string {
	if reason, found := alertMap[alert.Name]; found {
		return reason
	}
	return ""
}

// MessageForAlerts builds the human-readable condition message for the given
// alert: it resolves the alert to a reason, looks up the reason's message
// template, and substitutes the pipeline name into it. It returns an empty
// string when the alert has no known reason or the reason has no message.
//
// NOTE(review): the debug fmt.Printf was removed; if fmt is now unused in
// this file, drop it from the import block as well.
func MessageForAlerts(alert prometheus.Alerts) string {
	reason := FetchReasonFromAlert(alert)
	if reason == "" {
		return ""
	}

	reasonMsg, found := message[reason]
	if !found {
		return ""
	}

	// The message template embeds the literal token "pipelineName" as a
	// placeholder (see ReasonExporterDroppedMetrics in the message map).
	return strings.Replace(reasonMsg, "pipelineName", alert.PipelineInfo, 1)
}
150 changes: 150 additions & 0 deletions internal/prometheus/alerts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package prometheus

import (
"context"
"fmt"
"github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"time"
)

const prometheusAPIURL = "http://prometheus-server.default:80"

//var criticalAlerts = []string{"ExporterDroppedMetrics", "ReceiverDroppedMetrics", "ExporterDroppedSpans", "ReceiverDroppedSpans", "ReceiverDroppedLogs"}

// Alerts captures the single Prometheus alert that currently drives the
// pipeline status.
// NOTE(review): despite the plural name, this holds exactly one alert;
// consider renaming to Alert in a follow-up (public API change).
type Alerts struct {
// Name is the Prometheus alert rule name (the "alertname" label).
Name string
// Severity is the alert's "severity" label (e.g. "critical").
Severity string
// PipelineInfo identifies the affected pipeline — the exporter or
// receiver label, depending on the alert (see FetchPipelineInfo).
PipelineInfo string
}

// NewAlerts returns an empty alert state, meaning no alert is firing.
func NewAlerts() Alerts {
	var none Alerts // zero value: all fields are empty strings
	return none
}

// SetUnknownAlert returns the sentinel alert state used when the alert
// status cannot be determined (e.g. Prometheus is unreachable).
func SetUnknownAlert() Alerts {
	const unknown = "Unknown"
	return Alerts{
		Name:         unknown,
		Severity:     unknown,
		PipelineInfo: unknown,
	}
}

// QueryAlerts fetches the currently firing alerts from Prometheus and reduces
// them to the single alert that should drive the pipeline status, giving a
// still-firing critical currentAlert precedence (see fetchAlert). On query or
// client-construction failure it returns the error and a zero Alerts value.
//
// NOTE(review): the (error, Alerts) return order is kept for caller
// compatibility, but Go convention is (Alerts, error) — consider swapping in
// a follow-up together with the call sites.
func QueryAlerts(ctx context.Context, currentAlert Alerts) (error, Alerts) {
	// Bound on a single Prometheus API round trip.
	const queryTimeout = 10 * time.Second

	client, err := api.NewClient(api.Config{
		Address: prometheusAPIURL,
	})
	if err != nil {
		return fmt.Errorf("failed to create Prometheus client: %w", err), Alerts{}
	}

	v1api := promv1.NewAPI(client)
	ctx, cancel := context.WithTimeout(ctx, queryTimeout)
	defer cancel()

	start := time.Now()
	alerts, err := v1api.Alerts(ctx)
	if err != nil {
		return fmt.Errorf("failed to query Prometheus alerts: %w", err), Alerts{}
	}

	logf.FromContext(ctx).Info("Prometheus alert query succeeded!",
		"elapsed_ms", time.Since(start).Milliseconds(),
		"alerts", alerts)

	if len(alerts.Alerts) == 0 {
		return nil, Alerts{}
	}

	return nil, fetchAlert(alerts, currentAlert)
}

// fetchAlert selects the alert that should drive the pipeline status:
//  1. a currently tracked critical alert that is still firing is kept
//     (keeps the reported state stable),
//  2. otherwise the first firing critical alert wins,
//  3. otherwise the first firing alert of any severity,
//  4. otherwise a zero Alerts value (nothing firing).
func fetchAlert(alerts promv1.AlertsResult, currentAlert Alerts) Alerts {
	if len(alerts.Alerts) == 0 {
		return Alerts{}
	}

	firingAlerts := fetchFiringAlerts(alerts.Alerts)

	// If the current alert is critical and still firing, don't change state.
	if currentCriticalAlertIsStillFiring(currentAlert, firingAlerts) {
		return currentAlert
	}

	if alert := fetchCriticalAlerts(firingAlerts); alert.Name != "" {
		return alert
	}

	return fetchNonCriticalAlerts(firingAlerts)
}

// currentCriticalAlertIsStillFiring reports whether the tracked alert is a
// critical one that is still present in the firing set.
func currentCriticalAlertIsStillFiring(currentAlert Alerts, firingAlerts []promv1.Alert) bool {
	return currentAlert.Name != "" &&
		currentAlert.Severity == "critical" &&
		firingAlertsContainsAlert(currentAlert.Name, firingAlerts)
}

// firingAlertsContainsAlert reports whether any alert in the list carries the
// given "alertname" label.
func firingAlertsContainsAlert(alertName string, alerts []promv1.Alert) bool {
	for i := range alerts {
		if string(alerts[i].Labels["alertname"]) == alertName {
			return true
		}
	}
	return false
}
// fetchFiringAlerts filters the given alerts down to those in firing state.
// Returns nil when none are firing.
func fetchFiringAlerts(alerts []promv1.Alert) []promv1.Alert {
	var firing []promv1.Alert
	for i := range alerts {
		if alerts[i].State != promv1.AlertStateFiring {
			continue
		}
		firing = append(firing, alerts[i])
	}
	return firing
}
// fetchCriticalAlerts returns the first alert labeled severity=critical, or a
// zero Alerts value if there is none.
func fetchCriticalAlerts(alerts []promv1.Alert) Alerts {
	for i := range alerts {
		labels := alerts[i].Labels
		if string(labels["severity"]) != "critical" {
			continue
		}
		return Alerts{
			Name:         string(labels["alertname"]),
			Severity:     string(labels["severity"]),
			PipelineInfo: FetchPipelineInfo(alerts[i]),
		}
	}
	return Alerts{}
}

// fetchNonCriticalAlerts returns the first firing alert regardless of
// severity, or a zero Alerts value when none are firing.
// (The original used a for-range loop that unconditionally returned on the
// first iteration — an obfuscated alerts[0] access.)
func fetchNonCriticalAlerts(alerts []promv1.Alert) Alerts {
	if len(alerts) == 0 {
		return Alerts{}
	}

	first := alerts[0]
	return Alerts{
		Name:         string(first.Labels["alertname"]),
		Severity:     string(first.Labels["severity"]),
		PipelineInfo: FetchPipelineInfo(first),
	}
}

// FetchPipelineInfo extracts the pipeline identifier from an alert's labels:
// the "exporter" label for exporter-side alerts, the "receiver" label for
// receiver-side alerts, and an empty string for anything else.
// Debug fmt.Printf statements were removed.
func FetchPipelineInfo(alert promv1.Alert) string {
	switch string(alert.Labels["alertname"]) {
	case "ExporterDroppedMetrics", "ExporterDroppedSpans", "ExporterDroppedLogs":
		return string(alert.Labels["exporter"])
	// NOTE(review): was "ReceiverDroppedMetric" (no trailing "s"), which
	// never matched the "ReceiverDroppedMetrics" spelling used elsewhere in
	// this file — confirm against the deployed Prometheus rule names.
	case "ReceiverDroppedMetrics", "ReceiverDroppedSpans", "ReceiverDroppedLogs":
		return string(alert.Labels["receiver"])
	default:
		return ""
	}
}
24 changes: 16 additions & 8 deletions internal/reconciler/metricpipeline/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,6 @@ package metricpipeline
import (
"context"
"fmt"

"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"

operatorv1alpha1 "github.com/kyma-project/telemetry-manager/apis/operator/v1alpha1"
telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
"github.com/kyma-project/telemetry-manager/internal/istiostatus"
Expand All @@ -18,8 +11,14 @@ import (
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/metric/gateway"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/ports"
"github.com/kyma-project/telemetry-manager/internal/overrides"
"github.com/kyma-project/telemetry-manager/internal/prometheus"
"github.com/kyma-project/telemetry-manager/internal/resources/otelcollector"
"github.com/kyma-project/telemetry-manager/internal/secretref"
"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const defaultReplicaCount int32 = 2
Expand Down Expand Up @@ -48,6 +47,7 @@ type Reconciler struct {
agentProber DaemonSetProber
overridesHandler *overrides.Handler
istioStatusChecker istiostatus.Checker
currentAlert prometheus.Alerts
}

func NewReconciler(client client.Client, config Config, gatewayProber DeploymentProber, agentProber DaemonSetProber, overridesHandler *overrides.Handler) *Reconciler {
Expand All @@ -58,6 +58,7 @@ func NewReconciler(client client.Client, config Config, gatewayProber Deployment
agentProber: agentProber,
overridesHandler: overridesHandler,
istioStatusChecker: istiostatus.NewChecker(client),
currentAlert: prometheus.NewAlerts(),
}
}

Expand Down Expand Up @@ -86,8 +87,15 @@ func (r *Reconciler) doReconcile(ctx context.Context, pipeline *telemetryv1alpha
var err error
lockAcquired := true

err, alert := prometheus.QueryAlerts(ctx, r.currentAlert)
if err != nil {
logf.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to query Prometheus: %w", err))
alert = prometheus.SetUnknownAlert()
}
r.currentAlert = alert

defer func() {
if statusErr := r.updateStatus(ctx, pipeline.Name, lockAcquired); statusErr != nil {
if statusErr := r.updateStatus(ctx, pipeline.Name, lockAcquired, r.currentAlert); statusErr != nil {
if err != nil {
err = fmt.Errorf("failed while updating status: %v: %v", statusErr, err)
} else {
Expand Down
Loading
Loading