From 67fbc5e2389c7ccd0f4c30d54479ea12939d48cb Mon Sep 17 00:00:00 2001 From: mithunbelur Date: Fri, 8 Nov 2024 10:02:36 +0000 Subject: [PATCH] k8sevents and debug exporter chagnes for alerts --- exporter/opsrampdebugexporter/exporter.go | 43 ++++++++++++++++--- receiver/k8seventsreceiver/config.go | 15 ++++++- .../k8seventsreceiver/k8s_event_to_logdata.go | 13 +++++- receiver/k8seventsreceiver/receiver.go | 36 ++++++++-------- 4 files changed, 82 insertions(+), 25 deletions(-) diff --git a/exporter/opsrampdebugexporter/exporter.go b/exporter/opsrampdebugexporter/exporter.go index 54dc7c47ae29..3440107b7414 100644 --- a/exporter/opsrampdebugexporter/exporter.go +++ b/exporter/opsrampdebugexporter/exporter.go @@ -21,7 +21,8 @@ import ( ) var ( - LogsOpsRampChannel = make(chan plog.Logs, 1000) + LogsOpsRampChannel = make(chan plog.Logs, 1000) + EventsOpsRampChannel = make(chan plog.Logs, 100) ) type debugExporter struct { @@ -92,11 +93,41 @@ func (s *debugExporter) pushLogs(_ context.Context, ld plog.Logs) error { zap.Int("resource logs", ld.ResourceLogs().Len()), zap.Int("log records", ld.LogRecordCount())) - select { - case LogsOpsRampChannel <- ld: - s.logger.Info("#######LogsExporter: Successfully sent to channel") - default: - s.logger.Info("#######LogsExporter: failed sent to channel") + eventsSlice := plog.NewResourceLogsSlice() + logsSlice := plog.NewResourceLogsSlice() + + rlSlice := ld.ResourceLogs() + for i := 0; i < rlSlice.Len(); i++ { + rl := rlSlice.At(i) + resource := rl.Resource() + + if val, found := resource.Attributes().Get("type"); found && val.Str() == "event" { + rl.CopyTo(eventsSlice.AppendEmpty()) + } else { + rl.CopyTo(logsSlice.AppendEmpty()) + } + } + + if logsSlice.Len() != 0 { + logs := plog.NewLogs() + logsSlice.CopyTo(logs.ResourceLogs()) + select { + case LogsOpsRampChannel <- logs: + s.logger.Info("#######LogsExporter: Successfully sent to logs channel") + default: + s.logger.Info("#######LogsExporter: failed sent to logs channel") + } + } + + if eventsSlice.Len() != 0 { + eventLogs := plog.NewLogs() + eventsSlice.CopyTo(eventLogs.ResourceLogs()) + select { + case EventsOpsRampChannel <- eventLogs: + s.logger.Info("#######LogsExporter: Successfully sent to events channel") + default: + s.logger.Info("#######LogsExporter: failed sent to eventschannel") + } } if s.verbosity == configtelemetry.LevelBasic { diff --git a/receiver/k8seventsreceiver/config.go b/receiver/k8seventsreceiver/config.go index 574bac489617..428ad36b7080 100644 --- a/receiver/k8seventsreceiver/config.go +++ b/receiver/k8seventsreceiver/config.go @@ -37,11 +37,24 @@ type Config struct { type InvolvedObjectProperties struct { // Include only the specified reasons. If its empty, list events of all reasons. - IncludeReasons []string `mapstructure:"include_reasons,omitempty"` + IncludeReasons []ReasonProperties `mapstructure:"include_reasons,omitempty"` //Can be enhanced to take in object names with reg ex etc. } +type ReasonProperties struct { + Name string `mapstructure:"name"` + Attributes []KeyValue `mapstructure:"attributes,omitempty"` +} + +type KeyValue struct { + // This is a required field. + Key string `mapstructure:"key"` + + // This is a required field. + Value any `mapstructure:"value"` +} + func (cfg *Config) Validate() error { for _, eventType := range cfg.EventTypes { diff --git a/receiver/k8seventsreceiver/k8s_event_to_logdata.go b/receiver/k8seventsreceiver/k8s_event_to_logdata.go index 376ec3f25c1f..83051fe96cfb 100644 --- a/receiver/k8seventsreceiver/k8s_event_to_logdata.go +++ b/receiver/k8seventsreceiver/k8s_event_to_logdata.go @@ -29,7 +29,7 @@ var severityMap = map[string]plog.SeverityNumber{ } // k8sEventToLogRecord converts Kubernetes event to plog.LogRecordSlice and adds the resource attributes. -func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs { +func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event, attributes []KeyValue) plog.Logs { ld := plog.NewLogs() rl := ld.ResourceLogs().AppendEmpty() sl := rl.ScopeLogs().AppendEmpty() @@ -74,6 +74,8 @@ func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs { attrs := lr.Attributes() attrs.EnsureCapacity(totalLogAttributes) + attrs.PutStr("k8s.event.type", ev.Type) + attrs.PutStr("k8s.event.sourceComponent", ev.Source.Component) attrs.PutStr("k8s.event.reason", ev.Reason) attrs.PutStr("k8s.event.action", ev.Action) attrs.PutStr("k8s.event.start_time", ev.ObjectMeta.CreationTimestamp.String()) @@ -87,5 +89,14 @@ func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs { attrs.PutInt("k8s.event.count", int64(ev.Count)) } + for _, kv := range attributes { + val := pcommon.NewValueEmpty() + err := val.FromRaw(kv.Value) + if err != nil { + continue + } + val.CopyTo(attrs.PutEmpty(kv.Key)) + } + return ld } diff --git a/receiver/k8seventsreceiver/receiver.go b/receiver/k8seventsreceiver/receiver.go index 6f2ff89fe290..a6b8069bb7bf 100644 --- a/receiver/k8seventsreceiver/receiver.go +++ b/receiver/k8seventsreceiver/receiver.go @@ -107,8 +107,8 @@ func (kr *k8seventsReceiver) startWatch(ns string, client k8s.Interface) { } func (kr *k8seventsReceiver) handleEvent(ev *corev1.Event) { - if kr.allowEvent(ev) { - ld := k8sEventToLogData(kr.settings.Logger, ev) + if attributes, allow := kr.allowEvent(ev); allow { + ld := k8sEventToLogData(kr.settings.Logger, ev, attributes) ctx := kr.obsrecv.StartLogsOp(kr.ctx) consumerErr := kr.logsConsumer.ConsumeLogs(ctx, ld) @@ -138,10 +138,10 @@ func (kr *k8seventsReceiver) startWatchingNamespace( // Allow events with eventTimestamp(EventTime/LastTimestamp/FirstTimestamp) // not older than the receiver start time so that // event flood can be avoided upon startup. -func (kr *k8seventsReceiver) allowEvent(ev *corev1.Event) bool { +func (kr *k8seventsReceiver) allowEvent(ev *corev1.Event) (attributes []KeyValue, allow bool) { eventTimestamp := getEventTimestamp(ev) if eventTimestamp.Before(kr.startTime) { - return false + return attributes, false } if len(kr.config.EventTypes) != 0 { @@ -154,42 +154,44 @@ func (kr *k8seventsReceiver) allowEvent(ev *corev1.Event) bool { } } if !found { - return false + return attributes, false } } - existsInSlice := func(key string, slice []string) bool { - found := false + existsInSlice := func(key string, slice []ReasonProperties) ([]KeyValue, bool) { for _, k := range slice { - if key == k { - found = true - break + if key == k.Name { + return k.Attributes, true } } - return found + return []KeyValue{}, false } if len(kr.config.IncludeInvolvedObject) != 0 { if prop, exists := kr.config.IncludeInvolvedObject[ev.InvolvedObject.Kind]; !exists { if prop, exists := kr.config.IncludeInvolvedObject["Other"]; !exists { - return false + return attributes, false } else { if len(prop.IncludeReasons) != 0 { - if !existsInSlice(ev.Reason, prop.IncludeReasons) { - return false + if reasonAttributes, exists := existsInSlice(ev.Reason, prop.IncludeReasons); !exists { + return attributes, false + } else { + attributes = reasonAttributes } } } } else { if len(prop.IncludeReasons) != 0 { - if !existsInSlice(ev.Reason, prop.IncludeReasons) { - return false + if reasonAttributes, exists := existsInSlice(ev.Reason, prop.IncludeReasons); !exists { + return attributes, false + } else { + attributes = reasonAttributes } } } } - return true + return attributes, true } // Return the EventTimestamp based on the populated k8s event timestamps.