Skip to content

Commit

Permalink
[CONTP-499] Parsing GPU tags on kubeapiserver collector (DataDog#31465)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabedos authored Dec 5, 2024
1 parent 4fd23e4 commit 945f15c
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 37 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@
/pkg/util/docker/ @DataDog/container-integrations
/pkg/util/ecs/ @DataDog/container-integrations
/pkg/util/funcs/ @DataDog/ebpf-platform
/pkg/util/gpu/ @DataDog/container-platform
/pkg/util/kernel/ @DataDog/ebpf-platform
/pkg/util/safeelf/ @DataDog/ebpf-platform
/pkg/util/ktime @DataDog/agent-security
Expand Down
50 changes: 13 additions & 37 deletions comp/core/workloadmeta/collectors/internal/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/errors"
"github.com/DataDog/datadog-agent/pkg/util/containers"
pkgcontainersimage "github.com/DataDog/datadog-agent/pkg/util/containers/image"
"github.com/DataDog/datadog-agent/pkg/util/gpu"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/kubelet"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand Down Expand Up @@ -83,13 +84,13 @@ func (c *collector) Pull(ctx context.Context) error {
return err
}

events := c.parsePods(updatedPods)
events := parsePods(updatedPods)

if time.Since(c.lastExpire) >= c.expireFreq {
var expiredIDs []string
expiredIDs, err = c.watcher.Expire()
if err == nil {
events = append(events, c.parseExpires(expiredIDs)...)
events = append(events, parseExpires(expiredIDs)...)
c.lastExpire = time.Now()
}
}
Expand All @@ -107,7 +108,7 @@ func (c *collector) GetTargetCatalog() workloadmeta.AgentType {
return c.catalog
}

func (c *collector) parsePods(pods []*kubelet.Pod) []workloadmeta.CollectorEvent {
func parsePods(pods []*kubelet.Pod) []workloadmeta.CollectorEvent {
events := []workloadmeta.CollectorEvent{}

for _, pod := range pods {
Expand All @@ -131,14 +132,14 @@ func (c *collector) parsePods(pods []*kubelet.Pod) []workloadmeta.CollectorEvent
ID: podMeta.UID,
}

podInitContainers, initContainerEvents := c.parsePodContainers(
podInitContainers, initContainerEvents := parsePodContainers(
pod,
pod.Spec.InitContainers,
pod.Status.InitContainers,
&podID,
)

podContainers, containerEvents := c.parsePodContainers(
podContainers, containerEvents := parsePodContainers(
pod,
pod.Spec.Containers,
pod.Status.Containers,
Expand Down Expand Up @@ -194,7 +195,7 @@ func (c *collector) parsePods(pods []*kubelet.Pod) []workloadmeta.CollectorEvent
return events
}

func (c *collector) parsePodContainers(
func parsePodContainers(
pod *kubelet.Pod,
containerSpecs []kubelet.ContainerSpec,
containerStatuses []kubelet.ContainerStatus,
Expand Down Expand Up @@ -427,21 +428,6 @@ func extractEnvFromSpec(envSpec []kubelet.EnvVar) map[string]string {
return env
}

// extractGPUVendor maps a known GPU resource-name prefix to its vendor
// identifier ("nvidia", "amd", "intel"). An unrecognized prefix is passed
// through unchanged as the vendor string.
func extractGPUVendor(gpuNamePrefix kubelet.ResourceName) string {
	switch gpuNamePrefix {
	case kubelet.ResourcePrefixNvidiaMIG, kubelet.ResourceGenericNvidiaGPU:
		return "nvidia"
	case kubelet.ResourcePrefixAMDGPU:
		return "amd"
	case kubelet.ResourcePrefixIntelGPU:
		return "intel"
	}
	// Fall back to the raw resource-name prefix for unknown vendors.
	return string(gpuNamePrefix)
}

func extractResources(spec *kubelet.ContainerSpec) workloadmeta.ContainerResources {
resources := workloadmeta.ContainerResources{}
if cpuReq, found := spec.Resources.Requests[kubelet.ResourceCPU]; found {
Expand All @@ -453,24 +439,14 @@ func extractResources(spec *kubelet.ContainerSpec) workloadmeta.ContainerResourc
}

// extract GPU resource info from the possible GPU sources
uniqueGPUVendor := make(map[string]bool)

resourceKeys := make([]kubelet.ResourceName, 0, len(spec.Resources.Requests))
uniqueGPUVendor := make(map[string]struct{})
for resourceName := range spec.Resources.Requests {
resourceKeys = append(resourceKeys, resourceName)
}

for _, gpuResourceName := range kubelet.GetGPUResourceNames() {
for _, resourceKey := range resourceKeys {
if strings.HasPrefix(string(resourceKey), string(gpuResourceName)) {
if gpuReq, found := spec.Resources.Requests[resourceKey]; found {
resources.GPURequest = pointer.Ptr(uint64(gpuReq.Value()))
uniqueGPUVendor[extractGPUVendor(gpuResourceName)] = true
break
}
}
gpuName, found := gpu.ExtractSimpleGPUName(gpu.ResourceGPU(resourceName))
if found {
uniqueGPUVendor[gpuName] = struct{}{}
}
}

gpuVendorList := make([]string, 0, len(uniqueGPUVendor))
for GPUVendor := range uniqueGPUVendor {
gpuVendorList = append(gpuVendorList, GPUVendor)
Expand All @@ -490,7 +466,7 @@ func findContainerSpec(name string, specs []kubelet.ContainerSpec) *kubelet.Cont
return nil
}

func (c *collector) parseExpires(expiredIDs []string) []workloadmeta.CollectorEvent {
func parseExpires(expiredIDs []string) []workloadmeta.CollectorEvent {
events := make([]workloadmeta.CollectorEvent, 0, len(expiredIDs))
podTerminatedTime := time.Now()

Expand Down
175 changes: 175 additions & 0 deletions comp/core/workloadmeta/collectors/internal/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubelet && test

package kubelet

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/kubelet"

workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
)

// TestPodParser verifies that parsePods converts a kubelet Pod — including a
// "nvidia.com/gpu" resource request — into the expected workloadmeta
// container and pod entities. In particular it checks that the GPU request is
// surfaced as GPUVendorList: []string{"nvidia"} on both entities.
func TestPodParser(t *testing.T) {

	// Input fixture: a single running pod with one nginx container that
	// requests an NVIDIA GPU resource.
	referencePod := []*kubelet.Pod{
		{
			Metadata: kubelet.PodMetadata{
				Name:      "TestPod",
				UID:       "uniqueIdentifier",
				Namespace: "namespace",
				Owners: []kubelet.PodOwner{
					{
						Kind: "ReplicaSet",
						Name: "deployment-hashrs",
						ID:   "ownerUID",
					},
				},
				Annotations: map[string]string{
					"annotationKey": "annotationValue",
				},
				Labels: map[string]string{
					"labelKey": "labelValue",
				},
			},
			Spec: kubelet.Spec{
				PriorityClassName: "priorityClass",
				Volumes: []kubelet.VolumeSpec{
					{
						Name: "pvcVol",
						PersistentVolumeClaim: &kubelet.PersistentVolumeClaimSpec{
							ClaimName: "pvcName",
						},
					},
				},
				Containers: []kubelet.ContainerSpec{
					{
						Name:  "nginx-container",
						Image: "nginx:1.25.2",
						Resources: &kubelet.ContainerResourcesSpec{
							Requests: kubelet.ResourceList{
								// GPU request that parsePods should map to
								// the "nvidia" vendor tag.
								"nvidia.com/gpu": resource.Quantity{
									Format: "1",
								},
							},
						},
					},
				},
			},
			Status: kubelet.Status{
				Phase: string(corev1.PodRunning),
				Conditions: []kubelet.Conditions{
					{
						Type:   string(corev1.PodReady),
						Status: string(corev1.ConditionTrue),
					},
				},
				PodIP:    "127.0.0.1",
				QOSClass: string(corev1.PodQOSGuaranteed),
				Containers: []kubelet.ContainerStatus{
					{
						Name:    "nginx-container",
						ImageID: "5dbe7e1b6b9c",
						Image:   "nginx:1.25.2",
						ID:      "docker://containerID",
						Ready:   true,
					},
				},
			},
		},
	}

	// With a single container, parsePods emits the container event first and
	// the pod event second.
	events := parsePods(referencePod)
	containerEvent, podEvent := events[0], events[1]

	// Expected container entity derived from the fixture above; note the GPU
	// vendor extracted from the "nvidia.com/gpu" resource request.
	expectedContainer := &workloadmeta.Container{
		EntityID: workloadmeta.EntityID{
			Kind: workloadmeta.KindContainer,
			ID:   "containerID",
		},
		EntityMeta: workloadmeta.EntityMeta{
			Name: "nginx-container",
			Labels: map[string]string{
				kubernetes.CriContainerNamespaceLabel: "namespace",
			},
		},
		Image: workloadmeta.ContainerImage{
			ID:        "5dbe7e1b6b9c",
			Name:      "nginx",
			ShortName: "nginx",
			Tag:       "1.25.2",
			RawName:   "nginx:1.25.2",
		},
		Runtime: "docker",
		Resources: workloadmeta.ContainerResources{
			GPUVendorList: []string{"nvidia"},
		},
		Owner: &workloadmeta.EntityID{
			Kind: "kubernetes_pod",
			ID:   "uniqueIdentifier",
		},
		Ports:   []workloadmeta.ContainerPort{},
		EnvVars: map[string]string{},
		State: workloadmeta.ContainerState{
			Health: "healthy",
		},
	}
	// Expected pod entity; the pod-level GPUVendorList aggregates the vendors
	// found across its containers.
	expectedPod := &workloadmeta.KubernetesPod{
		EntityID: workloadmeta.EntityID{
			Kind: workloadmeta.KindKubernetesPod,
			ID:   "uniqueIdentifier",
		},
		EntityMeta: workloadmeta.EntityMeta{
			Name:      "TestPod",
			Namespace: "namespace",
			Annotations: map[string]string{
				"annotationKey": "annotationValue",
			},
			Labels: map[string]string{
				"labelKey": "labelValue",
			},
		},
		Phase: "Running",
		Owners: []workloadmeta.KubernetesPodOwner{
			{
				Kind: "ReplicaSet",
				Name: "deployment-hashrs",
				ID:   "ownerUID",
			},
		},
		Containers: []workloadmeta.OrchestratorContainer{
			{
				Name: "nginx-container",
				ID:   "containerID",
				Image: workloadmeta.ContainerImage{
					ID:        "5dbe7e1b6b9c",
					Name:      "nginx",
					ShortName: "nginx",
					Tag:       "1.25.2",
					RawName:   "nginx:1.25.2",
				},
			},
		},
		InitContainers:             []workloadmeta.OrchestratorContainer{},
		PersistentVolumeClaimNames: []string{"pvcName"},
		Ready:                      true,
		IP:                         "127.0.0.1",
		PriorityClass:              "priorityClass",
		GPUVendorList:              []string{"nvidia"},
		QOSClass:                   "Guaranteed",
	}

	assert.Equal(t, expectedPod, podEvent.Entity)

	assert.Equal(t, expectedContainer, containerEvent.Entity)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"

workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
"github.com/DataDog/datadog-agent/pkg/util/gpu"
)

type podParser struct {
Expand Down Expand Up @@ -62,6 +63,20 @@ func (p podParser) Parse(obj interface{}) workloadmeta.Entity {
rtcName = *pod.Spec.RuntimeClassName
}

var gpuVendorList []string
uniqueGPUVendor := make(map[string]struct{})
for _, container := range pod.Spec.Containers {
for resourceName := range container.Resources.Limits {
gpuName, found := gpu.ExtractSimpleGPUName(gpu.ResourceGPU(resourceName))
if found {
uniqueGPUVendor[gpuName] = struct{}{}
}
}
}
for gpuVendor := range uniqueGPUVendor {
gpuVendorList = append(gpuVendorList, gpuVendor)
}

return &workloadmeta.KubernetesPod{
EntityID: workloadmeta.EntityID{
Kind: workloadmeta.KindKubernetesPod,
Expand All @@ -81,6 +96,7 @@ func (p podParser) Parse(obj interface{}) workloadmeta.Entity {
PriorityClass: pod.Spec.PriorityClassName,
QOSClass: string(pod.Status.QOSClass),
RuntimeClass: rtcName,
GPUVendorList: gpuVendorList,

// Containers could be generated by this collector, but
// currently it's not to save on memory, since this is supposed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
Expand Down Expand Up @@ -54,6 +55,28 @@ func TestPodParser_Parse(t *testing.T) {
},
},
},
Containers: []corev1.Container{
{
Name: "gpuContainer1",
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"nvidia.com/gpu": resource.Quantity{
Format: "1",
},
},
},
},
{
Name: "gpuContainer2",
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"gpu.intel.com/xe": resource.Quantity{
Format: "2",
},
},
},
},
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Expand Down Expand Up @@ -97,6 +120,7 @@ func TestPodParser_Parse(t *testing.T) {
Ready: true,
IP: "127.0.0.1",
PriorityClass: "priorityClass",
GPUVendorList: []string{"nvidia", "intel"},
QOSClass: "Guaranteed",
}

Expand Down
Loading

0 comments on commit 945f15c

Please sign in to comment.