Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/v0.112.x #37

Merged
merged 2 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions exporter/opsrampdebugexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

var (
LogsOpsRampChannel = make(chan plog.Logs, 1000)
LogsOpsRampChannel = make(chan plog.Logs, 1000)
EventsOpsRampChannel = make(chan plog.Logs, 100)
)

type debugExporter struct {
Expand Down Expand Up @@ -92,11 +93,41 @@ func (s *debugExporter) pushLogs(_ context.Context, ld plog.Logs) error {
zap.Int("resource logs", ld.ResourceLogs().Len()),
zap.Int("log records", ld.LogRecordCount()))

select {
case LogsOpsRampChannel <- ld:
s.logger.Info("#######LogsExporter: Successfully sent to channel")
default:
s.logger.Info("#######LogsExporter: failed sent to channel")
eventsSlice := plog.NewResourceLogsSlice()
logsSlice := plog.NewResourceLogsSlice()

rlSlice := ld.ResourceLogs()
for i := 0; i < rlSlice.Len(); i++ {
rl := rlSlice.At(i)
resource := rl.Resource()

if val, found := resource.Attributes().Get("type"); found && val.Str() == "event" {
rl.CopyTo(eventsSlice.AppendEmpty())
} else {
rl.CopyTo(logsSlice.AppendEmpty())
}
}

if logsSlice.Len() != 0 {
logs := plog.NewLogs()
logsSlice.CopyTo(logs.ResourceLogs())
select {
case LogsOpsRampChannel <- logs:
s.logger.Info("#######LogsExporter: Successfully sent to logs channel")
default:
s.logger.Info("#######LogsExporter: failed sent to logs channel")
}
}

if eventsSlice.Len() != 0 {
eventLogs := plog.NewLogs()
eventsSlice.CopyTo(eventLogs.ResourceLogs())
select {
case EventsOpsRampChannel <- eventLogs:
s.logger.Info("#######LogsExporter: Successfully sent to events channel")
default:
s.logger.Info("#######LogsExporter: failed sent to eventschannel")
}
}

if s.verbosity == configtelemetry.LevelBasic {
Expand Down
15 changes: 14 additions & 1 deletion receiver/k8seventsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,24 @@ type Config struct {

type InvolvedObjectProperties struct {
// Include only the specified reasons. If its empty, list events of all reasons.
IncludeReasons []string `mapstructure:"include_reasons,omitempty"`
IncludeReasons []ReasonProperties `mapstructure:"include_reasons,omitempty"`

//Can be enhanced to take in object names with reg ex etc.
}

type ReasonProperties struct {
Name string `mapstructure:"name"`
Attributes []KeyValue `mapstructure:"attributes,omitempty"`
}

type KeyValue struct {
// This is a required field.
Key string `mapstructure:"key"`

// This is a required field.
Value any `mapstructure:"value"`
}

func (cfg *Config) Validate() error {

for _, eventType := range cfg.EventTypes {
Expand Down
13 changes: 12 additions & 1 deletion receiver/k8seventsreceiver/k8s_event_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var severityMap = map[string]plog.SeverityNumber{
}

// k8sEventToLogRecord converts Kubernetes event to plog.LogRecordSlice and adds the resource attributes.
func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs {
func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event, attributes []KeyValue) plog.Logs {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
Expand Down Expand Up @@ -74,6 +74,8 @@ func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs {
attrs := lr.Attributes()
attrs.EnsureCapacity(totalLogAttributes)

attrs.PutStr("k8s.event.type", ev.Type)
attrs.PutStr("k8s.event.sourceComponent", ev.Source.Component)
attrs.PutStr("k8s.event.reason", ev.Reason)
attrs.PutStr("k8s.event.action", ev.Action)
attrs.PutStr("k8s.event.start_time", ev.ObjectMeta.CreationTimestamp.String())
Expand All @@ -87,5 +89,14 @@ func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs {
attrs.PutInt("k8s.event.count", int64(ev.Count))
}

for _, kv := range attributes {
val := pcommon.NewValueEmpty()
err := val.FromRaw(kv.Value)
if err != nil {
continue
}
val.CopyTo(attrs.PutEmpty(kv.Key))
}

return ld
}
36 changes: 19 additions & 17 deletions receiver/k8seventsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func (kr *k8seventsReceiver) startWatch(ns string, client k8s.Interface) {
}

func (kr *k8seventsReceiver) handleEvent(ev *corev1.Event) {
if kr.allowEvent(ev) {
ld := k8sEventToLogData(kr.settings.Logger, ev)
if attributes, allow := kr.allowEvent(ev); allow {
ld := k8sEventToLogData(kr.settings.Logger, ev, attributes)

ctx := kr.obsrecv.StartLogsOp(kr.ctx)
consumerErr := kr.logsConsumer.ConsumeLogs(ctx, ld)
Expand Down Expand Up @@ -138,10 +138,10 @@ func (kr *k8seventsReceiver) startWatchingNamespace(
// Allow events with eventTimestamp(EventTime/LastTimestamp/FirstTimestamp)
// not older than the receiver start time so that
// event flood can be avoided upon startup.
func (kr *k8seventsReceiver) allowEvent(ev *corev1.Event) bool {
func (kr *k8seventsReceiver) allowEvent(ev *corev1.Event) (attributes []KeyValue, allow bool) {
eventTimestamp := getEventTimestamp(ev)
if eventTimestamp.Before(kr.startTime) {
return false
return attributes, false
}

if len(kr.config.EventTypes) != 0 {
Expand All @@ -154,42 +154,44 @@ func (kr *k8seventsReceiver) allowEvent(ev *corev1.Event) bool {
}
}
if !found {
return false
return attributes, false
}
}

existsInSlice := func(key string, slice []string) bool {
found := false
existsInSlice := func(key string, slice []ReasonProperties) ([]KeyValue, bool) {
for _, k := range slice {
if key == k {
found = true
break
if key == k.Name {
return k.Attributes, true
}
}
return found
return []KeyValue{}, false
}

if len(kr.config.IncludeInvolvedObject) != 0 {
if prop, exists := kr.config.IncludeInvolvedObject[ev.InvolvedObject.Kind]; !exists {
if prop, exists := kr.config.IncludeInvolvedObject["Other"]; !exists {
return false
return attributes, false
} else {
if len(prop.IncludeReasons) != 0 {
if !existsInSlice(ev.Reason, prop.IncludeReasons) {
return false
if reasonAttributes, exists := existsInSlice(ev.Reason, prop.IncludeReasons); !exists {
return attributes, false
} else {
attributes = reasonAttributes
}
}
}
} else {
if len(prop.IncludeReasons) != 0 {
if !existsInSlice(ev.Reason, prop.IncludeReasons) {
return false
if reasonAttributes, exists := existsInSlice(ev.Reason, prop.IncludeReasons); !exists {
return attributes, false
} else {
attributes = reasonAttributes
}
}
}
}

return true
return attributes, true
}

// Return the EventTimestamp based on the populated k8s event timestamps.
Expand Down
Loading