From b10c9b368b5e9d5e031b68319e4ede5a4f1211db Mon Sep 17 00:00:00 2001 From: naman-jain-15 Date: Tue, 25 Jun 2024 14:34:21 +0530 Subject: [PATCH] Datadog k8 Metrics reciever Added --- .../datadogmetricreceiver/cluster/cluster.go | 97 +++++ .../clusterrolebinding/clusterrolebinding.go | 134 ++++++ .../clusterroles/clusterroles.go | 136 ++++++ .../datadogmetricreceiver/cronjob/cronjob.go | 117 ++++++ .../daemonset/daemonset.go | 145 +++++++ .../deployment/deployment.go | 133 ++++++ .../datadogmetricreceiver/helpers/helpers.go | 95 +++++ receiver/datadogmetricreceiver/hpa/hpa.go | 126 ++++++ .../datadogmetricreceiver/ingress/ingress.go | 146 +++++++ receiver/datadogmetricreceiver/job/job.go | 134 ++++++ .../metricsv2translator.go | 391 ++++++++++++++++++ .../namespace/namespace.go | 102 +++++ receiver/datadogmetricreceiver/node/node.go | 127 ++++++ .../persistentvolume/persistentvolume.go | 128 ++++++ .../persistentvolumeclaim.go | 126 ++++++ receiver/datadogmetricreceiver/pod/pod.go | 104 +++++ receiver/datadogmetricreceiver/receiver.go | 127 +++++- .../replicaset/replicaset.go | 113 +++++ .../rolebinding/rolebinding.go | 137 ++++++ receiver/datadogmetricreceiver/roles/roles.go | 313 ++++++++++++++ .../datadogmetricreceiver/service/service.go | 130 ++++++ .../serviceaccount/serviceaccount.go | 131 ++++++ .../statefulset/statefulset.go | 126 ++++++ 23 files changed, 3317 insertions(+), 1 deletion(-) create mode 100644 receiver/datadogmetricreceiver/cluster/cluster.go create mode 100644 receiver/datadogmetricreceiver/clusterrolebinding/clusterrolebinding.go create mode 100644 receiver/datadogmetricreceiver/clusterroles/clusterroles.go create mode 100644 receiver/datadogmetricreceiver/cronjob/cronjob.go create mode 100644 receiver/datadogmetricreceiver/daemonset/daemonset.go create mode 100644 receiver/datadogmetricreceiver/deployment/deployment.go create mode 100644 receiver/datadogmetricreceiver/helpers/helpers.go create mode 100644 receiver/datadogmetricreceiver/hpa/hpa.go create mode 100644 receiver/datadogmetricreceiver/ingress/ingress.go create mode 100644 receiver/datadogmetricreceiver/job/job.go create mode 100644 receiver/datadogmetricreceiver/metricsv2translator.go create mode 100644 receiver/datadogmetricreceiver/namespace/namespace.go create mode 100644 receiver/datadogmetricreceiver/node/node.go create mode 100644 receiver/datadogmetricreceiver/persistentvolume/persistentvolume.go create mode 100644 receiver/datadogmetricreceiver/persistentvolumeclaim/persistentvolumeclaim.go create mode 100644 receiver/datadogmetricreceiver/pod/pod.go create mode 100644 receiver/datadogmetricreceiver/replicaset/replicaset.go create mode 100644 receiver/datadogmetricreceiver/rolebinding/rolebinding.go create mode 100644 receiver/datadogmetricreceiver/roles/roles.go create mode 100644 receiver/datadogmetricreceiver/service/service.go create mode 100644 receiver/datadogmetricreceiver/serviceaccount/serviceaccount.go create mode 100644 receiver/datadogmetricreceiver/statefulset/statefulset.go diff --git a/receiver/datadogmetricreceiver/cluster/cluster.go b/receiver/datadogmetricreceiver/cluster/cluster.go new file mode 100644 index 000000000000..c95bc17ae239 --- /dev/null +++ b/receiver/datadogmetricreceiver/cluster/cluster.go @@ -0,0 +1,97 @@ +package cluster + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for clusters +const ( + // Errors + clusterPayloadErrorMessage = "No metrics related to Clusters found in Payload" + // Metrics + clusterMetricNodeCount = "ddk8s.cluster.node_count" + // Attributes + clusterMetricUID = "ddk8s.cluster.uid" + clusterAttrClusterID = "ddk8s.cluster.id" + clusterAttrClusterName = "ddk8s.cluster.name" + clusterAttrKubeClusterName = "kube_cluster_name" + clusterAttrResourceVersion = "ddk8s.cluster.resource_version" + clusterAttrCPUCapacity = "ddk8s.cluster.cpu_capacity" + clusterAttrCPUAllocatable = "ddk8s.cluster.cpu_allocatable" + clusterAttrMemoryCapacity = "ddk8s.cluster.memory_capacity" + clusterAttrMemoryAllocatable = "ddk8s.cluster.memory_allocatable" + clusterAttrTags = "ddk8s.cluster.tags" + clusterMetricCreateTime = "ddk8s.cluster.create_time" +) + +// GetOtlpExportReqFromClusterData converts Datadog cluster data into OTLP ExportRequest. +func GetOtlpExportReqFromClusterData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorCluster) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterPayloadErrorMessage) + } + cluster := ddReq.GetCluster() + + if cluster == nil { + log.Println("no clusters data found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, resourceAttributes, clusterName, clusterID) + appendClusterMetrics(&scopeMetrics, resourceAttributes, metricAttributes, cluster, timestamp) + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendClusterMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, cluster *processv1.Cluster, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(clusterMetricNodeCount) + + metricAttributes.PutStr(clusterAttrResourceVersion, cluster.GetResourceVersion()) + metricAttributes.PutInt(clusterAttrCPUCapacity, int64(cluster.GetCpuCapacity())) + metricAttributes.PutInt(clusterAttrCPUAllocatable, int64(cluster.GetCpuAllocatable())) + metricAttributes.PutInt(clusterAttrMemoryCapacity, int64(cluster.GetMemoryCapacity())) + metricAttributes.PutInt(clusterAttrMemoryAllocatable, int64(cluster.GetMemoryAllocatable())) + metricAttributes.PutStr(clusterAttrTags, strings.Join(cluster.GetTags(), "&")) + metricAttributes.PutInt(clusterMetricCreateTime, helpers.CalculateCreateTime(cluster.GetCreationTimestamp())) + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + + dp := dataPoints.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + + dp.SetIntValue(int64(cluster.GetNodeCount())) + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, resourceAttributes pcommon.Map, clusterName string, clusterID string) { + resourceAttributes.PutStr(clusterMetricUID, clusterID) + metricAttributes.PutStr(clusterAttrClusterID, clusterID) + metricAttributes.PutStr(clusterAttrClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/clusterrolebinding/clusterrolebinding.go b/receiver/datadogmetricreceiver/clusterrolebinding/clusterrolebinding.go new file mode 100644 index 000000000000..9718d5e27b30 --- /dev/null +++ b/receiver/datadogmetricreceiver/clusterrolebinding/clusterrolebinding.go @@ -0,0 +1,134 @@ +package clusterrolebinding + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for cluster role bindings +const ( + //Errors + clusterRoleBindingsPayloadErrorMessage = "No metrics related to ClusterRoleBindings found in Payload" + //Metrics + clusterRoleBindingsMetricSubjectCount = "ddk8s.clusterrolebindings.subject.count" + //Attributes + clusterRoleBindingsMetricUID = "ddk8s.clusterrolebindings.uid" + clusterRoleBindingsMetricNamespace = "ddk8s.clusterrolebindings.namespace" + clusterRoleBindingsAttrClusterID = "ddk8s.clusterrolebindings.cluster.id" + clusterRoleBindingsAttrClusterName = "ddk8s.clusterrolebindings.cluster.name" + clusterRoleBindingsMetricName = "ddk8s.clusterrolebindings.name" + clusterRoleBindingsMetricCreateTime = "ddk8s.clusterrolebindings.create_time" + clusterRoleBindingsMetricSubjects = "ddk8s.clusterrolebindings.subjects" + clusterRoleBindingsMetricRoleRef = "ddk8s.clusterrolebindings.roleref" + clusterRoleBindingsMetricLabels = "ddk8s.clusterrolebindings.labels" + clusterRoleBindingsMetricAnnotations = "ddk8s.clusterrolebindings.annotations" +) + +func GetOtlpExportReqFromDatadogClusterRoleBindingData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + + ddReq, ok := Body.(*processv1.CollectorClusterRoleBinding) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterRoleBindingsPayloadErrorMessage) + } + + clusterRoleBindings := ddReq.GetClusterRoleBindings() + if len(clusterRoleBindings) == 0 { + log.Println("no cluster role bindings found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterRoleBindingsPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, binding := range clusterRoleBindings { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendClusterRoleBindingMetrics(&scopeMetrics, resourceAttributes, metricAttributes, binding, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendClusterRoleBindingMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, binding *processv1.ClusterRoleBinding, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(clusterRoleBindingsMetricSubjectCount) + + var metricVal int64 + + if metadata := binding.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(clusterRoleBindingsMetricUID, metadata.GetUid()) + metricAttributes.PutStr(clusterRoleBindingsMetricNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(clusterRoleBindingsMetricName, metadata.GetName()) + metricAttributes.PutStr(clusterRoleBindingsMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(clusterRoleBindingsMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(clusterRoleBindingsMetricRoleRef, getRoleRefString(binding.GetRoleRef())) + metricAttributes.PutInt(clusterRoleBindingsMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + + if subjects := binding.GetSubjects(); subjects != nil { + metricAttributes.PutStr(clusterRoleBindingsMetricSubjects, convertSubjectsToString(subjects)) + metricVal = int64(len(subjects)) + } + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(clusterRoleBindingsAttrClusterID, clusterID) + metricAttributes.PutStr(clusterRoleBindingsAttrClusterName, clusterName) +} + +func convertSubjectsToString(subjects []*processv1.Subject) string { + var result strings.Builder + + for i, subject := range subjects { + if i > 0 { + result.WriteString(";") + } + + result.WriteString("kind=") + result.WriteString(subject.GetKind()) + + result.WriteString("&name=") + result.WriteString(subject.GetName()) + + result.WriteString("&namespace=") + result.WriteString(subject.GetNamespace()) + } + + return result.String() +} + +func getRoleRefString(ref *processv1.TypedLocalObjectReference) string { + if ref == nil { + return "" + } + return "apiGroup=" + ref.GetApiGroup() + "&kind=" + ref.GetKind() + "&name=" + ref.GetName() +} diff --git a/receiver/datadogmetricreceiver/clusterroles/clusterroles.go b/receiver/datadogmetricreceiver/clusterroles/clusterroles.go new file mode 100644 index 000000000000..d4f331162d05 --- /dev/null +++ b/receiver/datadogmetricreceiver/clusterroles/clusterroles.go @@ -0,0 +1,136 @@ +package clusterroles + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for cluster roles +const ( + // Error + clusterRolePayloadErrorMessage = "No metrics related to ClusterRoles found in Payload" + // Metrics + clusterRoleMetricRuleCount = "ddk8s.clusterrole.count" + // Attributes + clusterRoleMetricUID = "ddk8s.clusterrole.uid" + clusterRoleMetricNamespace = "ddk8s.clusterrole.namespace" + clusterRoleAttrClusterID = "ddk8s.clusterrole.cluster.id" + clusterRoleAttrClusterName = "ddk8s.clusterrole.cluster.name" + clusterRoleMetricName = "ddk8s.clusterrole.name" + clusterRoleMetricCreateTime = "ddk8s.clusterrole.create.time" + clusterRoleMetricLabels = "ddk8s.clusterrole.labels" + clusterRoleMetricAnnotations = "ddk8s.clusterrole.annotations" + clusterRoleMetricType = "ddk8s.clusterrole.type" + clusterRoleMetricRules = "ddk8s.clusterrole.rules" +) + +func GetOtlpExportReqFromDatadogClusterRolesData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + + ddReq, ok := Body.(*processv1.CollectorClusterRole) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterRolePayloadErrorMessage) + } + + croles := ddReq.GetClusterRoles() + + if len(croles) == 0 { + log.Println("no croles found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(clusterRolePayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, role := range croles { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendClusterRoleMetrics(&scopeMetrics, resourceAttributes, metricAttributes, role, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendClusterRoleMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, role *processv1.ClusterRole, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(clusterRoleMetricRuleCount) + + var metricVal int64 + + if metadata := role.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(clusterRoleMetricUID, metadata.GetUid()) + metricAttributes.PutStr(clusterRoleMetricNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(clusterRoleMetricName, metadata.GetName()) + metricAttributes.PutStr(clusterRoleMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(clusterRoleMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(clusterRoleMetricAnnotations, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutInt(clusterRoleMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + metricAttributes.PutStr(clusterRoleMetricType, "ClusterRole") + + if rules := role.GetRules(); rules != nil { + metricAttributes.PutStr(clusterRoleMetricRules, convertRulesToString(rules)) + metricVal = int64(len(rules)) + } + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(clusterRoleAttrClusterID, clusterID) + metricAttributes.PutStr(clusterRoleAttrClusterName, clusterName) +} + +func convertRulesToString(rules []*processv1.PolicyRule) string { + var result strings.Builder + + for i, rule := range rules { + if i > 0 { + result.WriteString(";") + } + + result.WriteString("verbs=") + result.WriteString(strings.Join(rule.GetVerbs(), ",")) + + result.WriteString("&apiGroups=") + result.WriteString(strings.Join(rule.GetApiGroups(), ",")) + + result.WriteString("&resources=") + result.WriteString(strings.Join(rule.GetResources(), ",")) + + result.WriteString("&resourceNames=") + result.WriteString(strings.Join(rule.GetResourceNames(), ",")) + + result.WriteString("&nonResourceURLs=") + result.WriteString(strings.Join(rule.GetNonResourceURLs(), ",")) + + } + + return result.String() +} diff --git a/receiver/datadogmetricreceiver/cronjob/cronjob.go b/receiver/datadogmetricreceiver/cronjob/cronjob.go new file mode 100644 index 000000000000..c4ac184734f8 --- /dev/null +++ b/receiver/datadogmetricreceiver/cronjob/cronjob.go @@ -0,0 +1,117 @@ +package cronjob + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for cron jobs +const ( + // Errors + cronJobPayloadErrorMessage = "No metrics related to CronJobs found in Payload" + // Metrics + cronJobMetricActiveJobs = "ddk8s.cronjob.active.jobs" + // Attributes + cronJobMetricUID = "ddk8s.cronjob.uid" + cronJobMetricName = "ddk8s.cronjob.name" + cronJobMetricLabels = "ddk8s.cronjob.labels" + cronJobMetricAnnotations = "ddk8s.cronjob.annotations" + cronJobMetricFinalizers = "ddk8s.cronjob.finalizers" + cronJobMetricCreateTime = "ddk8s.cronjob.create_time" + namespaceMetricName = "ddk8s.namespace.name" + namespaceMetricClusterID = "ddk8s.cluster.id" + namespaceMetricClusterName = "ddk8s.cluster.name" + cronJobMetricSchedule = "ddk8s.cronjob.schedule" + cronJobMetricLastScheduleTime = "ddk8s.cronjob.last_schedule_time" +) + +var cronjobMetricsToExtract = []string{ + cronJobMetricActiveJobs, +} + +// GetOtlpExportReqFromDatadogCronJobData converts Datadog cron job data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogCronJobData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorCronJob) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(cronJobPayloadErrorMessage) + } + cronjobs := ddReq.GetCronJobs() + + if len(cronjobs) == 0 { + log.Println("no cronjobs found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(cronJobPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, metricName := range cronjobMetricsToExtract { + for _, cronjob := range cronjobs { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendCronJobMetrics(&scopeMetrics, resourceAttributes, metricAttributes, metricName, cronjob, timestamp) + } + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendCronJobMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, metricName string, cronjob *processv1.CronJob, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(metricName) + + var metricVal int64 + + metadata := cronjob.GetMetadata() + resourceAttributes.PutStr(cronJobMetricUID, metadata.GetUid()) + metricAttributes.PutStr(namespaceMetricName, metadata.GetNamespace()) + metricAttributes.PutStr(cronJobMetricName, metadata.GetName()) + metricAttributes.PutStr(cronJobMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(cronJobMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(cronJobMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + + status := cronjob.GetStatus() + spec := cronjob.GetSpec() + + switch metricName { + case cronJobMetricActiveJobs: + metricVal = int64(len(status.GetActive())) + } + metricAttributes.PutStr(cronJobMetricSchedule, spec.GetSchedule()) + metricAttributes.PutInt(cronJobMetricLastScheduleTime, int64(status.GetLastScheduleTime())*1000) + metricAttributes.PutInt(cronJobMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + + dp := dataPoints.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + + dp.SetIntValue(metricVal) + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(namespaceMetricClusterID, clusterID) + metricAttributes.PutStr(namespaceMetricClusterName, clusterName) +} \ No newline at end of file diff --git a/receiver/datadogmetricreceiver/daemonset/daemonset.go b/receiver/datadogmetricreceiver/daemonset/daemonset.go new file mode 100644 index 000000000000..18156ea62fc8 --- /dev/null +++ b/receiver/datadogmetricreceiver/daemonset/daemonset.go @@ -0,0 +1,145 @@ +package daemonset + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for daemonsets +const ( + // Errors + daemonSetPayloadErrorMessage = "No metrics related to DaemonSets found in Payload" + // Metrics + daemonSetMetricCurrentScheduled = "ddk8s.daemonset.current_scheduled_nodes" + daemonSetMetricDesiredScheduled = "ddk8s.daemonset.desired_scheduled_nodes" + daemonSetMetricMisscheduled = "ddk8s.daemonset.misscheduled_nodes" + daemonSetMetricReady = "ddk8s.daemonset.ready_nodes" + daemonSetMetricAvailable = "ddk8s.daemonset.available_nodes" + daemonSetMetricUnavailable = "ddk8s.daemonset.unavailable_nodes" + daemonSetMetricUpdatedScheduled = "ddk8s.daemonset.updated_scheduled_nodes" + // Attributes + daemonSetMetricUID = "ddk8s.daemonset.uid" + daemonSetMetricName = "ddk8s.daemonset.name" + daemonSetMetricLabels = "ddk8s.daemonset.labels" + daemonSetMetricAnnotations = "ddk8s.daemonset.annotations" + daemonSetMetricFinalizers = "ddk8s.daemonset.finalizers" + daemonSetMetricCreateTime = "ddk8s.daemonset.create_time" + namespaceMetricName = "ddk8s.namespace.name" + namespaceMetricClusterID = "ddk8s.cluster.id" + namespaceMetricClusterName = "ddk8s.cluster.name" +) + +var daemonsetMetricsToExtract = []string{ + daemonSetMetricCurrentScheduled, + daemonSetMetricDesiredScheduled, + daemonSetMetricMisscheduled, + daemonSetMetricReady, + daemonSetMetricAvailable, + daemonSetMetricUnavailable, + daemonSetMetricUpdatedScheduled, +} + +// GetOtlpExportReqFromDatadogDaemonSetData converts Datadog daemonset data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogDaemonSetData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorDaemonSet) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(daemonSetPayloadErrorMessage) + } + + daemonsets := ddReq.GetDaemonSets() + + if len(daemonsets) == 0 { + log.Println("no daemonsets found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(daemonSetPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, metricName := range daemonsetMetricsToExtract { + for _, daemonset := range daemonsets { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendDaemonSetMetrics(&scopeMetrics, resourceAttributes, metricAttributes, daemonset, metricName, timestamp) + } + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendDaemonSetMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, daemonset *processv1.DaemonSet, metricName string, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(metricName) + + var metricVal int64 + + if metadata := daemonset.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(daemonSetMetricUID, metadata.GetUid()) + metricAttributes.PutStr(namespaceMetricName, metadata.GetNamespace()) + metricAttributes.PutStr(daemonSetMetricName, metadata.GetName()) + metricAttributes.PutStr(daemonSetMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(daemonSetMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(daemonSetMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutInt(daemonSetMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + } + + status := daemonset.GetStatus() + spec := daemonset.GetSpec() + + if status != nil { + switch metricName { + case daemonSetMetricCurrentScheduled: + metricVal = int64(status.GetCurrentNumberScheduled()) + case daemonSetMetricDesiredScheduled: + metricVal = int64(status.GetDesiredNumberScheduled()) + case daemonSetMetricMisscheduled: + metricVal = int64(status.GetNumberMisscheduled()) + case daemonSetMetricReady: + metricVal = int64(status.GetNumberReady()) + case daemonSetMetricAvailable: + metricVal = int64(status.GetNumberAvailable()) + case daemonSetMetricUnavailable: + metricVal = int64(status.GetNumberUnavailable()) + case daemonSetMetricUpdatedScheduled: + metricVal = int64(status.GetUpdatedNumberScheduled()) + } + } + + if spec != nil { + metricAttributes.PutStr("ddk8s.daemonset.deployment_strategy", spec.GetDeploymentStrategy()) + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(namespaceMetricClusterID, clusterID) + metricAttributes.PutStr(namespaceMetricClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/deployment/deployment.go b/receiver/datadogmetricreceiver/deployment/deployment.go new file mode 100644 index 000000000000..ebb5825364c0 --- /dev/null +++ b/receiver/datadogmetricreceiver/deployment/deployment.go @@ -0,0 +1,133 @@ +package deployment + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for deployments +const ( + // Errors + deploymentPayloadErrorMessage = "No metrics related to Deployments found in Payload" + // Metrics + deploymentMetricDesired = "ddk8s.deployment.desired" + deploymentMetricAvailable = "ddk8s.deployment.available" + deploymentMetricReplicasUpdated = "ddk8s.deployment.replicas.updated" + deploymentMetricReplicasUnupdated = "dk8s.deployment.replicas.unupdated" + deploymentMetricReplicasAvailable = "dk8s.deployment.replicas.available" + deploymentMetricReplicasUnavailable = "dk8s.deployment.replicas.unavailable" + // Attributes + deploymentMetricUID = "ddk8s.deployment.uid" + deploymentMetricName = "ddk8s.deployment.name" + deploymentMetricLabels = "ddk8s.deployment.labels" + deploymentMetricAnnotations = "ddk8s.deployment.annotations" + deploymentDeploymentStrategy = "ddk8s.deployment.deployment_strategy" + deploymentMetricFinalizers = "ddk8s.deployment.finalizers" + deploymentMetricCreateTime = "ddk8s.deployment.create_time" + namespaceMetricName = "ddk8s.namespace.name" + namespaceMetricClusterID = "ddk8s.cluster.id" + namespaceMetricClusterName = "ddk8s.cluster.name" +) + +var deploymentMetricsToExtract = []string{ + deploymentMetricDesired, + deploymentMetricAvailable, + deploymentMetricReplicasUpdated, + deploymentMetricReplicasUnupdated, + deploymentMetricReplicasAvailable, + deploymentMetricReplicasUnavailable, +} + +// GetOtlpExportReqFromDatadogDeploymentData converts Datadog deployment data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogDeploymentData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorDeployment) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(deploymentPayloadErrorMessage) + } + deployments := ddReq.GetDeployments() + + if len(deployments) == 0 { + log.Println("no deployments found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(deploymentPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, metricName := range deploymentMetricsToExtract { + for _, deployment := range deployments { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendDeploymentMetrics(&scopeMetrics, resourceAttributes, metricAttributes, deployment, metricName, timestamp) + } + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendDeploymentMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, deployment *processv1.Deployment, metricName string, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(metricName) + + var metricVal int64 + + if metadata := deployment.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(deploymentMetricUID, metadata.GetUid()) + metricAttributes.PutStr(namespaceMetricName, metadata.GetNamespace()) + metricAttributes.PutStr(deploymentMetricName, metadata.GetName()) + metricAttributes.PutStr(deploymentMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(deploymentMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(deploymentMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutInt(deploymentMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + metricAttributes.PutStr(deploymentDeploymentStrategy, deployment.GetDeploymentStrategy()) + } + + switch metricName { + case deploymentMetricDesired: + metricVal = int64(deployment.GetReplicasDesired()) + case deploymentMetricAvailable: + metricVal = int64(deployment.GetReplicas()) + case deploymentMetricReplicasUpdated: + metricVal = int64(deployment.GetUpdatedReplicas()) + case deploymentMetricReplicasUnupdated: + metricVal = int64(deployment.GetUpdatedReplicas()) // RECHECK THIS + case deploymentMetricReplicasAvailable: + metricVal = int64(deployment.GetAvailableReplicas()) + case deploymentMetricReplicasUnavailable: + metricVal = int64(deployment.GetUnavailableReplicas()) + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(namespaceMetricClusterID, clusterID) + metricAttributes.PutStr(namespaceMetricClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/helpers/helpers.go b/receiver/datadogmetricreceiver/helpers/helpers.go new file mode 100644 index 000000000000..cb43ef482652 --- /dev/null +++ b/receiver/datadogmetricreceiver/helpers/helpers.go @@ -0,0 +1,95 @@ +package helpers + +import ( + "errors" + "strings" + "time" +metricsV2 "github.com/DataDog/agent-payload/v5/gogen" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +const ( + scopeName = "mw" + scopeVersion = "v0.0.1" +) + +const ( + datadogMetricTypeCount = int32(metricsV2.MetricPayload_COUNT) + datadogMetricTypeGauge = int32(metricsV2.MetricPayload_GAUGE) + datadogMetricTypeRate = int32(metricsV2.MetricPayload_RATE) + + datadogAPIKeyHeader = "Dd-Api-Key" +) + + +type CommonResourceAttributes struct { + Origin string + ApiKey string + MwSource string + Host string +} + +func CalculateCreateTime(creationTimestamp int64) int64 { + currentTime := time.Now() + milliseconds := (currentTime.UnixNano() / int64(time.Millisecond)) * 1000000 + createtime := (int64(milliseconds/1000000000) - creationTimestamp) + return createtime +} + +func GetMillis() int64 { + currentTime := time.Now() + milliseconds := (currentTime.UnixNano() / int64(time.Millisecond)) * 1000000 + return milliseconds +} + +// NewErrNoMetricsInPayload creates a new error indicating no metrics found in the payload with the given message. +// If message is empty, a default error message is used. +func NewErrNoMetricsInPayload(message string) error { + if message == "" { + message = "no metrics found in payload" + } + return errors.New(message) +} + +func SetMetricResourceAttributes(attributes pcommon.Map, + cra CommonResourceAttributes) { + if cra.Origin != "" { + attributes.PutStr("mw.client_origin", cra.Origin) + } + if cra.ApiKey != "" { + attributes.PutStr("mw.account_key", cra.ApiKey) + } + if cra.MwSource != "" { + attributes.PutStr("mw_source", cra.MwSource) + } + if cra.Host != "" { + attributes.PutStr("host.id", cra.Host) + attributes.PutStr("host.name", cra.Host) + } +} + +func AppendInstrScope(rm *pmetric.ResourceMetrics) pmetric.ScopeMetrics { + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + instrumentationScope := scopeMetrics.Scope() + instrumentationScope.SetName(scopeName) + instrumentationScope.SetVersion(scopeVersion) + return scopeMetrics +} + +func SkipDatadogMetrics(metricName string, metricType int32) bool { + if strings.HasPrefix(metricName, "datadog") { + return true + } + + if strings.HasPrefix(metricName, "n_o_i_n_d_e_x.datadog") { + return true + } + + if metricType != datadogMetricTypeRate && + metricType != datadogMetricTypeGauge && + metricType != datadogMetricTypeCount { + return true + } + return false +} diff --git a/receiver/datadogmetricreceiver/hpa/hpa.go b/receiver/datadogmetricreceiver/hpa/hpa.go new file mode 100644 index 000000000000..947c30d2fba3 --- /dev/null +++ b/receiver/datadogmetricreceiver/hpa/hpa.go @@ -0,0 +1,126 @@ +package hpa + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Constants for HPA metrics +const ( + // Metrics + hpaMetricCurrentReplicas = "ddk8s.hpa.current_replicas" + hpaMetricDesiredReplicas = "ddk8s.hpa.desired_replicas" + hpaMetricMaxReplicas = "ddk8s.hpa.max_replicas" + hpaMetricMinReplicas = "ddk8s.hpa.min_replicas" + hpaMetricUID = "ddk8s.hpa.uid" + // Attributes + hpaMetricName = "ddk8s.hpa.name" + hpaMetricNamespace = "ddk8s.hpa.namespace" + hpaMetricLabels = "ddk8s.hpa.labels" + hpaMetricAnnotations = "ddk8s.hpa.annotations" + hpaMetricFinalizers = "ddk8s.hpa.finalizers" + hpaMetricClusterID = "ddk8s.hpa.cluster.id" + hpaMetricClusterName = "ddk8s.hpa.cluster.name" + // Error + ErrNoMetricsInPayload = "No metrics related to HPA found in Payload" +) + +var hpaMetricsToExtract = []string{ + hpaMetricCurrentReplicas, + hpaMetricDesiredReplicas, + hpaMetricMaxReplicas, + hpaMetricMinReplicas, +} + +// GetOtlpExportReqFromDatadogHPAData converts Datadog HPA data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogHPAData(origin string, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + + ddReq, ok := Body.(*processv1.CollectorHorizontalPodAutoscaler) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(ErrNoMetricsInPayload) + } + + hpas := ddReq.GetHorizontalPodAutoscalers() + + if len(hpas) == 0 { + log.Println("no hpas found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(ErrNoMetricsInPayload) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, metricName := range hpaMetricsToExtract { + for _, hpa := range hpas { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendHPAMetrics(&scopeMetrics, resourceAttributes, metricAttributes, hpa, metricName, timestamp) + } + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendHPAMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, hpa *processv1.HorizontalPodAutoscaler, metricName string, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(metricName) + + var metricVal int64 + + metadata := hpa.GetMetadata() + if metadata != nil { + resourceAttributes.PutStr(hpaMetricUID, metadata.GetUid()) + metricAttributes.PutStr(hpaMetricNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(hpaMetricName, metadata.GetName()) + metricAttributes.PutStr(hpaMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(hpaMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(hpaMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + } + + specDetails := hpa.GetSpec() + statusDetails := hpa.GetStatus() + + switch metricName { + case hpaMetricCurrentReplicas: + metricVal = int64(statusDetails.GetCurrentReplicas()) + case hpaMetricDesiredReplicas: + metricVal = int64(statusDetails.GetDesiredReplicas()) + case hpaMetricMaxReplicas: + metricVal = int64(specDetails.GetMaxReplicas()) + case hpaMetricMinReplicas: + metricVal = int64(specDetails.GetMinReplicas()) + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(hpaMetricClusterID, clusterID) + metricAttributes.PutStr(hpaMetricClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/ingress/ingress.go b/receiver/datadogmetricreceiver/ingress/ingress.go new file mode 100644 index 000000000000..91c361376e93 --- /dev/null +++ b/receiver/datadogmetricreceiver/ingress/ingress.go @@ -0,0 +1,146 @@ +package ingress + +import ( + "fmt" + + processv1 "github.com/DataDog/agent-payload/v5/process" + //"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver" + "log" + "strings" + //"time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" +) + +const ( + // Attribute keys + attrUID = "ddk8s.ingress.uid" + attrNamespace = "ddk8s.ingress.namespace" + attrClusterID = "ddk8s.ingress.cluster.id" + attrClusterName = "ddk8s.ingress.cluster.name" + attrName = "ddk8s.ingress.name" + attrLabels = "ddk8s.ingress.labels" + attrAnnotations = "ddk8s.ingress.annotations" + attrFinalizers = "ddk8s.ingress.finalizers" + attrRules = "ddk8s.ingress.rules" + attrType = "ddk8s.ingress.type" + attrCreateTime = "ddk8s.ingress.create_time" + IngressPayloadErrorMessage = "No metrics related to Ingress found in Payload" + // Metric names + IngressmetricRuleCount = "ddk8s.ingress.rule_count" +) + +func GetOtlpExportReqFromDatadogIngressData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorIngress) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(IngressPayloadErrorMessage) + } + ingresses := ddReq.GetIngresses() + + if len(ingresses) == 0 { + log.Println("no ingresses found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(IngressPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + cluster_name := ddReq.GetClusterName() + cluster_id := ddReq.GetClusterId() + + for _, ingress := range ingresses { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, cluster_name, cluster_id) + appendMetrics(&scopeMetrics, resourceAttributes, metricAttributes, ingress, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, ingress *processv1.Ingress, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(IngressmetricRuleCount) + + var metricVal int64 + + if metadata := ingress.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(attrUID, metadata.GetUid()) + metricAttributes.PutStr(attrNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(attrName, metadata.GetName()) + metricAttributes.PutStr(attrLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(attrAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(attrFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutInt(attrCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + metricAttributes.PutStr(attrType, "Ingress") + + if specDetails := ingress.GetSpec(); specDetails != nil { + if rules := specDetails.GetRules(); rules != nil { + metricVal = int64(len(rules)) + metricAttributes.PutStr(attrRules, convertIngressRulesToString(rules)) + } + } + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func convertIngressRulesToString(rules []*processv1.IngressRule) string { + var result strings.Builder + + for i, rule := range rules { + if i > 0 { + result.WriteString(";") + } + + result.WriteString("host=") + result.WriteString(rule.GetHost()) + + result.WriteString("&http=(paths=") + for j, path := range rule.GetHttpPaths() { + if j > 0 { + result.WriteString("&") + } + + result.WriteString("(path=") + result.WriteString(path.GetPath()) + result.WriteString("&pathType=") + result.WriteString(path.GetPathType()) + result.WriteString("&backend=(service=(name=") + result.WriteString(path.GetBackend().GetService().GetServiceName()) + result.WriteString("&port=(number=") + result.WriteString(fmt.Sprintf("%d", path.GetBackend().GetService().GetPortNumber())) + result.WriteString(")))") + } + result.WriteString(")") + } + + return result.String() +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, cluster_name string, cluster_id string) { + metricAttributes.PutStr(attrClusterID, cluster_id) + metricAttributes.PutStr(attrClusterName, cluster_name) +} + diff --git a/receiver/datadogmetricreceiver/job/job.go b/receiver/datadogmetricreceiver/job/job.go new file mode 100644 index 000000000000..bc74de3a9bf4 --- /dev/null +++ b/receiver/datadogmetricreceiver/job/job.go @@ -0,0 +1,134 @@ +package job + +import ( + "fmt" + "log" + "strings" + + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" +) + +// Private constants for jobs +const ( + // Errors + jobPayloadErrorMessage = "No metrics related to Jobs found in Payload" + // Metrics + jobMetricActivePods = "ddk8s.job.active_pods" + jobMetricDesiredSuccessfulPods = "ddk8s.job.desired_successful_pods" + jobMetricFailedPods = "ddk8s.job.failed_pods" + jobMetricMaxParallelPods = "ddk8s.job.max_parallel_pods" + jobMetricSuccessfulPods = "ddk8s.job.successful_pods" + // Attributes + jobMetricUID = "ddk8s.job.uid" + jobMetricName = "ddk8s.job.name" + jobMetricLabels = "ddk8s.job.labels" + jobMetricAnnotations = "ddk8s.job.annotations" + jobMetricFinalizers = "ddk8s.job.finalizers" + jobMetricCreateTime = "ddk8s.job.create_time" + namespaceMetricName = "ddk8s.namespace.name" + namespaceMetricClusterID = "ddk8s.cluster.id" + namespaceMetricClusterName = "ddk8s.cluster.name" +) + +var jobMetricsToExtract = []string{ + jobMetricActivePods, + jobMetricDesiredSuccessfulPods, + jobMetricFailedPods, + jobMetricMaxParallelPods, + jobMetricSuccessfulPods, +} + +// GetOtlpExportReqFromDatadogJobData converts Datadog job data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogJobData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorJob) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(jobPayloadErrorMessage) + } + + jobs := ddReq.GetJobs() + + if len(jobs) == 0 { + log.Println("no jobs found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(jobPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + fmt.Println("clusterName",clusterID) + fmt.Println("clusterID",clusterName) + for _, metricName := range jobMetricsToExtract { + for _, job := range jobs { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendJobMetrics(&scopeMetrics, resourceAttributes, metricAttributes, metricName, job, timestamp) + //fmt.Println(metricAttributes.AsRaw()) + } + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendJobMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, metricName string, job *processv1.Job, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(metricName) + + var metricVal int64 + + metadata := job.GetMetadata() + resourceAttributes.PutStr(jobMetricUID, metadata.GetUid()) + metricAttributes.PutStr(namespaceMetricName, metadata.GetNamespace()) + metricAttributes.PutStr(jobMetricName, metadata.GetName()) + metricAttributes.PutStr(jobMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(jobMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(jobMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutInt(jobMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + + status := job.GetStatus() + spec := job.GetSpec() + + switch metricName { + case jobMetricActivePods: + metricVal = int64(status.GetActive()) + case jobMetricDesiredSuccessfulPods: + metricVal = int64(spec.GetCompletions()) + case jobMetricFailedPods: + metricVal = int64(status.GetFailed()) + case jobMetricMaxParallelPods: + metricVal = int64(spec.GetParallelism()) + case jobMetricSuccessfulPods: + metricVal = int64(status.GetSucceeded()) + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + + dp := dataPoints.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + + dp.SetIntValue(metricVal) + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(namespaceMetricClusterID, clusterID) + metricAttributes.PutStr(namespaceMetricClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/metricsv2translator.go b/receiver/datadogmetricreceiver/metricsv2translator.go new file mode 100644 index 000000000000..cfc4dca9912a --- /dev/null +++ b/receiver/datadogmetricreceiver/metricsv2translator.go @@ -0,0 +1,391 @@ +package datadogmetricreceiver + +import ( + metricsV2 "github.com/DataDog/agent-payload/v5/gogen" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "math" + "strings" +) + +// MetricTranslator function type +type MetricTranslator func(*metricsV2.MetricPayload_MetricSeries, string, map[string]string, pcommon.Map, pcommon.Map) bool + +type MetricBuilder func(*metricsV2.MetricPayload_MetricSeries, pmetric.Metric, pcommon.Map) + +// Global variables for translators +var ( + translators = []MetricTranslator{ + translateKubernetesStateCountMetrics, + translateContainerMetrics, + translateKubernetesStatePod, + translateKubernetesStateNode, + translateKubernetes, + translateKubernetesStateContainer, + } +) + +const ( + // Error + ErrNoMetricsInSeriesPayload = "No metrics found in Payload V2 Series" + // Suffix + countSuffix = "count" + totalSuffix = "total" + // Prefix + kubernetesStatePrefix = "kubernetes_state" + kubernetesStateNodePrefix = "kubernetes_state.node" + kubernetesStatePodPrefix = "kubernetes_state.pod" + kubernetesStateContainerPrefix = "kubernetes_state.container." + systemCPUPrefix = "system.cpu." + kubernetesPrefix = "kubernetes." + containerPrefix = "container." + // Datadog Tags + nodeTag = "node" + clusterNameTag = "kube_cluster_name" + namespaceTag = "kube_namespace" + containerIDTag = "uid" + podNameTag = "pod_name" + ddClusterNameTag = "dd_cluster_name" + kubeServiceTag = "kube_service" + // Middleware Attribute Keys + isCountKey = "ddk8s.is_count" + nodeNameKey = "ddk8s.node.name" + clusterNameKey = "ddk8s.cluster.name" + namespaceNameKey = "ddk8s.namespace.name" + containerUIDKey = "ddk8s.container.uid" + podNameKey = "ddk8s.pod.name" + isKubeHost = "ddk8s.is_kube_host" + containerTagsKey = "ddk8s.container.tags" + serviceNameKey = "ddk8s.service.name" +) + +// Main function to process Datadog metrics +func GetOtlpExportReqFromDatadogV2Metrics(origin, key string, ddReq metricsV2.MetricPayload) (pmetricotlp.ExportRequest, error) { + + if len(ddReq.GetSeries()) == 0 { + log.Println("No Metrics found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(ErrNoMetricsInSeriesPayload) + } + + metricHost := getMetricHost(ddReq.GetSeries()) + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + for _, s := range ddReq.GetSeries() { + //log.Println("s.GetMetric()", s.GetMetric()) + if helpers.SkipDatadogMetrics(s.GetMetric(), int32(s.GetType())) { + continue + } + + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + Host: metricHost, + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + scopeMetric := initializeScopeMetric(s, &scopeMetrics) + metricAttributes := pcommon.NewMap() + // Currently This is added to Classify If A Metric is Datadog . Useful For Front Side + metricAttributes.PutBool("datadog_metric", true) + + tagMap := tagsToMap(s.GetTags()) + + translateMetric(s, metricHost, tagMap, resourceAttributes, metricAttributes) + setDataPoints(s, &scopeMetric, metricAttributes) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func translateMetric(s *metricsV2.MetricPayload_MetricSeries, metricHost string, tagMap map[string]string, resourceAttributes, metricAttributes pcommon.Map) { + for _, translator := range translators { + if translator(s, metricHost, tagMap, resourceAttributes, metricAttributes) { + return + } + } + defaultTranslator(s, metricAttributes) +} + +func getMetricHost(metricSeries []*metricsV2.MetricPayload_MetricSeries) string { + host := "" + for _, series := range metricSeries { + + // Iterate through each resource in the series + for _, resource := range series.GetResources() { + if resource.GetType() == "host" { + host = resource.GetName() + } + // } else if i == 0 { + // //resourceAttributes.PutStr(resource.GetType(), resource.GetName()) + // } + } + // Break the outer loop if metricHost is set + if host != "" { + break + } + } + return host +} + +func initializeScopeMetric(s *metricsV2.MetricPayload_MetricSeries, scopeMetrics *pmetric.ScopeMetrics) pmetric.Metric { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(s.GetMetric()) + scopeMetric.SetUnit(s.GetUnit()) + return scopeMetric +} + +func defaultTranslator(s *metricsV2.MetricPayload_MetricSeries, metricAttributes pcommon.Map) { + // Decide if It is kubernetes Host + // Used in Host Dialog Tabs + if strings.Contains(s.GetMetric(), systemCPUPrefix) { + metricAttributes.PutBool(isKubeHost, true) + } + for _, tag := range s.GetTags() { + parts := strings.Split(tag, ":") + if len(parts) == 2 { + metricAttributes.PutStr(parts[0], parts[1]) + } + } +} + +func translateKubernetesStateCountMetrics(s *metricsV2.MetricPayload_MetricSeries, metricHost string, tagMap map[string]string, resourceAttributes, metricAttributes pcommon.Map) bool { + metricName := s.GetMetric() + if !strings.HasPrefix(metricName, kubernetesStatePrefix) { + return false + } + + if !strings.HasSuffix(metricName, countSuffix) && !strings.HasSuffix(metricName, totalSuffix) { + return false + } + + resourceAttributes.PutStr(isCountKey, "true") + + nodeName := tagMap[nodeTag] + if nodeName == "" { + nodeName = metricHost + } + resourceAttributes.PutStr(nodeNameKey, nodeName) + + metricAttributes.PutStr(clusterNameKey, tagMap[clusterNameTag]) + metricAttributes.PutStr(namespaceNameKey, tagMap[namespaceTag]) + + for k, v := range tagMap { + metricAttributes.PutStr(k, v) + } + + return true +} + +func translateContainerMetrics(s *metricsV2.MetricPayload_MetricSeries, metricHost string, tagMap map[string]string, resourceAttributes, metricAttributes pcommon.Map) bool { + metricName := s.GetMetric() + if !strings.HasPrefix(metricName, containerPrefix) { + return false + } + + //kubeClusterName := tagMap[clusterNameTag] + kubeNamespace := tagMap[namespaceTag] + containerID := tagMap["container_id"] + + if containerID == "" { + return false + } + + resourceAttributes.PutStr(containerUIDKey, containerID) + resourceAttributes.PutStr(podNameKey, tagMap[podNameTag]) + // Note Assumption Node and Host Name are same + nodeName := tagMap[nodeTag] + if nodeName == "" { + nodeName = metricHost + } + resourceAttributes.PutStr(nodeNameKey, nodeName) + + metricAttributes.PutStr(clusterNameKey, tagMap[ddClusterNameTag]) + metricAttributes.PutStr(namespaceNameKey, kubeNamespace) + metricAttributes.PutStr(containerTagsKey, strings.Join(s.GetTags(), "&")) + + if kubeService := tagMap[kubeServiceTag]; kubeService != "" { + metricAttributes.PutStr(serviceNameKey, kubeService) + } + + for k, v := range tagMap { + metricAttributes.PutStr(k, v) + } + + return true +} + +func translateKubernetesStateNode(s *metricsV2.MetricPayload_MetricSeries, metricHost string, tagMap map[string]string, resourceAttributes, metricAttributes pcommon.Map) bool { + metricName := s.GetMetric() + if !strings.HasPrefix(metricName, kubernetesStateNodePrefix) { + return false + } + // Note Assumption Node and Host Name are same + nodeName := tagMap[nodeTag] + if nodeName == "" { + nodeName = metricHost + } + resourceAttributes.PutStr(nodeNameKey, nodeName) + + metricAttributes.PutStr(clusterNameKey, tagMap[clusterNameTag]) + metricAttributes.PutStr(namespaceNameKey, tagMap[namespaceTag]) + + if kubeService := tagMap[kubeServiceTag]; kubeService != "" { + metricAttributes.PutStr(serviceNameKey, kubeService) + } + + for k, v := range tagMap { + metricAttributes.PutStr(k, v) + } + + return true +} + +func translateKubernetesStatePod(s *metricsV2.MetricPayload_MetricSeries, metricHost string, tagMap map[string]string, resourceAttributes, metricAttributes pcommon.Map) bool { + metricName := s.GetMetric() + if !strings.HasPrefix(metricName, kubernetesStatePodPrefix) { + return false + } + // Note Assumption Node and Host Name are same + nodeName := tagMap[nodeTag] + if nodeName == "" { + nodeName = metricHost + } + resourceAttributes.PutStr(podNameKey, tagMap[podNameTag]) + resourceAttributes.PutStr(nodeNameKey, nodeName) + metricAttributes.PutStr(clusterNameKey, tagMap[clusterNameTag]) + metricAttributes.PutStr(namespaceNameKey, tagMap[namespaceTag]) + + if kubeService := tagMap[kubeServiceTag]; kubeService != "" { + metricAttributes.PutStr(serviceNameKey, kubeService) + } + + for k, v := range tagMap { + metricAttributes.PutStr(k, v) + } + + return true +} + +func translateKubernetes(s *metricsV2.MetricPayload_MetricSeries, metricHost string, tagMap map[string]string, resourceAttributes, metricAttributes pcommon.Map) bool { + metricName := s.GetMetric() + if !strings.HasPrefix(metricName, kubernetesPrefix) { + return false + } + // Note Assumption Node and Host Name are same + nodeName := tagMap[nodeTag] + if nodeName == "" { + nodeName = metricHost + } + resourceAttributes.PutStr(isCountKey, "true") + + metricAttributes.PutStr(namespaceNameKey, tagMap[namespaceTag]) + metricAttributes.PutStr(clusterNameKey, tagMap[clusterNameTag]) + + if podName := tagMap[podNameTag]; podName != "" { + resourceAttributes.PutStr(podNameKey, podName) + } + + if nodeName := tagMap[nodeTag]; nodeName != "" { + resourceAttributes.PutStr(nodeNameKey, nodeName) + } else { + resourceAttributes.PutStr(nodeNameKey, metricHost) + } + + // Rewrite As It is Empty + if tagMap[clusterNameTag] == "" { + metricAttributes.PutStr(clusterNameKey, tagMap[ddClusterNameTag]) + } + + if kubeService := tagMap[kubeServiceTag]; kubeService != "" { + metricAttributes.PutStr(serviceNameKey, kubeService) + } + + for k, v := range tagMap { + metricAttributes.PutStr(k, v) + } + + return true +} + +func translateKubernetesStateContainer(s *metricsV2.MetricPayload_MetricSeries, metricHost string, tagMap map[string]string, resourceAttributes, metricAttributes pcommon.Map) bool { + metricName := s.GetMetric() + if !strings.HasPrefix(metricName, kubernetesStateContainerPrefix) { + return false + } + + //kubeClusterName := tagMap[clusterNameTag] + kubeNamespace := tagMap[namespaceTag] + containerID := tagMap[containerIDTag] + + if containerID == "" { + return false + } + + resourceAttributes.PutStr(containerUIDKey, containerID) + resourceAttributes.PutStr(podNameKey, tagMap[podNameTag]) + // Note Assumption Node and Host Name are same + if nodeName := tagMap[nodeTag]; nodeName != "" { + resourceAttributes.PutStr(nodeNameKey, nodeName) + } else { + resourceAttributes.PutStr(nodeNameKey, metricHost) + } + + metricAttributes.PutStr(clusterNameKey, tagMap[ddClusterNameTag]) + metricAttributes.PutStr(namespaceNameKey, kubeNamespace) + metricAttributes.PutStr(containerTagsKey, strings.Join(s.GetTags(), "&")) + + if kubeService := tagMap[kubeServiceTag]; kubeService != "" { + metricAttributes.PutStr(serviceNameKey, kubeService) + } + + for k, v := range tagMap { + metricAttributes.PutStr(k, v) + } + + return true +} + +func setDataPoints(s *metricsV2.MetricPayload_MetricSeries, scopeMetric *pmetric.Metric, metricAttributes pcommon.Map) { + var dataPoints pmetric.NumberDataPointSlice + // in case datadog metric is rate, we need to multiply + // the value in the metric by multiplyFactor to get the sum + // for otlp metrics. + multiplyFactor := 1.0 + switch s.GetType() { + case metricsV2.MetricPayload_RATE: + multiplyFactor = float64(s.GetInterval()) + fallthrough + case metricsV2.MetricPayload_COUNT: + sum := scopeMetric.SetEmptySum() + sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + sum.SetIsMonotonic(false) + dataPoints = sum.DataPoints() + case metricsV2.MetricPayload_GAUGE: + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + default: + log.Println("datadog metric not yet handled", "type", s.Metric) + return + } + + for _, point := range s.GetPoints() { + // Datadog payload stores timestamp as first member of Point array + unixNano := float64(point.GetTimestamp()) * math.Pow(10, 9) + dp := dataPoints.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(unixNano)) + // Datadog payload stores count value as second member of Point + // array + dp.SetDoubleValue(float64(point.GetValue()) * multiplyFactor) + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) + } + +} diff --git a/receiver/datadogmetricreceiver/namespace/namespace.go b/receiver/datadogmetricreceiver/namespace/namespace.go new file mode 100644 index 000000000000..e6d2c0416ed7 --- /dev/null +++ b/receiver/datadogmetricreceiver/namespace/namespace.go @@ -0,0 +1,102 @@ +package namespace + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for namespaces +const ( + // Errors + namespacePayloadErrorMessage = "No metrics related to Namespaces found in Payload" + // Metrics + namespaceMetricMetadata = "ddk8s.namespace.metadata" + // Attributes + namespaceMetricUID = "ddk8s.namespace.uid" + namespaceMetricName = "ddk8s.namespace.name" + namespaceMetricStatus = "ddk8s.namespace.status" + namespaceMetricLabels = "ddk8s.namespace.labels" + namespaceMetricAnnotations = "ddk8s.namespace.annotations" + namespaceMetricFinalizers = "ddk8s.namespace.finalizers" + namespaceMetricCreateTime = "ddk8s.namespace.create_time" + namespaceAttrClusterID = "ddk8s.cluster.id" + namespaceAttrClusterName = "ddk8s.cluster.name" +) + +// GetOtlpExportReqFromNamespaceData converts Datadog namespace data into OTLP ExportRequest. +func GetOtlpExportReqFromNamespaceData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorNamespace) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(namespacePayloadErrorMessage) + } + + namespaces := ddReq.GetNamespaces() + + if len(namespaces) == 0 { + log.Println("no namespaces found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(namespacePayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, namespace := range namespaces { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendNamespaceMetrics(&scopeMetrics, resourceAttributes, metricAttributes, namespace, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendNamespaceMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, namespace *processv1.Namespace, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(namespaceMetricMetadata) + + var metricVal int64 + + if metadata := namespace.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(namespaceMetricUID, metadata.GetUid()) + metricAttributes.PutStr(namespaceMetricName, metadata.GetNamespace()) + metricAttributes.PutStr(namespaceMetricName, metadata.GetName()) + metricAttributes.PutStr(namespaceMetricStatus, namespace.GetStatus()) + metricAttributes.PutStr(namespaceMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(namespaceMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(namespaceMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutInt(namespaceMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(namespaceAttrClusterID, clusterID) + metricAttributes.PutStr(namespaceAttrClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/node/node.go b/receiver/datadogmetricreceiver/node/node.go new file mode 100644 index 000000000000..e331451d0892 --- /dev/null +++ b/receiver/datadogmetricreceiver/node/node.go @@ -0,0 +1,127 @@ +package node + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for nodes +const ( + // Errors + nodePayloadErrorMessage = "No metrics related to Nodes found in Payload" + // Metrics + nodeMetricMetadata = "ddk8s.node.metadata" + // Attributes + nodeName = "ddk8s.node.name" + nodeMetricNamespace = "ddk8s.namespace.name" + nodeAttrClusterID = "ddk8s.cluster.id" + nodeAttrClusterName = "ddk8s.cluster.name" + nodeAttrKubeClusterName = "kube_cluster_name" + nodeMetricRoles = "ddk8s.node.roles" + nodeMetricLabels = "ddk8s.node.labels" + nodeMetricAnnotations = "ddk8s.node.annotations" + nodeMetricFinalizers = "ddk8s.node.finalizers" + nodeMetricIP = "ddk8s.node.ip" + nodeMetricHostName = "ddk8s.node.host.name" + nodeMetricCreateTime = "ddk8s.node.create_time" +) + +// GetOtlpExportReqFromDatadogRoleBindingData converts Datadog role binding data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogNodeData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorNode) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(nodePayloadErrorMessage) + } + nodes := ddReq.GetNodes() + + if len(nodes) == 0 { + log.Println("no nodes found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(nodePayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, node := range nodes { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendNodeMetrics(&scopeMetrics, resourceAttributes, metricAttributes, node, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendNodeMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, node *processv1.Node, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(nodeMetricMetadata) + + var metricVal int64 + + if metadata := node.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(nodeName, metadata.GetName()) + metricAttributes.PutStr(nodeMetricNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(nodeMetricRoles, strings.Join(node.GetRoles(), "&")) + metricAttributes.PutStr(nodeMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(nodeMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(nodeMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutStr(nodeMetricIP, getNodeInternalIP(node.GetStatus())) + metricAttributes.PutStr(nodeMetricHostName, getNodeHostName(node.GetStatus())) + metricAttributes.PutInt(nodeMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func getNodeInternalIP(status *processv1.NodeStatus) string { + if status == nil { + return "" + } + addresses := status.GetNodeAddresses() + if addresses == nil { + return "" + } + return addresses["InternalIP"] +} + +func getNodeHostName(status *processv1.NodeStatus) string { + if status == nil { + return "" + } + addresses := status.GetNodeAddresses() + if addresses == nil { + return "" + } + return addresses["Hostname"] +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(nodeAttrClusterID, clusterID) + metricAttributes.PutStr(nodeAttrClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/persistentvolume/persistentvolume.go b/receiver/datadogmetricreceiver/persistentvolume/persistentvolume.go new file mode 100644 index 000000000000..243a937f652a --- /dev/null +++ b/receiver/datadogmetricreceiver/persistentvolume/persistentvolume.go @@ -0,0 +1,128 @@ +package persistentvolume + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Common constants +const ( + scopeName = "mw" + scopeVersion = "v0.0.1" + pvMetricCapacity = "ddk8s.persistentvolume.capacity" + + // Attributes + pvAttrUID = "ddk8s.persistentvolume.uid" + pvAttrNamespace = "ddk8s.persistentvolume.namespace" + pvAttrName = "ddk8s.persistentvolume.name" + pvAttrLabels = "ddk8s.persistentvolume.labels" + pvAttrAnnotations = "ddk8s.persistentvolume.annotations" + pvAttrFinalizers = "ddk8s.persistentvolume.finalizers" + pvAttrType = "ddk8s.persistentvolume.type" + pvAttrPhase = "ddk8s.persistentvolume.phase" + pvAttrStorageClass = "ddk8s.persistentvolume.storage_class" + pvAttrVolumeMode = "ddk8s.persistentvolume.volume_mode" + pvAttrAccessMode = "ddk8s.persistentvolume.access_mode" + pvAttrClaimPolicy = "ddk8s.persistentvolume.claim_policy" + pvAttrCreateTime = "ddk8s.persistentvolume.create_time" + pvAttrClusterID = "ddk8s.persistentvolume.cluster.id" + pvAttrClusterName = "ddk8s.persistentvolume.cluster.name" +) + +// GetOtlpExportReqFromDatadogPVData converts Datadog persistent volume data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogPVData(origin string, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorPersistentVolume) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload("No metrics related to PersistentVolumes found in Payload") + } + + pvs := ddReq.GetPersistentVolumes() + + if len(pvs) == 0 { + log.Println("no pvs found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload("No metrics related to PersistentVolumes found in Payload") + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, pv := range pvs { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(resourceAttributes, clusterName, clusterID) + appendPVMetrics(&scopeMetrics, resourceAttributes, pv, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendPVMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, pv *processv1.PersistentVolume, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(pvMetricCapacity) + + metricAttributes := pcommon.NewMap() + var volumeCapacity int64 + + metadata := pv.GetMetadata() + if metadata != nil { + resourceAttributes.PutStr(pvAttrUID, metadata.GetUid()) + metricAttributes.PutStr(pvAttrNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(pvAttrName, metadata.GetName()) + metricAttributes.PutStr(pvAttrLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(pvAttrAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(pvAttrFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutStr(pvAttrType, "PersistentVolume") + + phaseDetails := pv.GetStatus() + if phaseDetails != nil { + metricAttributes.PutStr(pvAttrPhase, phaseDetails.GetPhase()) + } + + pvcSpec := pv.GetSpec() + if pvcSpec != nil { + if capacityMap := pvcSpec.GetCapacity(); capacityMap != nil { + volumeCapacity = capacityMap["storage"] + } + metricAttributes.PutStr(pvAttrStorageClass, pvcSpec.GetStorageClassName()) + metricAttributes.PutStr(pvAttrVolumeMode, pvcSpec.GetVolumeMode()) + if accessModes := pvcSpec.GetAccessModes(); accessModes != nil { + metricAttributes.PutStr(pvAttrAccessMode, strings.Join(accessModes, ",")) + } + metricAttributes.PutStr(pvAttrClaimPolicy, pvcSpec.GetPersistentVolumeReclaimPolicy()) + } + metricAttributes.PutInt(pvAttrCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + + dp := dataPoints.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(volumeCapacity) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(resourceAttributes pcommon.Map, clusterName string, clusterID string) { + resourceAttributes.PutStr(pvAttrClusterID, clusterID) + resourceAttributes.PutStr(pvAttrClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/persistentvolumeclaim/persistentvolumeclaim.go b/receiver/datadogmetricreceiver/persistentvolumeclaim/persistentvolumeclaim.go new file mode 100644 index 000000000000..683230c9c4f9 --- /dev/null +++ b/receiver/datadogmetricreceiver/persistentvolumeclaim/persistentvolumeclaim.go @@ -0,0 +1,126 @@ +package persistentvolumeclaim + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +const ( + // Errors + pvcPayloadErrorMessage = "No metrics related to PersistentVolumeClaims found in Payload" + // Metrics + pvcMetricCapacity = "ddk8s.persistentvolumeclaim.capacity" + // Attributes + pvcMetricUID = "ddk8s.persistentvolumeclaim.uid" + pvcMetricNamespace = "ddk8s.persistentvolumeclaim.namespace" + pvcMetricClusterID = "ddk8s.persistentvolumeclaim.cluster.id" + pvcMetricClusterName = "ddk8s.persistentvolumeclaim.cluster.name" + pvcMetricName = "ddk8s.persistentvolumeclaim.name" + pvcMetricPhase = "ddk8s.persistentvolumeclaim.phase" + pvcMetricAccessModes = "ddk8s.persistentvolumeclaim.access_mode" + pvcMetricPvName = "ddk8s.persistentvolumeclaim.pv_name" + pvcMetricStorageClass = "ddk8s.persistentvolumeclaim.storage_class" + pvcMetricVolumeMode = "ddk8s.persistentvolumeclaim.volume_mode" + pvcMetricCreateTime = "ddk8s.persistentvolumeclaim.create_time" + pvcMetricLabels = "ddk8s.persistentvolumeclaim.labels" + pvcMetricAnnotations = "ddk8s.persistentvolumeclaim.annotations" + pvcMetricFinalizers = "ddk8s.persistentvolumeclaim.finalizers" + pvcMetricType = "ddk8s.persistentvolumeclaim.type" +) + +// GetOtlpExportReqFromDatadogPVCData converts Datadog persistent volume claim data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogPVCData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorPersistentVolumeClaim) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(pvcPayloadErrorMessage) + } + + pvcs := ddReq.GetPersistentVolumeClaims() + + if len(pvcs) == 0 { + log.Println("no pvcs found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(pvcPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, pvc := range pvcs { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendPVCMetrics(&scopeMetrics, resourceAttributes, metricAttributes, pvc, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendPVCMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, pvc *processv1.PersistentVolumeClaim, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(pvcMetricCapacity) + + var volumeCapacity int64 + + metadata := pvc.GetMetadata() + + if metadata != nil { + resourceAttributes.PutStr(pvcMetricUID, metadata.GetUid()) + metricAttributes.PutStr(pvcMetricNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(pvcMetricName, metadata.GetName()) + metricAttributes.PutStr(pvcMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(pvcMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(pvcMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutStr(pvcMetricType, "PersistentVolumeClaim") + phaseDetails := pvc.GetStatus() + if phaseDetails != nil { + metricAttributes.PutStr(pvcMetricPhase, phaseDetails.GetPhase()) + if accessModes := phaseDetails.GetAccessModes(); accessModes != nil { + metricAttributes.PutStr(pvcMetricAccessModes, strings.Join(accessModes, ",")) + } + capacityMap := phaseDetails.GetCapacity() + volumeCapacity = capacityMap["storage"] + } + + pvcSpec := pvc.GetSpec() + if pvcSpec != nil { + metricAttributes.PutStr(pvcMetricPvName, pvcSpec.GetVolumeName()) + metricAttributes.PutStr(pvcMetricStorageClass, pvcSpec.GetStorageClassName()) + metricAttributes.PutStr(pvcMetricVolumeMode, pvcSpec.GetVolumeMode()) + } + + metricAttributes.PutInt(pvcMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(volumeCapacity) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(pvcMetricClusterID, clusterID) + metricAttributes.PutStr(pvcMetricClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/pod/pod.go b/receiver/datadogmetricreceiver/pod/pod.go new file mode 100644 index 000000000000..bde87f244a12 --- /dev/null +++ b/receiver/datadogmetricreceiver/pod/pod.go @@ -0,0 +1,104 @@ +package pod + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for pods +const ( + // Errors + podPayloadErrorMessage = "No metrics related to Pods found in Payload" + // Metrics + podMetricRestartCount = "ddk8s.pod.restart_count" + // Attributes + podName = "ddk8s.pod.name" + podMetricNamespace = "ddk8s.namespace.name" + podAttrClusterID = "ddk8s.cluster.id" + podAttrClusterName = "ddk8s.cluster.name" + podAttrKubeClusterName = "kube_cluster_name" + podMetricIP = "ddk8s.pod.ip" + podMetricQOS = "ddk8s.pod.qos" + podMetricLabels = "ddk8s.pod.labels" + podMetricAnnotations = "ddk8s.pod.annotations" + podMetricFinalizers = "ddk8s.pod.finalizers" + podMetricCreateTime = "ddk8s.pod.create_time" +) + +// getOtlpExportReqFromPodData converts Datadog pod data into OTLP ExportRequest. +func GetOtlpExportReqFromPodData(origin string, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorPod) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(podPayloadErrorMessage) + } + pods := ddReq.GetPods() + + if len(pods) == 0 { + log.Println("no pods found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(podPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, pod := range pods { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendPodMetrics(&scopeMetrics, resourceAttributes, metricAttributes, pod, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendPodMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, pod *processv1.Pod, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(podMetricRestartCount) + metadata := pod.GetMetadata() + + if metadata != nil { + resourceAttributes.PutStr(podName, metadata.GetName()) + metricAttributes.PutStr(podMetricNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(podMetricIP, pod.GetIP()) + metricAttributes.PutStr(podMetricQOS, pod.GetQOSClass()) + metricAttributes.PutStr(podMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(podMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(podMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + + // Calculate pod creation time + metricAttributes.PutInt(podMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(int64(pod.GetRestartCount())) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(podAttrClusterID, clusterID) + metricAttributes.PutStr(podAttrClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/receiver.go b/receiver/datadogmetricreceiver/receiver.go index 5582690ad746..7842a686c01f 100644 --- a/receiver/datadogmetricreceiver/receiver.go +++ b/receiver/datadogmetricreceiver/receiver.go @@ -24,6 +24,27 @@ import ( metricsV2 "github.com/DataDog/agent-payload/v5/gogen" processv1 "github.com/DataDog/agent-payload/v5/process" metricsV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/cluster" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/clusterrolebinding" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/clusterroles" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/cronjob" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/daemonset" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/deployment" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/hpa" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/ingress" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/job" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/namespace" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/node" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/persistentvolume" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/persistentvolumeclaim" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/pod" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/replicaset" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/rolebinding" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/roles" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/service" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/serviceaccount" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/statefulset" ) const ( @@ -156,6 +177,18 @@ func (ddr *datadogmetricreceiver) Start(_ context.Context, host component.Host) ddmux.HandleFunc("/api/v1/collector", ddr.handleCollector) ddmux.HandleFunc("/api/v1/check_run", ddr.handleCheckRun) ddmux.HandleFunc("/api/v1/connections", ddr.handleConnections) + ddmux.HandleFunc("/api/v2/orch", ddr.handleOrchestrator) + // Not Implemented Handlers + ddmux.HandleFunc("/api/v1/sketches", ddr.handleNotImplemenetedAPI) + ddmux.HandleFunc("/api/v2/host_metadata", ddr.handleNotImplemenetedAPI) + ddmux.HandleFunc("/api/v2/events", ddr.handleNotImplemenetedAPI) + ddmux.HandleFunc("/api/v2/service_checks", ddr.handleNotImplemenetedAPI) + ddmux.HandleFunc("/api/beta/sketches", ddr.handleNotImplemenetedAPI) + ddmux.HandleFunc("/api/v1/discovery", ddr.handleNotImplemenetedAPI) + ddmux.HandleFunc("/api/v2/proclcycle", ddr.handleNotImplemenetedAPI) + ddmux.HandleFunc("/api/v1/container", ddr.handleNotImplemenetedAPI) + ddmux.HandleFunc("/api/v1/orchestrator", ddr.handleNotImplemenetedAPI) + ddmux.HandleFunc("/api/v2/orchmanif", ddr.handleNotImplemenetedAPI) var err error ddr.server, err = ddr.config.HTTPServerSettings.ToServer( @@ -256,7 +289,7 @@ func (ddr *datadogmetricreceiver) handleV2Series(w http.ResponseWriter, req *htt http.Error(w, "error in unmarshalling req payload", http.StatusBadRequest) return } - otlpReq, err = getOtlpExportReqFromDatadogV2Metrics(origin, key, v2Metrics) + otlpReq, err = GetOtlpExportReqFromDatadogV2Metrics(origin, key, v2Metrics) } if err != nil { @@ -408,6 +441,7 @@ func (ddr *datadogmetricreceiver) handleCollector(w http.ResponseWriter, req *ht http.Error(w, "error in getOtlpExportReqFromDatadogProcessesData", http.StatusBadRequest) return } + obsCtx := ddr.tReceiver.StartLogsOp(req.Context()) errs := ddr.nextConsumer.ConsumeMetrics(obsCtx, otlpReq.Metrics()) if errs != nil { @@ -417,3 +451,94 @@ func (ddr *datadogmetricreceiver) handleCollector(w http.ResponseWriter, req *ht _, _ = w.Write([]byte("OK")) } } + +func (ddr *datadogmetricreceiver) handleOrchestrator(w http.ResponseWriter, req *http.Request) { + origin := req.Header.Get("Origin") + key := req.Header.Get(datadogAPIKeyHeader) + body, ok := readAndCloseBody(w, req) + if !ok { + http.Error(w, "error in reading request body", http.StatusBadRequest) + return + } + var err error + + reqBody, err := processv1.DecodeMessage(body) + if err != nil { + http.Error(w, "error in decoding request body", http.StatusBadRequest) + return + } + + timestamp := reqBody.Header.Timestamp + resourceType := reqBody.Header.Type + + if timestamp == 0 { + timestamp = helpers.GetMillis() + } + + var otlpReq pmetricotlp.ExportRequest + + switch resourceType { + case processv1.TypeCollectorRoleBinding: + otlpReq, err = rolebinding.GetOtlpExportReqFromDatadogRoleBindingData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorClusterRoleBinding: + otlpReq, err = clusterrolebinding.GetOtlpExportReqFromDatadogClusterRoleBindingData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorRole: + otlpReq, err = roles.GetOtlpExportReqFromDatadogRolesData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorClusterRole: + otlpReq, err = clusterroles.GetOtlpExportReqFromDatadogClusterRolesData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorCluster: + otlpReq, err = cluster.GetOtlpExportReqFromClusterData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorNamespace: + otlpReq, err = namespace.GetOtlpExportReqFromNamespaceData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorServiceAccount: + otlpReq, err = serviceaccount.GetOtlpExportReqFromDatadogServiceAccountData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorPersistentVolumeClaim: + otlpReq, err = persistentvolumeclaim.GetOtlpExportReqFromDatadogPVCData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorPersistentVolume: + otlpReq, err = persistentvolume.GetOtlpExportReqFromDatadogPVData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorHorizontalPodAutoscaler: + otlpReq, err = hpa.GetOtlpExportReqFromDatadogHPAData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorIngress: + otlpReq, err = ingress.GetOtlpExportReqFromDatadogIngressData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorJob: + otlpReq, err = job.GetOtlpExportReqFromDatadogJobData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorCronJob: + otlpReq, err = cronjob.GetOtlpExportReqFromDatadogCronJobData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorReplicaSet: + otlpReq, err = replicaset.GetOtlpExportReqFromDatadogReplicaSetData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorStatefulSet: + otlpReq, err = statefulset.GetOtlpExportReqFromDatadogStatefulSetData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorService: + otlpReq, err = service.GetOtlpExportReqFromDatadogServiceData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorDaemonSet: + otlpReq, err = daemonset.GetOtlpExportReqFromDatadogDaemonSetData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorDeployment: + otlpReq, err = deployment.GetOtlpExportReqFromDatadogDeploymentData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorNode: + otlpReq, err = node.GetOtlpExportReqFromDatadogNodeData(origin, key, reqBody.Body, timestamp) + case processv1.TypeCollectorPod: + otlpReq, err = pod.GetOtlpExportReqFromPodData(origin, key, reqBody.Body, timestamp) + default: + http.Error(w, "unsupported message type", http.StatusBadRequest) + return + } + + if err != nil { + http.Error(w, "error in getOtlpExportReqFromDatadogProcessesData", http.StatusBadRequest) + return + } + + obsCtx := ddr.tReceiver.StartLogsOp(req.Context()) + errs := ddr.nextConsumer.ConsumeMetrics(obsCtx, otlpReq.Metrics()) + if errs != nil { + http.Error(w, "Logs consumer errored out", http.StatusInternalServerError) + ddr.params.Logger.Error("Logs consumer errored out") + } else { + _, _ = w.Write([]byte("OK")) + } +} + +func (ddr *datadogmetricreceiver) handleNotImplemenetedAPI(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintf(w, `{"valid":true}`) +} diff --git a/receiver/datadogmetricreceiver/replicaset/replicaset.go b/receiver/datadogmetricreceiver/replicaset/replicaset.go new file mode 100644 index 000000000000..d59a60e4cfd1 --- /dev/null +++ b/receiver/datadogmetricreceiver/replicaset/replicaset.go @@ -0,0 +1,113 @@ +package replicaset + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for replica sets +const ( + // Errors + replicaSetPayloadErrorMessage = "No metrics related to ReplicaSets found in Payload" + // Metrics + replicaSetMetricAvailable = "ddk8s.replicaset.available" + replicaSetMetricDesired = "ddk8s.replicaset.desired" + replicaSetMetricReady = "ddk8s.replicaset.ready" + // Attributes + replicaSetMetricUID = "ddk8s.replicaset.uid" + replicaSetMetricName = "ddk8s.replicaset.name" + replicaSetMetricLabels = "ddk8s.replicaset.labels" + replicaSetMetricAnnotations = "ddk8s.replicaset.annotations" + replicaSetMetricFinalizers = "ddk8s.replicaset.finalizers" + replicaSetMetricCreateTime = "ddk8s.replicaset.create_time" + namespaceMetricName = "ddk8s.namespace.name" + namespaceMetricClusterID = "ddk8s.cluster.id" + namespaceMetricClusterName = "ddk8s.cluster.name" +) + +// GetOtlpExportReqFromDatadogReplicaSetData converts Datadog replica set data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogReplicaSetData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorReplicaSet) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(replicaSetPayloadErrorMessage) + } + replicasets := ddReq.GetReplicaSets() + + if len(replicasets) == 0 { + log.Println("no replicasets found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(replicaSetPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, metricName := range []string{replicaSetMetricAvailable, replicaSetMetricDesired, replicaSetMetricReady} { + for _, replicaset := range replicasets { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendReplicaSetMetrics(&scopeMetrics, resourceAttributes, metricAttributes, replicaset, metricName, timestamp) + } + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendReplicaSetMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, replicaset *processv1.ReplicaSet, metricName string, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(metricName) + + var metricVal int64 + + if metadata := replicaset.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(replicaSetMetricUID, metadata.GetUid()) + metricAttributes.PutStr(namespaceMetricName, metadata.GetNamespace()) + metricAttributes.PutStr(replicaSetMetricName, metadata.GetName()) + metricAttributes.PutStr(replicaSetMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(replicaSetMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(replicaSetMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutInt(replicaSetMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + + switch metricName { + case replicaSetMetricAvailable: + metricVal = int64(replicaset.GetAvailableReplicas()) + case replicaSetMetricDesired: + metricVal = int64(replicaset.GetReplicasDesired()) + case replicaSetMetricReady: + metricVal = int64(replicaset.GetReadyReplicas()) + } + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(namespaceMetricClusterID, clusterID) + metricAttributes.PutStr(namespaceMetricClusterName, clusterName) +} diff --git a/receiver/datadogmetricreceiver/rolebinding/rolebinding.go b/receiver/datadogmetricreceiver/rolebinding/rolebinding.go new file mode 100644 index 000000000000..334d8705b2b5 --- /dev/null +++ b/receiver/datadogmetricreceiver/rolebinding/rolebinding.go @@ -0,0 +1,137 @@ +package rolebinding + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for role bindings +const ( + // Errors + roleBindingsPayloadErrorMessage = "No metrics related to RoleBindings found in Payload" + // Metrics + roleBindingsMetricSubjectCount = "ddk8s.rolebindings.subject.count" + // Attributes + roleBindingsMetricUID = "ddk8s.rolebindings.uid" + roleBindingsMetricNamespace = "ddk8s.rolebindings.namespace" + roleBindingsAttrClusterID = "ddk8s.rolebindings.cluster.id" + roleBindingsAttrClusterName = "ddk8s.rolebindings.cluster.name" + roleBindingsMetricName = "ddk8s.rolebindings.name" + roleBindingsMetricCreateTime = "ddk8s.rolebindings.create_time" + roleBindingsMetricSubjects = "ddk8s.rolebindings.subjects" + roleBindingsMetricRoleRef = "ddk8s.rolebindings.roleref" + roleBindingsMetricType = "ddk8s.rolebindings.type" + roleBindingsMetricLabels = "ddk8s.rolebindings.labels" + roleBindingsMetricAnnotations = "ddk8s.rolebindings.annotations" +) + +// GetOtlpExportReqFromDatadogRoleBindingData converts Datadog role binding data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogRoleBindingData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + + ddReq, ok := Body.(*processv1.CollectorRoleBinding) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(roleBindingsPayloadErrorMessage) + } + + roleBindings := ddReq.GetRoleBindings() + + if len(roleBindings) == 0 { + log.Println("no role bindings found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(roleBindingsPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, binding := range roleBindings { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendRoleBindingMetrics(&scopeMetrics, resourceAttributes, metricAttributes, binding, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendRoleBindingMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, binding *processv1.RoleBinding, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(roleBindingsMetricSubjectCount) + + var metricVal int64 + + if metadata := binding.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(roleBindingsMetricUID, metadata.GetUid()) + metricAttributes.PutStr(roleBindingsMetricNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(roleBindingsMetricName, metadata.GetName()) + metricAttributes.PutStr(roleBindingsMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(roleBindingsMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(roleBindingsMetricRoleRef, getRoleRefString(binding.GetRoleRef())) + metricAttributes.PutInt(roleBindingsMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + + if subjects := binding.GetSubjects(); subjects != nil { + metricAttributes.PutStr(roleBindingsMetricSubjects, convertSubjectsToString(subjects)) + metricVal = int64(len(subjects)) + } + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(roleBindingsAttrClusterID, clusterID) + metricAttributes.PutStr(roleBindingsAttrClusterName, clusterName) +} + +func convertSubjectsToString(subjects []*processv1.Subject) string { + var result strings.Builder + + for i, subject := range subjects { + if i > 0 { + result.WriteString(";") + } + + result.WriteString("kind=") + result.WriteString(subject.GetKind()) + + result.WriteString("&name=") + result.WriteString(subject.GetName()) + + result.WriteString("&namespace=") + result.WriteString(subject.GetNamespace()) + } + + return result.String() +} + +func getRoleRefString(ref *processv1.TypedLocalObjectReference) string { + if ref == nil { + return "" + } + return "apiGroup=" + ref.GetApiGroup() + "&kind=" + ref.GetKind() + "&name=" + ref.GetName() +} diff --git a/receiver/datadogmetricreceiver/roles/roles.go b/receiver/datadogmetricreceiver/roles/roles.go new file mode 100644 index 000000000000..eabadbe56581 --- /dev/null +++ b/receiver/datadogmetricreceiver/roles/roles.go @@ -0,0 +1,313 @@ +package roles + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "log" + "strings" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" +) + +const ( + RolePayloadErrorMessage = "No metrics related to Roles found in Payload" + // Metric names + RoleMetricRuleCount = "ddk8s.role.count" + // Attribute keys + RoleMetricUID = "ddk8s.role.uid" + RoleMetricNamespace = "ddk8s.role.namespace" + attrClusterID = "ddk8s.role.cluster.id" + attrClusterName = "ddk8s.role.cluster.name" + RoleMetricName = "ddk8s.role.name" + RoleMetricCreateTime = "ddk8s.role.create.time" + RoleMetricLabels = "ddk8s.role.labels" + RoleMetricAnnotations = "ddk8s.role.annotations" + RoleMetricType = "ddk8s.role.type" + RoleMetricRules = "ddk8s.role.rules" +) + +func GetOtlpExportReqFromDatadogRolesData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + + ddReq, ok := Body.(*processv1.CollectorRole) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(RolePayloadErrorMessage) + } + + roles := ddReq.GetRoles() + + if len(roles) == 0 { + log.Println("no roles found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(RolePayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + cluster_name := ddReq.GetClusterName() + cluster_id := ddReq.GetClusterId() + + for _, role := range roles { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, cluster_name, cluster_id) + appendMetrics(&scopeMetrics, resourceAttributes, metricAttributes, role, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, role *processv1.Role, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(RoleMetricRuleCount) + + var metricVal int64 + + if metadata := role.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(RoleMetricUID, metadata.GetUid()) + metricAttributes.PutStr(RoleMetricNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(RoleMetricName, metadata.GetName()) + metricAttributes.PutStr(RoleMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(RoleMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(RoleMetricAnnotations, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutInt(RoleMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + metricAttributes.PutStr(RoleMetricType, "Roles") + + if rules := role.GetRules(); rules != nil { + metricAttributes.PutStr(RoleMetricRules, convertRulesToString(rules)) + metricVal = int64(len(rules)) + } + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + + +func setHostK8sAttributes(metricAttributes pcommon.Map, cluster_name string, cluster_id string) { + metricAttributes.PutStr(attrClusterID, cluster_id) + metricAttributes.PutStr(attrClusterName, cluster_name) +} + +func convertRulesToString(rules []*processv1.PolicyRule) string { + var result strings.Builder + + for i, rule := range rules { + if i > 0 { + result.WriteString(";") + } + + result.WriteString("verbs=") + result.WriteString(strings.Join(rule.GetVerbs(), ",")) + + result.WriteString("&apiGroups=") + result.WriteString(strings.Join(rule.GetApiGroups(), ",")) + + result.WriteString("&resources=") + result.WriteString(strings.Join(rule.GetResources(), ",")) + + result.WriteString("&resourceNames=") + result.WriteString(strings.Join(rule.GetResourceNames(), ",")) + + result.WriteString("&nonResourceURLs=") + result.WriteString(strings.Join(rule.GetNonResourceURLs(), ",")) + + } + + return result.String() +} + +// func getOtlpExportReqFromDatadogRolesData(origin string, key string, ddReq *processv1.CollectorRole) (pmetricotlp.ExportRequest, error) { +// // assumption is that host is same for all the metrics in a given request + +// roles := ddReq.GetRoles() + +// if len(roles) == 0 { +// log.Println("no roles found so skipping") +// return pmetricotlp.ExportRequest{}, ErrNoMetricsInPayload +// } + +// metrics := pmetric.NewMetrics() +// resourceMetrics := metrics.ResourceMetrics() + +// for _, role := range roles { +// rm := resourceMetrics.AppendEmpty() +// resourceAttributes := rm.Resource().Attributes() + +// cluster_name := ddReq.GetClusterName() +// cluster_id := ddReq.GetClusterId() + +// commonResourceAttributes := commonResourceAttributes{ +// origin: origin, +// ApiKey: key, +// mwSource: "datadog", +// //host: "trial", +// } +// setMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + +// scopeMetrics := rm.ScopeMetrics().AppendEmpty() +// instrumentationScope := scopeMetrics.Scope() +// instrumentationScope.SetName("mw") +// instrumentationScope.SetVersion("v0.0.1") +// scopeMetric := scopeMetrics.Metrics().AppendEmpty() +// //scopeMetric.SetName("kubernetes_state.role.count") +// scopeMetric.SetName("ddk8s.role.count") +// //scopeMetric.SetUnit(s.GetUnit()) + +// metricAttributes := pcommon.NewMap() + +// metadata := role.GetMetadata() +// resourceAttributes.PutStr("ddk8s.role.uid", metadata.GetUid()) +// metricAttributes.PutStr("ddk8s.role.namespace", metadata.GetNamespace()) +// metricAttributes.PutStr("ddk8s.role.cluster.id", cluster_id) +// metricAttributes.PutStr("ddk8s.role.cluster.name", cluster_name) +// metricAttributes.PutStr("ddk8s.role.name", metadata.GetName()) + +// currentTime := time.Now() +// milliseconds := (currentTime.UnixNano() / int64(time.Millisecond)) * 1000000 +// // fmt.Println("milliseconds",milliseconds) +// // fmt.Println("creation",metadata.GetCreationTimestamp() * 1000) +// createtime := (int64(milli//"time" +// //fmt.Println("diff is " , diff) + +// for _, tag := range role.GetTags() { + +// parts := strings.Split(tag, ":") +// if len(parts) != 2 { +// continue +// } + +// metricAttributes.PutStr(parts[0], parts[1]) +// } + +// rules := role.GetRules() +// metricAttributes.PutStr("ddk8s.role.rules", convertRulesToString(rules)) +// metricAttributes.PutStr("ddk8s.role.type", "Roles") +// metricAttributes.PutStr("ddk8s.role.labels", strings.Join(metadata.GetLabels(), "&")) +// metricAttributes.PutStr("ddk8s.role.annotations", strings.Join(metadata.GetAnnotations(), "&")) +// // current time in millis +// //fmt.Println("string is", convertRulesToString(rules)) +// var dataPoints pmetric.NumberDataPointSlice +// gauge := scopeMetric.SetEmptyGauge() +// dataPoints = gauge.DataPoints() + +// dp := dataPoints.AppendEmpty() +// dp.SetTimestamp(pcommon.Timestamp(milliseconds)) + +// dp.SetIntValue(int64(len(rules))) // setting a dummy value for this metric as only resource attribute needed +// attributeMap := dp.Attributes() +// metricAttributes.CopyTo(attributeMap) +// } + +// // metrics := pmetric.NewMetrics() +// // resourceMetrics := metrics.ResourceMetrics() +// // rm := resourceMetrics.AppendEmpty() +// // resourceAttributes := rm.Resource().Attributes() + +// // // assumption is that host is same for all the metrics in a given request +// // // var metricHost string +// // // metricHost = input.hostname + +// // // var metricHost string + +// // // for _, tag := range ddReq.GetTags() { + +// // // parts := strings.Split(tag, ":") +// // // if len(parts) != 2 { +// // // continue +// // // } + +// // // resourceAttributes.PutStr(parts[0], parts[1]) +// // // } + +// // // resourceAttributes.PutStr("kube_cluster_name", ddReq.GetClusterName()) +// // // resourceAttributes.PutStr("kube_cluster_id", ddReq.GetClusterId()) + +// // cluster_name := ddReq.GetClusterName() +// // cluster_id := ddReq.GetClusterId() + +// // commonResourceAttributes := commonResourceAttributes{ +// // origin: origin, +// // ApiKey: key, +// // mwSource: "datadog", +// // //host: "trial", +// // } +// // setMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + +// // scopeMetrics := rm.ScopeMetrics().AppendEmpty() +// // instrumentationScope := scopeMetrics.Scope() +// // instrumentationScope.SetName("mw") +// // instrumentationScope.SetVersion("v0.0.1") + +// // for _, role := range roles { + +// // scopeMetric := scopeMetrics.Metrics().AppendEmpty() +// // //scopeMetric.SetName("kubernetes_state.role.count") +// // scopeMetric.SetName("ddk8s.role.count") +// // //scopeMetric.SetUnit(s.GetUnit()) + +// // metricAttributes := pcommon.NewMap() + +// // metadata := role.GetMetadata() +// // metricAttributes.PutStr("ddk8s.role.namespace", metadata.GetNamespace()) +// // metricAttributes.PutStr("ddk8s.role.cluster.id", cluster_id) +// // metricAttributes.PutStr("ddk8s.role.cluster.name", cluster_name) +// // metricAttributes.PutStr("ddk8s.role.uid", metadata.GetUid()) +// // metricAttributes.PutStr("ddk8s.role.name", metadata.GetName()) + +// // currentTime := time.Now() +// // milliseconds := (currentTime.UnixNano() / int64(time.Millisecond)) * 1000000 +// // // fmt.Println("milliseconds",milliseconds) +// // // fmt.Println("creation",metadata.GetCreationTimestamp() * 1000) +// // createtime := (int64(milliseconds/1000000000) - metadata.GetCreationTimestamp()) +// // //diff := milliseconds - (metadata.GetCreationTimestamp() * 1000) +// // fmt.Println("diff is ",createtime) +// // metricAttributes.PutInt("ddk8s.role.create.time", createtime) + +// // //fmt.Println("diff is " , diff) + +// // for _, tag := range role.GetTags() { + +// // parts := strings.Split(tag, ":") +// // if len(parts) != 2 { +// // continue +// // } + +// // metricAttributes.PutStr(parts[0], parts[1]) +// // } + +// // rules := role.GetRules() +// // metricAttributes.PutStr("ddk8s.role.rules", convertRulesToString(rules)) +// // // current time in millis +// // //fmt.Println("string is", convertRulesToString(rules)) +// // var dataPoints pmetric.NumberDataPointSlice +// // gauge := scopeMetric.SetEmptyGauge() +// // dataPoints = gauge.DataPoints() + +// // dp := dataPoints.AppendEmpty() +// // dp.SetTimestamp(pcommon.Timestamp(milliseconds)) + +// // dp.SetIntValue(int64(len(rules))) // setting a dummy value for this metric as only resource attribute needed +// // attributeMap := dp.Attributes() +// // metricAttributes.CopyTo(attributeMap) +// // } + +// return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +// } diff --git a/receiver/datadogmetricreceiver/service/service.go b/receiver/datadogmetricreceiver/service/service.go new file mode 100644 index 000000000000..d442656e979e --- /dev/null +++ b/receiver/datadogmetricreceiver/service/service.go @@ -0,0 +1,130 @@ +package service + +import ( + "fmt" + "log" + "strings" + "time" + + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" +) + +// Constants for Service metrics +const ( + // Metrics + serviceMetricPortCount = "ddk8s.service.port_count" + // Attributes + serviceMetricUID = "ddk8s.service.uid" + serviceMetricNamespace = "ddk8s.service.namespace" + serviceMetricClusterID = "ddk8s.service.cluster.id" + serviceMetricClusterName = "ddk8s.cluster.name" + serviceMetricName = "ddk8s.service.name" + serviceMetricLabels = "ddk8s.service.labels" + serviceMetricAnnotations = "ddk8s.service.annotations" + serviceMetricFinalizers = "ddk8s.service.finalizers" + serviceMetricType = "ddk8s.service.type" + serviceMetricClusterIP = "ddk8s.service.cluster_ip" + serviceMetricPortsList = "ddk8s.service.ports_list" + serviceMetricCreateTime = "ddk8s.service.create_time" + // Error + ErrNoMetricsInPayload = "No metrics related to Services found in Payload" +) + +// GetOtlpExportReqFromDatadogServiceData converts Datadog Service data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogServiceData(origin string, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorService) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(ErrNoMetricsInPayload) + } + services := ddReq.GetServices() + + if len(services) == 0 { + log.Println("no services found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(ErrNoMetricsInPayload) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, service := range services { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendServiceMetrics(&scopeMetrics, resourceAttributes, metricAttributes, service) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendServiceMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, service *processv1.Service) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(serviceMetricPortCount) + + var metricVal int64 + + metadata := service.GetMetadata() + if metadata != nil { + resourceAttributes.PutStr(serviceMetricUID, metadata.GetUid()) + metricAttributes.PutStr(serviceMetricNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(serviceMetricName, metadata.GetName()) + metricAttributes.PutStr(serviceMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(serviceMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(serviceMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + } + + specDetails := service.GetSpec() + metricVal = int64(len(specDetails.GetPorts())) + metricAttributes.PutStr(serviceMetricType, specDetails.GetType()) + metricAttributes.PutStr(serviceMetricClusterIP, specDetails.GetClusterIP()) + metricAttributes.PutStr(serviceMetricPortsList, convertPortRulesToString(specDetails.GetPorts())) + + currentTime := time.Now() + milliseconds := (currentTime.UnixNano() / int64(time.Millisecond)) * 1000000 + createTime := (milliseconds / 1000000000) - metadata.GetCreationTimestamp() + metricAttributes.PutInt(serviceMetricCreateTime, createTime) + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(milliseconds)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(serviceMetricClusterID, clusterID) + metricAttributes.PutStr(serviceMetricClusterName, clusterName) +} + +func convertPortRulesToString(serviceports []*processv1.ServicePort) string { + var result strings.Builder + + for i, sp := range serviceports { + if i > 0 { + result.WriteString("&") + } + portString := fmt.Sprintf("%s %d/%s", sp.GetName(), sp.GetPort(), sp.GetProtocol()) + result.WriteString(portString) + } + + return result.String() +} diff --git a/receiver/datadogmetricreceiver/serviceaccount/serviceaccount.go b/receiver/datadogmetricreceiver/serviceaccount/serviceaccount.go new file mode 100644 index 000000000000..c95af39cb7b9 --- /dev/null +++ b/receiver/datadogmetricreceiver/serviceaccount/serviceaccount.go @@ -0,0 +1,131 @@ +package serviceaccount + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for service accounts +const ( + // Errors + serviceAccountPayloadErrorMessage = "No metrics related to ServiceAccounts found in Payload" + // Metrics + serviceAccountMetricSecretCount = "ddk8s.serviceaccount.secret.count" + // Attributes + serviceAccountMetricUID = "ddk8s.serviceaccount.uid" + serviceAccountMetricNamespace = "ddk8s.serviceaccount.namespace" + serviceAccountAttrClusterID = "ddk8s.serviceaccount.cluster.id" + serviceAccountAttrClusterName = "ddk8s.serviceaccount.cluster.name" + serviceAccountMetricName = "ddk8s.serviceaccount.name" + serviceAccountMetricCreateTime = "ddk8s.serviceaccount.create_time" + serviceAccountMetricSecrets = "ddk8s.serviceaccount.secrets" + serviceAccountMetricLabels = "ddk8s.serviceaccount.labels" + serviceAccountMetricAnnotations = "ddk8s.serviceaccount.annotations" + serviceAccountMetricType = "ddk8s.serviceaccount.type" + serviceAccountMetricAutomountServiceAccountToken = "ddk8s.serviceaccount.automount_serviceaccount_token" +) + +// GetOtlpExportReqFromDatadogServiceAccountData converts Datadog service account data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogServiceAccountData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorServiceAccount) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(serviceAccountPayloadErrorMessage) + } + + serviceAccounts := ddReq.GetServiceAccounts() + + if len(serviceAccounts) == 0 { + log.Println("no service accounts found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(serviceAccountPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, account := range serviceAccounts { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendServiceAccountMetrics(&scopeMetrics, resourceAttributes, metricAttributes, account, timestamp) + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendServiceAccountMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, account *processv1.ServiceAccount, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(serviceAccountMetricSecretCount) + + var metricVal int64 + + if metadata := account.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(serviceAccountMetricUID, metadata.GetUid()) + metricAttributes.PutStr(serviceAccountMetricNamespace, metadata.GetNamespace()) + metricAttributes.PutStr(serviceAccountMetricName, metadata.GetName()) + metricAttributes.PutStr(serviceAccountMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(serviceAccountMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(serviceAccountMetricType, "ServiceAccount") + metricAttributes.PutBool(serviceAccountMetricAutomountServiceAccountToken, account.GetAutomountServiceAccountToken()) + metricAttributes.PutInt(serviceAccountMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + + if secrets := account.GetSecrets(); secrets != nil { + metricAttributes.PutStr(serviceAccountMetricSecrets, convertSecretsToString(secrets)) + metricVal = int64(len(secrets)) + } + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(serviceAccountAttrClusterID, clusterID) + metricAttributes.PutStr(serviceAccountAttrClusterName, clusterName) +} + +func convertSecretsToString(secrets []*processv1.ObjectReference) string { + var result strings.Builder + + for i, secret := range secrets { + if i > 0 { + result.WriteString(";") + } + + result.WriteString("kind=") + result.WriteString(secret.GetKind()) + + result.WriteString("&name=") + result.WriteString(secret.GetName()) + + result.WriteString("&namespace=") + result.WriteString(secret.GetNamespace()) + + } + + return result.String() +} diff --git a/receiver/datadogmetricreceiver/statefulset/statefulset.go b/receiver/datadogmetricreceiver/statefulset/statefulset.go new file mode 100644 index 000000000000..b28cbad7e680 --- /dev/null +++ b/receiver/datadogmetricreceiver/statefulset/statefulset.go @@ -0,0 +1,126 @@ +package statefulset + +import ( + processv1 "github.com/DataDog/agent-payload/v5/process" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogmetricreceiver/helpers" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "log" + "strings" +) + +// Private constants for statefulsets +const ( + // Errors + statefulSetPayloadErrorMessage = "No metrics related to StatefulSets found in Payload" + // Metrics + statefulSetMetricAvailable = "ddk8s.statefulset.available" + statefulSetMetricDesired = "ddk8s.statefulset.desired" + statefulSetMetricReady = "ddk8s.statefulset.ready" + statefulSetMetricUpdated = "ddk8s.statefulset.updated" + // Attributes + statefulSetMetricUID = "ddk8s.statefulset.uid" + statefulSetMetricName = "ddk8s.statefulset.name" + statefulSetMetricLabels = "ddk8s.statefulset.labels" + statefulSetMetricAnnotations = "ddk8s.statefulset.annotations" + statefulSetMetricFinalizers = "ddk8s.statefulset.finalizers" + statefulSetMetricCreateTime = "ddk8s.statefulset.create_time" + namespaceMetricName = "ddk8s.namespace.name" + namespaceMetricClusterID = "ddk8s.cluster.id" + namespaceMetricClusterName = "ddk8s.cluster.name" +) + +var statefulSetMetricsToExtract = []string{ + statefulSetMetricAvailable, + statefulSetMetricDesired, + statefulSetMetricReady, + statefulSetMetricUpdated, +} + +// GetOtlpExportReqFromDatadogStatefulSetData converts Datadog statefulset data into OTLP ExportRequest. +func GetOtlpExportReqFromDatadogStatefulSetData(origin, key string, Body interface{}, timestamp int64) (pmetricotlp.ExportRequest, error) { + ddReq, ok := Body.(*processv1.CollectorStatefulSet) + if !ok { + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(statefulSetPayloadErrorMessage) + } + statefulsets := ddReq.GetStatefulSets() + + if len(statefulsets) == 0 { + log.Println("no statefulsets found so skipping") + return pmetricotlp.ExportRequest{}, helpers.NewErrNoMetricsInPayload(statefulSetPayloadErrorMessage) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics() + + clusterName := ddReq.GetClusterName() + clusterID := ddReq.GetClusterId() + + for _, metricName := range statefulSetMetricsToExtract { + for _, statefulset := range statefulsets { + rm := resourceMetrics.AppendEmpty() + resourceAttributes := rm.Resource().Attributes() + metricAttributes := pcommon.NewMap() + commonResourceAttributes := helpers.CommonResourceAttributes{ + Origin: origin, + ApiKey: key, + MwSource: "datadog", + } + helpers.SetMetricResourceAttributes(resourceAttributes, commonResourceAttributes) + + scopeMetrics := helpers.AppendInstrScope(&rm) + setHostK8sAttributes(metricAttributes, clusterName, clusterID) + appendStatefulSetMetrics(&scopeMetrics, resourceAttributes, metricAttributes, statefulset, metricName, timestamp) + } + } + + return pmetricotlp.NewExportRequestFromMetrics(metrics), nil +} + +func appendStatefulSetMetrics(scopeMetrics *pmetric.ScopeMetrics, resourceAttributes pcommon.Map, metricAttributes pcommon.Map, statefulset *processv1.StatefulSet, metricName string, timestamp int64) { + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(metricName) + + var metricVal int64 + + if metadata := statefulset.GetMetadata(); metadata != nil { + resourceAttributes.PutStr(statefulSetMetricUID, metadata.GetUid()) + metricAttributes.PutStr(namespaceMetricName, metadata.GetNamespace()) + metricAttributes.PutStr(statefulSetMetricName, metadata.GetName()) + metricAttributes.PutStr(statefulSetMetricLabels, strings.Join(metadata.GetLabels(), "&")) + metricAttributes.PutStr(statefulSetMetricAnnotations, strings.Join(metadata.GetAnnotations(), "&")) + metricAttributes.PutStr(statefulSetMetricFinalizers, strings.Join(metadata.GetFinalizers(), ",")) + metricAttributes.PutInt(statefulSetMetricCreateTime, helpers.CalculateCreateTime(metadata.GetCreationTimestamp())) + } + + status := statefulset.GetStatus() + spec := statefulset.GetSpec() + + switch metricName { + case statefulSetMetricAvailable: + metricVal = int64(status.GetCurrentReplicas()) + case statefulSetMetricReady: + metricVal = int64(status.GetReadyReplicas()) + case statefulSetMetricUpdated: + metricVal = int64(status.GetUpdatedReplicas()) + case statefulSetMetricDesired: + metricVal = int64(spec.GetDesiredReplicas()) + } + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + dp := dataPoints.AppendEmpty() + + dp.SetTimestamp(pcommon.Timestamp(timestamp)) + dp.SetIntValue(metricVal) + + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) +} + +func setHostK8sAttributes(metricAttributes pcommon.Map, clusterName string, clusterID string) { + metricAttributes.PutStr(namespaceMetricClusterID, clusterID) + metricAttributes.PutStr(namespaceMetricClusterName, clusterName) +}