diff --git a/Makefile b/Makefile index c033110b6ef9..7dbd7ee14305 100644 --- a/Makefile +++ b/Makefile @@ -274,6 +274,16 @@ endif docker-otelcontribcol: COMPONENT=otelcontribcol $(MAKE) docker-component +.PHONY: docker-ubuntu-component # Not intended to be used directly +docker-ubuntu-component: check-component + GOOS=linux GOARCH=amd64 $(MAKE) $(COMPONENT) + cp ./bin/$(COMPONENT)_linux_amd64 ./cmd/$(COMPONENT)/$(COMPONENT) + docker build -f ./cmd/$(COMPONENT)/Dockerfile.ubuntu -t $(COMPONENT) ./cmd/$(COMPONENT)/ + rm ./cmd/$(COMPONENT)/$(COMPONENT) + +docker-ubuntu-otelcontribcol: + COMPONENT=otelcontribcol $(MAKE) docker-ubuntu-component + .PHONY: docker-telemetrygen docker-telemetrygen: GOOS=linux GOARCH=$(GOARCH) $(MAKE) telemetrygen diff --git a/cmd/otelcontribcol/Dockerfile.ubuntu b/cmd/otelcontribcol/Dockerfile.ubuntu new file mode 100644 index 000000000000..61b9be628441 --- /dev/null +++ b/cmd/otelcontribcol/Dockerfile.ubuntu @@ -0,0 +1,16 @@ +# Base image +FROM ubuntu:24.04 + +# Install necessary packages +RUN apt-get update && apt-get install -y \ + ca-certificates vim net-tools tcpdump \ + && rm -rf /var/lib/apt/lists/* + +# Copy the pre-built OpenTelemetry Contrib Collector binary +COPY otelcontribcol /usr/local/bin/otelcontribcol + +# Expose necessary ports +EXPOSE 4317 55680 8888 + +# Start the collector with your config +CMD ["/usr/bin/tail", "-f", "/dev/null"] diff --git a/deploy.yaml b/deploy.yaml new file mode 100644 index 000000000000..42cd6b3ba848 --- /dev/null +++ b/deploy.yaml @@ -0,0 +1,115 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + labels: + app: otelcontribcol + name: otelcontribcol +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: otelcontribcol + labels: + app: otelcontribcol +rules: +- apiGroups: + - "" + resources: + - events + - namespaces + - namespaces/status + - nodes + - nodes/spec + - pods + - pods/status + - replicationcontrollers + - replicationcontrollers/status + - resourcequotas + - services + verbs: + - get + - list + - watch +- apiGroups: + - apps + resources: + - daemonsets + - deployments + - replicasets + - statefulsets + verbs: + - get + - list + - watch +- apiGroups: + - extensions + resources: + - daemonsets + - deployments + - replicasets + verbs: + - get + - list + - watch +- apiGroups: + - batch + resources: + - jobs + - cronjobs + verbs: + - get + - list + - watch +- apiGroups: + - autoscaling + resources: + - horizontalpodautoscalers + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: otelcontribcol + labels: + app: otelcontribcol +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: otelcontribcol +subjects: +- kind: ServiceAccount + name: otelcontribcol + namespace: default +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: otelcontribcol-deployment + labels: + app: otelcontribcol +spec: + replicas: 1 + selector: + matchLabels: + app: otelcontribcol + template: + metadata: + labels: + app: otelcontribcol + spec: + serviceAccountName: otelcontribcol + containers: + - name: otelcontribcol + image: docker.io/mithunbs/otelcontibcol + imagePullPolicy: IfNotPresent + ports: + - containerPort: 4317 + - containerPort: 4318 + - containerPort: 55680 + - containerPort: 8888 + command: ["/usr/bin/tail"] + args: ["-f", "/dev/null"] diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 425336e71a5b..2b3e4b8a38cd 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -5,6 +5,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr import ( "context" + "encoding/json" "fmt" "strconv" @@ -13,6 +14,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" conventions "go.opentelemetry.io/collector/semconv/v1.8.0" "go.uber.org/zap" @@ -132,6 +134,8 @@ func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p rl := ld.ResourceLogs() for i := 0; i < rl.Len(); i++ { kp.processResource(ctx, rl.At(i).Resource()) + + kp.processEventBody(rl.At(i)) } return ld, nil @@ -139,6 +143,23 @@ func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p // processResource adds Pod metadata tags to resource based on pod association configuration func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pcommon.Resource) { + resource.Attributes().Range(func(k string, v pcommon.Value) bool { + kp.logger.Debug("res-attributes", zap.Any(k, v.Str())) + return true + }) + + if val, found := resource.Attributes().Get("type"); found && val.Str() == "event" { + if kind, found := resource.Attributes().Get("k8s.object.kind"); found { + if objectuid, found := resource.Attributes().Get("k8s.object.uid"); found { + if kind.Str() == "Pod" { + resource.Attributes().PutStr("k8s.pod.uid", objectuid.Str()) + } else if kind.Str() == "Node" { + resource.Attributes().PutStr("k8s.node.uid", objectuid.Str()) + } + } + } + } + podIdentifierValue := extractPodID(ctx, resource.Attributes(), kp.podAssociations) kp.logger.Debug("evaluating pod identifier", zap.Any("value", podIdentifierValue)) @@ -195,6 +216,70 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco } } kp.processopsrampResources(ctx, resource) + kp.addOpsrampEventResourceAttributes(ctx, resource) +} + +func (kp *kubernetesprocessor) processEventBody(resourceLogs plog.ResourceLogs) { + if val, found := resourceLogs.Resource().Attributes().Get("type"); found && val.Str() == "event" { + + bodyMap := map[string]string{} + + resourceLogs.Resource().Attributes().Range(func(k string, v pcommon.Value) bool { + bodyMap[k] = v.Str() + return true + }) + + ilss := resourceLogs.ScopeLogs() + for j := 0; j < ilss.Len(); j++ { + ils := ilss.At(j) + logs := ils.LogRecords() + for k := 0; k < logs.Len(); k++ { + lr := logs.At(k) + + lr.Attributes().Range(func(k string, v pcommon.Value) bool { + bodyMap[k] = v.Str() + return true + }) + + bodyMap["message"] = lr.Body().AsString() + + body, err := json.Marshal(bodyMap) + if err != nil { + kp.logger.Error("Failed to marshal attributes as body ") + } + + lr.Body().SetStr(string(body)) + } + } + } +} + +func (kp *kubernetesprocessor) addOpsrampEventResourceAttributes(ctx context.Context, resource pcommon.Resource) { + + if val, found := resource.Attributes().Get("type"); found && val.Str() == "event" { + resource.Attributes().PutStr("source", "kubernetes") + resource.Attributes().PutStr("level", "Unknown") + resource.Attributes().PutStr("cluster_name", kp.redisConfig.ClusterName) + resource.Attributes().PutStr("host", kp.redisConfig.ClusterName) + resource.Attributes().PutStr("resourceUUID", kp.redisConfig.ClusterUid) + + if val, found := resource.Attributes().Get("k8s.namespace.name"); found { + resource.Attributes().PutStr("namespace", val.Str()) + } + + host := "" + if val, found := resource.Attributes().Get(semconv.AttributeK8SNodeName); found { + host = val.Str() + if host != "" { + //overwrite node opsramp resource UUID in resourceUUID + resource.Attributes().PutStr("host", host) + + if resourceUuid := kp.GetResourceUuidUsingResourceNodeMoid(ctx, resource); resourceUuid != "" { + resource.Attributes().PutStr("resourceUUID", resourceUuid) + } + } + } + } } // processResource adds Pod metadata tags to resource based on pod association configuration @@ -203,8 +288,15 @@ func (kp *kubernetesprocessor) processopsrampResources(ctx context.Context, reso var resourceUuid string for _, addon := range kp.addons { - //fmt.Println(">>>>>> Addons Added key : ", addon.Key, " Value ", addon.Value) - resource.Attributes().PutStr(addon.Key, addon.Value) + // If receiver has already added some attributes with some value, then we do not overwrite here. + // For ex. type = event is already added for kube events. We should not overwrite it with type = RESOURCE. + kp.logger.Debug("addon", zap.Any("key", addon.Key)) + + if _, found := resource.Attributes().Get(addon.Key); !found { + kp.logger.Debug("addon not found adding it", zap.Any("key", addon.Key)) + + resource.Attributes().PutStr(addon.Key, addon.Value) + } } if _, found = resource.Attributes().Get("k8s.pod.uid"); found { diff --git a/receiver/k8seventsreceiver/config.go b/receiver/k8seventsreceiver/config.go index 96ba451fef24..574bac489617 100644 --- a/receiver/k8seventsreceiver/config.go +++ b/receiver/k8seventsreceiver/config.go @@ -4,11 +4,20 @@ package k8seventsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver" import ( + "fmt" + k8s "k8s.io/client-go/kubernetes" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" ) +type EventType string + +const ( + EventTypeNormal EventType = "Normal" + EventTypeWarning EventType = "Warning" +) + // Config defines configuration for kubernetes events receiver. type Config struct { k8sconfig.APIConfig `mapstructure:",squash"` @@ -16,11 +25,33 @@ type Config struct { // List of ‘namespaces’ to collect events from. Namespaces []string `mapstructure:"namespaces"` + // List of ‘eventtypes’ to filter. + EventTypes []EventType `mapstructure:"event_types,omitempty"` + + // Include only the specified involved objects. ObjectKind to List of Reasons. + IncludeInvolvedObject map[string]InvolvedObjectProperties `mapstructure:"include_involved_objects,omitempty"` + // For mocking makeClient func(apiConf k8sconfig.APIConfig) (k8s.Interface, error) } +type InvolvedObjectProperties struct { + // Include only the specified reasons. If its empty, list events of all reasons. + IncludeReasons []string `mapstructure:"include_reasons,omitempty"` + + //Can be enhanced to take in object names with reg ex etc. +} + func (cfg *Config) Validate() error { + + for _, eventType := range cfg.EventTypes { + switch eventType { + case EventTypeNormal, EventTypeWarning: + default: + return fmt.Errorf("invalid event_type %s, must be one of %s or %s", eventType, EventTypeNormal, EventTypeWarning) + } + } + return cfg.APIConfig.Validate() } diff --git a/receiver/k8seventsreceiver/k8s_event_to_logdata.go b/receiver/k8seventsreceiver/k8s_event_to_logdata.go index 31d2b4cdbb11..376ec3f25c1f 100644 --- a/receiver/k8seventsreceiver/k8s_event_to_logdata.go +++ b/receiver/k8seventsreceiver/k8s_event_to_logdata.go @@ -38,7 +38,13 @@ func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs { resourceAttrs := rl.Resource().Attributes() resourceAttrs.EnsureCapacity(totalResourceAttributes) - resourceAttrs.PutStr(semconv.AttributeK8SNodeName, ev.Source.Host) + eventHost := "" + if ev.ReportingInstance != "" { + eventHost = ev.ReportingInstance + } else if ev.Source.Host != "" { + eventHost = ev.Source.Host + } + resourceAttrs.PutStr(semconv.AttributeK8SNodeName, eventHost) // Attributes related to the object causing the event. resourceAttrs.PutStr("k8s.object.kind", ev.InvolvedObject.Kind) @@ -48,6 +54,8 @@ func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs { resourceAttrs.PutStr("k8s.object.api_version", ev.InvolvedObject.APIVersion) resourceAttrs.PutStr("k8s.object.resource_version", ev.InvolvedObject.ResourceVersion) + resourceAttrs.PutStr("type", "event") // This should come from config. To be enhanced. + lr.SetTimestamp(pcommon.NewTimestampFromTime(getEventTimestamp(ev))) // The Message field contains description about the event, diff --git a/receiver/k8seventsreceiver/receiver.go b/receiver/k8seventsreceiver/receiver.go index cb6c99aab2be..c3b648db2fdd 100644 --- a/receiver/k8seventsreceiver/receiver.go +++ b/receiver/k8seventsreceiver/receiver.go @@ -135,7 +135,56 @@ func (kr *k8seventsReceiver) startWatchingNamespace( // event flood can be avoided upon startup. func (kr *k8seventsReceiver) allowEvent(ev *corev1.Event) bool { eventTimestamp := getEventTimestamp(ev) - return !eventTimestamp.Before(kr.startTime) + if eventTimestamp.Before(kr.startTime) { + return false + } + + if len(kr.config.EventTypes) != 0 { + + found := false + for _, configuredEventType := range kr.config.EventTypes { + if ev.Type == string(configuredEventType) { + found = true + break + } + } + if !found { + return false + } + } + + existsInSlice := func(key string, slice []string) bool { + found := false + for _, k := range slice { + if key == k { + found = true + break + } + } + return found + } + + if len(kr.config.IncludeInvolvedObject) != 0 { + if prop, exists := kr.config.IncludeInvolvedObject[ev.InvolvedObject.Kind]; !exists { + if prop, exists := kr.config.IncludeInvolvedObject["Other"]; !exists { + return false + } else { + if len(prop.IncludeReasons) != 0 { + if !existsInSlice(ev.Reason, prop.IncludeReasons) { + return false + } + } + } + } else { + if len(prop.IncludeReasons) != 0 { + if !existsInSlice(ev.Reason, prop.IncludeReasons) { + return false + } + } + } + } + + return true } // Return the EventTimestamp based on the populated k8s event timestamps.