Skip to content

Commit

Permalink
Merge pull request #32 from opsramp/release/v0.106.x
Browse files Browse the repository at this point in the history
merge Release/v0.106.x to main
  • Loading branch information
mithunbelur authored Oct 28, 2024
2 parents 779daf8 + c991e33 commit a273b65
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 4 deletions.
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,16 @@ endif
docker-otelcontribcol:
COMPONENT=otelcontribcol $(MAKE) docker-component

.PHONY: docker-ubuntu-component # Not intended to be used directly
docker-ubuntu-component: check-component
GOOS=linux GOARCH=amd64 $(MAKE) $(COMPONENT)
cp ./bin/$(COMPONENT)_linux_amd64 ./cmd/$(COMPONENT)/$(COMPONENT)
docker build -f ./cmd/$(COMPONENT)/Dockerfile.ubuntu -t $(COMPONENT) ./cmd/$(COMPONENT)/
rm ./cmd/$(COMPONENT)/$(COMPONENT)

docker-ubuntu-otelcontribcol:
COMPONENT=otelcontribcol $(MAKE) docker-ubuntu-component

.PHONY: docker-telemetrygen
docker-telemetrygen:
GOOS=linux GOARCH=$(GOARCH) $(MAKE) telemetrygen
Expand Down
16 changes: 16 additions & 0 deletions cmd/otelcontribcol/Dockerfile.ubuntu
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Base image
FROM ubuntu:24.04

# Install necessary packages
RUN apt-get update && apt-get install -y \
ca-certificates vim net-tools tcpdump \
&& rm -rf /var/lib/apt/lists/*

# Copy the pre-built OpenTelemetry Contrib Collector binary
COPY otelcontribcol /usr/local/bin/otelcontribcol

# Expose necessary ports
EXPOSE 4317 55680 8888

# Start the collector with your config
CMD ["/usr/bin/tail", "-f", "/dev/null"]
115 changes: 115 additions & 0 deletions deploy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
---
apiVersion: v1
kind: ServiceAccount
metadata:
labels:
app: otelcontribcol
name: otelcontribcol
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: otelcontribcol
labels:
app: otelcontribcol
rules:
- apiGroups:
- ""
resources:
- events
- namespaces
- namespaces/status
- nodes
- nodes/spec
- pods
- pods/status
- replicationcontrollers
- replicationcontrollers/status
- resourcequotas
- services
verbs:
- get
- list
- watch
- apiGroups:
- apps
resources:
- daemonsets
- deployments
- replicasets
- statefulsets
verbs:
- get
- list
- watch
- apiGroups:
- extensions
resources:
- daemonsets
- deployments
- replicasets
verbs:
- get
- list
- watch
- apiGroups:
- batch
resources:
- jobs
- cronjobs
verbs:
- get
- list
- watch
- apiGroups:
- autoscaling
resources:
- horizontalpodautoscalers
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: otelcontribcol
labels:
app: otelcontribcol
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: otelcontribcol
subjects:
- kind: ServiceAccount
name: otelcontribcol
namespace: default
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: otelcontribcol-deployment
labels:
app: otelcontribcol
spec:
replicas: 1
selector:
matchLabels:
app: otelcontribcol
template:
metadata:
labels:
app: otelcontribcol
spec:
serviceAccountName: otelcontribcol
containers:
- name: otelcontribcol
image: docker.io/mithunbs/otelcontibcol
imagePullPolicy: IfNotPresent
ports:
- containerPort: 4317
- containerPort: 4318
- containerPort: 55680
- containerPort: 8888
command: ["/usr/bin/tail"]
args: ["-f", "/dev/null"]
96 changes: 94 additions & 2 deletions processor/k8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr

import (
"context"
"encoding/json"
"fmt"
"strconv"

Expand All @@ -13,6 +14,7 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
conventions "go.opentelemetry.io/collector/semconv/v1.8.0"
"go.uber.org/zap"

Expand Down Expand Up @@ -132,13 +134,32 @@ func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p
rl := ld.ResourceLogs()
for i := 0; i < rl.Len(); i++ {
kp.processResource(ctx, rl.At(i).Resource())

kp.processEventBody(rl.At(i))
}

return ld, nil
}

// processResource adds Pod metadata tags to resource based on pod association configuration
func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pcommon.Resource) {
resource.Attributes().Range(func(k string, v pcommon.Value) bool {
kp.logger.Debug("res-attributes", zap.Any(k, v.Str()))
return true
})

if val, found := resource.Attributes().Get("type"); found && val.Str() == "event" {
if kind, found := resource.Attributes().Get("k8s.object.kind"); found {
if objectuid, found := resource.Attributes().Get("k8s.object.uid"); found {
if kind.Str() == "Pod" {
resource.Attributes().PutStr("k8s.pod.uid", objectuid.Str())
} else if kind.Str() == "Node" {
resource.Attributes().PutStr("k8s.node.uid", objectuid.Str())
}
}
}
}

podIdentifierValue := extractPodID(ctx, resource.Attributes(), kp.podAssociations)
kp.logger.Debug("evaluating pod identifier", zap.Any("value", podIdentifierValue))

Expand Down Expand Up @@ -195,6 +216,70 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
}
}
kp.processopsrampResources(ctx, resource)
kp.addOpsrampEventResourceAttributes(ctx, resource)
}

func (kp *kubernetesprocessor) processEventBody(resourceLogs plog.ResourceLogs) {
if val, found := resourceLogs.Resource().Attributes().Get("type"); found && val.Str() == "event" {

bodyMap := map[string]string{}

resourceLogs.Resource().Attributes().Range(func(k string, v pcommon.Value) bool {
bodyMap[k] = v.Str()
return true
})

ilss := resourceLogs.ScopeLogs()
for j := 0; j < ilss.Len(); j++ {
ils := ilss.At(j)
logs := ils.LogRecords()
for k := 0; k < logs.Len(); k++ {
lr := logs.At(k)

lr.Attributes().Range(func(k string, v pcommon.Value) bool {
bodyMap[k] = v.Str()
return true
})

bodyMap["message"] = lr.Body().AsString()

body, err := json.Marshal(bodyMap)
if err != nil {
kp.logger.Error("Failed to marshal attributes as body ")
}

lr.Body().SetStr(string(body))
}
}
}
}

func (kp *kubernetesprocessor) addOpsrampEventResourceAttributes(ctx context.Context, resource pcommon.Resource) {

if val, found := resource.Attributes().Get("type"); found && val.Str() == "event" {
resource.Attributes().PutStr("source", "kubernetes")
resource.Attributes().PutStr("level", "Unknown")
resource.Attributes().PutStr("cluster_name", kp.redisConfig.ClusterName)
resource.Attributes().PutStr("host", kp.redisConfig.ClusterName)
resource.Attributes().PutStr("resourceUUID", kp.redisConfig.ClusterUid)

if val, found := resource.Attributes().Get("k8s.namespace.name"); found {
resource.Attributes().PutStr("namespace", val.Str())
}

host := ""
if val, found := resource.Attributes().Get(semconv.AttributeK8SNodeName); found {
host = val.Str()
if host != "" {
//overwrite node opsramp resource UUID in resourceUUID
resource.Attributes().PutStr("host", host)

if resourceUuid := kp.GetResourceUuidUsingResourceNodeMoid(ctx, resource); resourceUuid != "" {
resource.Attributes().PutStr("resourceUUID", resourceUuid)
}
}
}
}
}

// processResource adds Pod metadata tags to resource based on pod association configuration
Expand All @@ -203,8 +288,15 @@ func (kp *kubernetesprocessor) processopsrampResources(ctx context.Context, reso
var resourceUuid string

for _, addon := range kp.addons {
//fmt.Println(">>>>>> Addons Added key : ", addon.Key, " Value ", addon.Value)
resource.Attributes().PutStr(addon.Key, addon.Value)
// If receiver has already added some attributes with some value, then we do not overwrite here.
// For ex. type = event is already added for kube events. We should not overwrite it with type = RESOURCE.
kp.logger.Debug("addon", zap.Any("key", addon.Key))

if _, found := resource.Attributes().Get(addon.Key); !found {
kp.logger.Debug("addon not found adding it", zap.Any("key", addon.Key))

resource.Attributes().PutStr(addon.Key, addon.Value)
}
}

if _, found = resource.Attributes().Get("k8s.pod.uid"); found {
Expand Down
31 changes: 31 additions & 0 deletions receiver/k8seventsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,54 @@
package k8seventsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver"

import (
"fmt"

k8s "k8s.io/client-go/kubernetes"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)

type EventType string

const (
EventTypeNormal EventType = "Normal"
EventTypeWarning EventType = "Warning"
)

// Config defines configuration for kubernetes events receiver.
type Config struct {
k8sconfig.APIConfig `mapstructure:",squash"`

// List of ‘namespaces’ to collect events from.
Namespaces []string `mapstructure:"namespaces"`

// List of ‘eventtypes’ to filter.
EventTypes []EventType `mapstructure:"event_types,omitempty"`

// Include only the specified involved objects. ObjectKind to List of Reasons.
IncludeInvolvedObject map[string]InvolvedObjectProperties `mapstructure:"include_involved_objects,omitempty"`

// For mocking
makeClient func(apiConf k8sconfig.APIConfig) (k8s.Interface, error)
}

type InvolvedObjectProperties struct {
// Include only the specified reasons. If its empty, list events of all reasons.
IncludeReasons []string `mapstructure:"include_reasons,omitempty"`

//Can be enhanced to take in object names with reg ex etc.
}

func (cfg *Config) Validate() error {

for _, eventType := range cfg.EventTypes {
switch eventType {
case EventTypeNormal, EventTypeWarning:
default:
return fmt.Errorf("invalid event_type %s, must be one of %s or %s", eventType, EventTypeNormal, EventTypeWarning)
}
}

return cfg.APIConfig.Validate()
}

Expand Down
10 changes: 9 additions & 1 deletion receiver/k8seventsreceiver/k8s_event_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs {
resourceAttrs := rl.Resource().Attributes()
resourceAttrs.EnsureCapacity(totalResourceAttributes)

resourceAttrs.PutStr(semconv.AttributeK8SNodeName, ev.Source.Host)
eventHost := ""
if ev.ReportingInstance != "" {
eventHost = ev.ReportingInstance
} else if ev.Source.Host != "" {
eventHost = ev.Source.Host
}
resourceAttrs.PutStr(semconv.AttributeK8SNodeName, eventHost)

// Attributes related to the object causing the event.
resourceAttrs.PutStr("k8s.object.kind", ev.InvolvedObject.Kind)
Expand All @@ -48,6 +54,8 @@ func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs {
resourceAttrs.PutStr("k8s.object.api_version", ev.InvolvedObject.APIVersion)
resourceAttrs.PutStr("k8s.object.resource_version", ev.InvolvedObject.ResourceVersion)

resourceAttrs.PutStr("type", "event") // This should come from config. To be enhanced.

lr.SetTimestamp(pcommon.NewTimestampFromTime(getEventTimestamp(ev)))

// The Message field contains description about the event,
Expand Down
Loading

0 comments on commit a273b65

Please sign in to comment.