Skip to content

Commit

Permalink
Datadog k8 Metrics reciever Added
Browse files Browse the repository at this point in the history
  • Loading branch information
naman-jain-15 committed Jun 25, 2024
1 parent 148cf00 commit 9efc99e
Show file tree
Hide file tree
Showing 23 changed files with 3,317 additions and 1 deletion.
97 changes: 97 additions & 0 deletions receiver/datadogmetricreceiver/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package cluster

import (
processv1 "github.com/DataDog/agent-payload/v5/process"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"log"
"strings"
)

// Private constants for clusters
const (
// Errors
clusterPayloadErrorMessage = "No metrics related to Clusters found in Payload"
// Metrics
clusterMetricNodeCount = "ddk8s.cluster.node_count"
// Attributes
clusterMetricUID = "ddk8s.cluster.uid"
clusterAttrClusterID = "ddk8s.cluster.id"
clusterAttrClusterName = "ddk8s.cluster.name"
clusterAttrKubeClusterName = "kube_cluster_name"
clusterAttrResourceVersion = "ddk8s.cluster.resource_version"
clusterAttrCPUCapacity = "ddk8s.cluster.cpu_capacity"
clusterAttrCPUAllocatable = "ddk8s.cluster.cpu_allocatable"
clusterAttrMemoryCapacity = "ddk8s.cluster.memory_capacity"
clusterAttrMemoryAllocatable = "ddk8s.cluster.memory_allocatable"
clusterAttrTags = "ddk8s.cluster.tags"
clusterMetricCreateTime = "ddk8s.cluster.create_time"
)

// GetOtlpExportReqFromClusterData converts Datadog cluster data into OTLP ExportRequest.
func GetOtlpExportReqFromClusterData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) {
ddReq, ok := Body.(*processv1.CollectorCluster)
if !ok {
return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterPayloadErrorMessage)
}
cluster := ddReq.GetCluster()

if cluster == nil {
log.Println("no clusters data found so skipping")
return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterPayloadErrorMessage)
}

metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics()

clusterName := ddReq.GetClusterName()
clusterID := ddReq.GetClusterId()

rm := resourceMetrics.AppendEmpty()
resourceAttributes := rm.Resource().Attributes()
metricAttributes := pcommon.NewMap()
commonResourceAttributes := helpers.CommonResourceAttributes{
Origin: origin,
ApiKey: key,
MwSource: "datadog",
}
helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes)

scopeMetrics := helpers.AppendInstrScope(&rm)
setHostK8sAttributes(metricAttributes, resourceAttributes, clusterName, clusterID)
appendClusterMetrics(&scopeMetrics, resourceAttributes, metricAttributes, cluster, timestamp)

return pmetricotlp.NewExportRequestFromMetrics(metrics), nil
}

func appendClusterMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, cluster *processv1.Cluster, timestamp int64) {
scopeMetric := scopeMetrics.Metrics().AppendEmpty()
scopeMetric.SetName(clusterMetricNodeCount)

metricAttributes.PutStr(clusterAttrResourceVersion, cluster.GetResourceVersion())
metricAttributes.PutInt(clusterAttrCPUCapacity, int64(cluster.GetCpuCapacity()))
metricAttributes.PutInt(clusterAttrCPUAllocatable, int64(cluster.GetCpuAllocatable()))
metricAttributes.PutInt(clusterAttrMemoryCapacity, int64(cluster.GetMemoryCapacity()))
metricAttributes.PutInt(clusterAttrMemoryAllocatable, int64(cluster.GetMemoryAllocatable()))
metricAttributes.PutStr(clusterAttrTags, strings.Join(cluster.GetTags(), "&"))
metricAttributes.PutInt(clusterMetricCreateTime, helpers.CalculateCreateTime(cluster.GetCreationTimestamp()))

var dataPoints pmetric.NumberDataPointSlice
gauge := scopeMetric.SetEmptyGauge()
dataPoints = gauge.DataPoints()

dp := dataPoints.AppendEmpty()
dp.SetTimestamp(pcommon.Timestamp(timestamp))

dp.SetIntValue(int64(cluster.GetNodeCount()))
attributeMap := dp.Attributes()
metricAttributes.CopyTo(attributeMap)
}

func setHostK8sAttributes(metricAttributes pcommon.Map, resourceAttributes pcommon.Map, clusterName string, clusterID string) {
resourceAttributes.PutStr(clusterMetricUID, clusterID)
metricAttributes.PutStr(clusterAttrClusterID, clusterID)
metricAttributes.PutStr(clusterAttrClusterName, clusterName)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package clusterrolebinding

import (
processv1 "github.com/DataDog/agent-payload/v5/process"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"log"
"strings"
)

// Private constants for cluster role bindings
const (
//Errors
clusterRoleBindingsPayloadErrorMessage = "No metrics related to ClusterRoleBindings found in Payload"
//Metrics
clusterRoleBindingsMetricSubjectCount = "ddk8s.clusterrolebindings.subject.count"
//Attributes
clusterRoleBindingsMetricUID = "ddk8s.clusterrolebindings.uid"
clusterRoleBindingsMetricNamespace = "ddk8s.clusterrolebindings.namespace"
clusterRoleBindingsAttrClusterID = "ddk8s.clusterrolebindings.cluster.id"
clusterRoleBindingsAttrClusterName = "ddk8s.clusterrolebindings.cluster.name"
clusterRoleBindingsMetricName = "ddk8s.clusterrolebindings.name"
clusterRoleBindingsMetricCreateTime = "ddk8s.clusterrolebindings.create_time"
clusterRoleBindingsMetricSubjects = "ddk8s.clusterrolebindings.subjects"
clusterRoleBindingsMetricRoleRef = "ddk8s.clusterrolebindings.roleref"
clusterRoleBindingsMetricLabels = "ddk8s.clusterrolebindings.labels"
clusterRoleBindingsMetricAnnotations = "ddk8s.clusterrolebindings.annotations"
)

func GetOtlpExportReqFromDatadogClusterRoleBindingData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) {

ddReq, ok := Body.(*processv1.CollectorClusterRoleBinding)
if !ok {
return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterRoleBindingsPayloadErrorMessage)
}

clusterRoleBindings := ddReq.GetClusterRoleBindings()
if len(clusterRoleBindings) == 0 {
log.Println("no cluster role bindings found so skipping")
return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterRoleBindingsPayloadErrorMessage)
}

metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics()

clusterName := ddReq.GetClusterName()
clusterID := ddReq.GetClusterId()

for _, binding := range clusterRoleBindings {
rm := resourceMetrics.AppendEmpty()
resourceAttributes := rm.Resource().Attributes()
metricAttributes := pcommon.NewMap()
commonResourceAttributes := helpers.CommonResourceAttributes{
Origin: origin,
ApiKey: key,
MwSource: "datadog",
}
helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes)

scopeMetrics := helpers.AppendInstrScope(&rm)
setHostK8sAttributes(metricAttributes, clusterName, clusterID)
appendClusterRoleBindingMetrics(&scopeMetrics, resourceAttributes, metricAttributes, binding, timestamp)
}

return pmetricotlp.NewExportRequestFromMetrics(metrics), nil
}

func appendClusterRoleBindingMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, binding *processv1.ClusterRoleBinding, timestamp int64) {
scopeMetric := scopeMetrics.Metrics().AppendEmpty()
scopeMetric.SetName(clusterRoleBindingsMetricSubjectCount)

var metricVal int64

if metadata := binding.GetMetadata(); metadata != nil {
resourceAttributes.PutStr(clusterRoleBindingsMetricUID, metadata.GetUid())
metricAttributes.PutStr(clusterRoleBindingsMetricNamespace, metadata.GetNamespace())
metricAttributes.PutStr(clusterRoleBindingsMetricName, metadata.GetName())
metricAttributes.PutStr(clusterRoleBindingsMetricLabels, strings.Join(metadata.GetLabels(), "&"))
metricAttributes.PutStr(clusterRoleBindingsMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&"))
metricAttributes.PutStr(clusterRoleBindingsMetricRoleRef, getRoleRefString(binding.GetRoleRef()))
metricAttributes.PutInt(clusterRoleBindingsMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp()))

if subjects := binding.GetSubjects(); subjects != nil {
metricAttributes.PutStr(clusterRoleBindingsMetricSubjects, convertSubjectsToString(subjects))
metricVal = int64(len(subjects))
}
}

var dataPoints pmetric.NumberDataPointSlice
gauge := scopeMetric.SetEmptyGauge()
dataPoints = gauge.DataPoints()
dp := dataPoints.AppendEmpty()

dp.SetTimestamp(pcommon.Timestamp(timestamp))
dp.SetIntValue(metricVal)

attributeMap := dp.Attributes()
metricAttributes.CopyTo(attributeMap)
}

func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) {
metricAttributes.PutStr(clusterRoleBindingsAttrClusterID, clusterID)
metricAttributes.PutStr(clusterRoleBindingsAttrClusterName, clusterName)
}

func convertSubjectsToString(subjects []*processv1.Subject) string {
var result strings.Builder

for i, subject := range subjects {
if i > 0 {
result.WriteString(";")
}

result.WriteString("kind=")
result.WriteString(subject.GetKind())

result.WriteString("&name=")
result.WriteString(subject.GetName())

result.WriteString("&namespace=")
result.WriteString(subject.GetNamespace())
}

return result.String()
}

func getRoleRefString(ref *processv1.TypedLocalObjectReference) string {
if ref == nil {
return ""
}
return "apiGroup=" + ref.GetApiGroup() + "&kind=" + ref.GetKind() + "&name=" + ref.GetName()
}
136 changes: 136 additions & 0 deletions receiver/datadogmetricreceiver/clusterroles/clusterroles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package clusterroles

import (
processv1 "github.com/DataDog/agent-payload/v5/process"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"log"
"strings"
)

// Private constants for cluster roles
const (
// Error
clusterRolePayloadErrorMessage = "No metrics related to ClusterRoles found in Payload"
// Metrics
clusterRoleMetricRuleCount = "ddk8s.clusterrole.count"
// Attributes
clusterRoleMetricUID = "ddk8s.clusterrole.uid"
clusterRoleMetricNamespace = "ddk8s.clusterrole.namespace"
clusterRoleAttrClusterID = "ddk8s.clusterrole.cluster.id"
clusterRoleAttrClusterName = "ddk8s.clusterrole.cluster.name"
clusterRoleMetricName = "ddk8s.clusterrole.name"
clusterRoleMetricCreateTime = "ddk8s.clusterrole.create.time"
clusterRoleMetricLabels = "ddk8s.clusterrole.labels"
clusterRoleMetricAnnotations = "ddk8s.clusterrole.annotations"
clusterRoleMetricType = "ddk8s.clusterrole.type"
clusterRoleMetricRules = "ddk8s.clusterrole.rules"
)

func GetOtlpExportReqFromDatadogClusterRolesData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) {

ddReq, ok := Body.(*processv1.CollectorClusterRole)
if !ok {
return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterRolePayloadErrorMessage)
}

croles := ddReq.GetClusterRoles()

if len(croles) == 0 {
log.Println("no croles found so skipping")
return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterRolePayloadErrorMessage)
}

metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics()

clusterName := ddReq.GetClusterName()
clusterID := ddReq.GetClusterId()

for _, role := range croles {
rm := resourceMetrics.AppendEmpty()
resourceAttributes := rm.Resource().Attributes()
metricAttributes := pcommon.NewMap()
commonResourceAttributes := helpers.CommonResourceAttributes{
Origin: origin,
ApiKey: key,
MwSource: "datadog",
}
helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes)

scopeMetrics := helpers.AppendInstrScope(&rm)
setHostK8sAttributes(metricAttributes, clusterName, clusterID)
appendClusterRoleMetrics(&scopeMetrics, resourceAttributes, metricAttributes, role, timestamp)
}

return pmetricotlp.NewExportRequestFromMetrics(metrics), nil
}

func appendClusterRoleMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, role *processv1.ClusterRole, timestamp int64) {
scopeMetric := scopeMetrics.Metrics().AppendEmpty()
scopeMetric.SetName(clusterRoleMetricRuleCount)

var metricVal int64

if metadata := role.GetMetadata(); metadata != nil {
resourceAttributes.PutStr(clusterRoleMetricUID, metadata.GetUid())
metricAttributes.PutStr(clusterRoleMetricNamespace, metadata.GetNamespace())
metricAttributes.PutStr(clusterRoleMetricName, metadata.GetName())
metricAttributes.PutStr(clusterRoleMetricLabels, strings.Join(metadata.GetLabels(), "&"))
metricAttributes.PutStr(clusterRoleMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&"))
metricAttributes.PutStr(clusterRoleMetricAnnotations, strings.Join(metadata.GetFinalizers(), ","))
metricAttributes.PutInt(clusterRoleMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp()))
metricAttributes.PutStr(clusterRoleMetricType, "ClusterRole")

if rules := role.GetRules(); rules != nil {
metricAttributes.PutStr(clusterRoleMetricRules, convertRulesToString(rules))
metricVal = int64(len(rules))
}
}

var dataPoints pmetric.NumberDataPointSlice
gauge := scopeMetric.SetEmptyGauge()
dataPoints = gauge.DataPoints()
dp := dataPoints.AppendEmpty()

dp.SetTimestamp(pcommon.Timestamp(timestamp))
dp.SetIntValue(metricVal)

attributeMap := dp.Attributes()
metricAttributes.CopyTo(attributeMap)
}

func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) {
metricAttributes.PutStr(clusterRoleAttrClusterID, clusterID)
metricAttributes.PutStr(clusterRoleAttrClusterName, clusterName)
}

func convertRulesToString(rules []*processv1.PolicyRule) string {
var result strings.Builder

for i, rule := range rules {
if i > 0 {
result.WriteString(";")
}

result.WriteString("verbs=")
result.WriteString(strings.Join(rule.GetVerbs(), ","))

result.WriteString("&apiGroups=")
result.WriteString(strings.Join(rule.GetApiGroups(), ","))

result.WriteString("&resources=")
result.WriteString(strings.Join(rule.GetResources(), ","))

result.WriteString("&resourceNames=")
result.WriteString(strings.Join(rule.GetResourceNames(), ","))

result.WriteString("&nonResourceURLs=")
result.WriteString(strings.Join(rule.GetNonResourceURLs(), ","))

}

return result.String()
}
Loading

0 comments on commit 9efc99e

Please sign in to comment.