From 4201164a2cb0c02ced67eef9e49ae56780dc2c91 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Tue, 3 Dec 2024 23:13:41 +0000 Subject: [PATCH] Base migration from kubelet.pod to corev1.pod --- .../collectors/internal/kubelet/kubelet.go | 142 +++++++++++------- pkg/kubestatemetrics/builder/kubelet_pods.go | 8 +- .../builder/kubelet_pods_test.go | 18 +-- pkg/util/kubernetes/kubelet/kubelet.go | 65 ++++++++ .../kubernetes/kubelet/kubelet_common_test.go | 17 ++- .../kubernetes/kubelet/kubelet_interface.go | 2 + .../kubelet/kubelet_orchestrator.go | 32 ---- pkg/util/kubernetes/kubelet/podwatcher.go | 43 +++--- .../kubernetes/kubelet/podwatcher_test.go | 46 +++--- 9 files changed, 220 insertions(+), 153 deletions(-) diff --git a/comp/core/workloadmeta/collectors/internal/kubelet/kubelet.go b/comp/core/workloadmeta/collectors/internal/kubelet/kubelet.go index c9e4960d805729..13c93cf96f838a 100644 --- a/comp/core/workloadmeta/collectors/internal/kubelet/kubelet.go +++ b/comp/core/workloadmeta/collectors/internal/kubelet/kubelet.go @@ -15,6 +15,7 @@ import ( "time" "go.uber.org/fx" + corev1 "k8s.io/api/core/v1" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" "github.com/DataDog/datadog-agent/internal/third_party/golang/expansion" @@ -107,11 +108,11 @@ func (c *collector) GetTargetCatalog() workloadmeta.AgentType { return c.catalog } -func (c *collector) parsePods(pods []*kubelet.Pod) []workloadmeta.CollectorEvent { +func (c *collector) parsePods(pods []*corev1.Pod) []workloadmeta.CollectorEvent { events := []workloadmeta.CollectorEvent{} for _, pod := range pods { - podMeta := pod.Metadata + podMeta := pod.ObjectMeta if podMeta.UID == "" { log.Debugf("pod has no UID. meta: %+v", podMeta) continue @@ -128,37 +129,37 @@ func (c *collector) parsePods(pods []*kubelet.Pod) []workloadmeta.CollectorEvent podID := workloadmeta.EntityID{ Kind: workloadmeta.KindKubernetesPod, - ID: podMeta.UID, + ID: string(podMeta.UID), } podInitContainers, initContainerEvents := c.parsePodContainers( pod, pod.Spec.InitContainers, - pod.Status.InitContainers, + pod.Status.InitContainerStatuses, &podID, ) podContainers, containerEvents := c.parsePodContainers( pod, pod.Spec.Containers, - pod.Status.Containers, + pod.Status.ContainerStatuses, &podID, ) GPUVendors := getGPUVendorsFromContainers(initContainerEvents, containerEvents) - podOwners := pod.Owners() - owners := make([]workloadmeta.KubernetesPodOwner, 0, len(podOwners)) - for _, o := range podOwners { + owners := make([]workloadmeta.KubernetesPodOwner, 0, len(pod.OwnerReferences)) + for _, o := range pod.OwnerReferences { owners = append(owners, workloadmeta.KubernetesPodOwner{ Kind: o.Kind, Name: o.Name, - ID: o.ID, + ID: string(o.UID), }) } - PodSecurityContext := extractPodSecurityContext(&pod.Spec) - RuntimeClassName := extractPodRuntimeClassName(&pod.Spec) + PodSecurityContext := extractPodSecurityContext(pod.Spec.SecurityContext) + RuntimeClassName := extractPodRuntimeClassName(pod.Spec.RuntimeClassName) + PersistentVolumeClaimNames := extractPersistentVolumeClaimNames(pod.Spec.Volumes) entity := &workloadmeta.KubernetesPod{ EntityID: podID, @@ -169,14 +170,14 @@ func (c *collector) parsePods(pods []*kubelet.Pod) []workloadmeta.CollectorEvent Labels: podMeta.Labels, }, Owners: owners, - PersistentVolumeClaimNames: pod.GetPersistentVolumeClaimNames(), + PersistentVolumeClaimNames: PersistentVolumeClaimNames, InitContainers: podInitContainers, Containers: podContainers, - Ready: kubelet.IsPodReady(pod), - Phase: pod.Status.Phase, 
+ Ready: kubelet.NewIsPodReady(pod), + Phase: string(pod.Status.Phase), IP: pod.Status.PodIP, PriorityClass: pod.Spec.PriorityClassName, - QOSClass: pod.Status.QOSClass, + QOSClass: string(pod.Status.QOSClass), GPUVendorList: GPUVendors, RuntimeClass: RuntimeClassName, SecurityContext: PodSecurityContext, @@ -195,16 +196,16 @@ func (c *collector) parsePods(pods []*kubelet.Pod) []workloadmeta.CollectorEvent } func (c *collector) parsePodContainers( - pod *kubelet.Pod, - containerSpecs []kubelet.ContainerSpec, - containerStatuses []kubelet.ContainerStatus, + pod *corev1.Pod, + containerSpecs []corev1.Container, + containerStatuses []corev1.ContainerStatus, parent *workloadmeta.EntityID, ) ([]workloadmeta.OrchestratorContainer, []workloadmeta.CollectorEvent) { podContainers := make([]workloadmeta.OrchestratorContainer, 0, len(containerStatuses)) events := make([]workloadmeta.CollectorEvent, 0, len(containerStatuses)) for _, container := range containerStatuses { - if container.ID == "" { + if container.ContainerID == "" { // A container without an ID has not been created by // the runtime yet, so we ignore them until it's // detected again. @@ -235,7 +236,7 @@ func (c *collector) parsePodContainers( } } - runtime, containerID := containers.SplitEntityName(container.ID) + runtime, containerID := containers.SplitEntityName(container.ContainerID) podContainer := workloadmeta.OrchestratorContainer{ ID: containerID, Name: container.Name, @@ -252,13 +253,13 @@ func (c *collector) parsePodContainers( } podContainer.Image.ID = imageID - containerSecurityContext = extractContainerSecurityContext(containerSpec) + containerSecurityContext = extractContainerSecurityContext(containerSpec.SecurityContext) ports = make([]workloadmeta.ContainerPort, 0, len(containerSpec.Ports)) for _, port := range containerSpec.Ports { ports = append(ports, workloadmeta.ContainerPort{ Name: port.Name, - Port: port.ContainerPort, - Protocol: port.Protocol, + Port: int(port.ContainerPort), + Protocol: string(port.Protocol), }) } } else { @@ -269,14 +270,14 @@ func (c *collector) parsePodContainers( if st := container.State.Running; st != nil { containerState.Running = true containerState.Status = workloadmeta.ContainerStatusRunning - containerState.StartedAt = st.StartedAt - containerState.CreatedAt = st.StartedAt // CreatedAt not available + containerState.StartedAt = st.StartedAt.Time + containerState.CreatedAt = st.StartedAt.Time // CreatedAt not available } else if st := container.State.Terminated; st != nil { containerState.Running = false containerState.Status = workloadmeta.ContainerStatusStopped - containerState.CreatedAt = st.StartedAt - containerState.StartedAt = st.StartedAt - containerState.FinishedAt = st.FinishedAt + containerState.CreatedAt = st.StartedAt.Time + containerState.StartedAt = st.StartedAt.Time + containerState.FinishedAt = st.FinishedAt.Time } // Kubelet considers containers without probe to be ready @@ -298,7 +299,7 @@ func (c *collector) parsePodContainers( EntityMeta: workloadmeta.EntityMeta{ Name: container.Name, Labels: map[string]string{ - kubernetes.CriContainerNamespaceLabel: pod.Metadata.Namespace, + kubernetes.CriContainerNamespaceLabel: pod.ObjectMeta.Namespace, }, }, Image: image, @@ -333,51 +334,84 @@ func getGPUVendorsFromContainers(initContainerEvents, containerEvents []workload return GPUVendors } -func extractPodRuntimeClassName(spec *kubelet.Spec) string { - if spec.RuntimeClassName == nil { +func extractPodRuntimeClassName(runtimeClass *string) string { + if runtimeClass == nil { 
return "" } - return *spec.RuntimeClassName + return *runtimeClass } -func extractPodSecurityContext(spec *kubelet.Spec) *workloadmeta.PodSecurityContext { - if spec.SecurityContext == nil { +func extractPodSecurityContext(securityContext *corev1.PodSecurityContext) *workloadmeta.PodSecurityContext { + if securityContext == nil { return nil } + // NOTE: Values are being converted from int64 to int32. Is this safe? + // Q: Should we upgrade the internal security context type to also be int64? + runAsUser := int32(0) + runAsGroup := int32(0) + fsGroup := int32(0) + if securityContext.RunAsUser != nil { + runAsUser = int32(*securityContext.RunAsUser) + } + if securityContext.RunAsGroup != nil { + runAsGroup = int32(*securityContext.RunAsGroup) + } + if securityContext.FSGroup != nil { + fsGroup = int32(*securityContext.FSGroup) + } + return &workloadmeta.PodSecurityContext{ - RunAsUser: spec.SecurityContext.RunAsUser, - RunAsGroup: spec.SecurityContext.RunAsGroup, - FsGroup: spec.SecurityContext.FsGroup, + RunAsUser: runAsUser, + RunAsGroup: runAsGroup, + FsGroup: fsGroup, + } +} + +func extractPersistentVolumeClaimNames(volumes []corev1.Volume) []string { + pvcs := []string{} + for _, volume := range volumes { + if volume.PersistentVolumeClaim != nil { + pvcs = append(pvcs, volume.PersistentVolumeClaim.ClaimName) + } + } + return pvcs +} + +func convertCapabilityList(capList []corev1.Capability) []string { + caps := make([]string, 0, len(capList)) + for _, cap := range capList { + caps = append(caps, string(cap)) } + return caps } -func extractContainerSecurityContext(spec *kubelet.ContainerSpec) *workloadmeta.ContainerSecurityContext { - if spec.SecurityContext == nil { +func extractContainerSecurityContext(securityContext *corev1.SecurityContext) *workloadmeta.ContainerSecurityContext { + if securityContext == nil { return nil } var caps *workloadmeta.Capabilities - if spec.SecurityContext.Capabilities != nil { + if securityContext.Capabilities != nil { caps = &workloadmeta.Capabilities{ - Add: spec.SecurityContext.Capabilities.Add, - Drop: spec.SecurityContext.Capabilities.Drop, + Add: convertCapabilityList(securityContext.Capabilities.Add), + Drop: convertCapabilityList(securityContext.Capabilities.Drop), } } privileged := false - if spec.SecurityContext.Privileged != nil { - privileged = *spec.SecurityContext.Privileged + if securityContext.Privileged != nil { + privileged = *securityContext.Privileged } var seccompProfile *workloadmeta.SeccompProfile - if spec.SecurityContext.SeccompProfile != nil { + if securityContext.SeccompProfile != nil { localhostProfile := "" - if spec.SecurityContext.SeccompProfile.LocalhostProfile != nil { - localhostProfile = *spec.SecurityContext.SeccompProfile.LocalhostProfile + if securityContext.SeccompProfile.LocalhostProfile != nil { + localhostProfile = *securityContext.SeccompProfile.LocalhostProfile } - spType := workloadmeta.SeccompProfileType(spec.SecurityContext.SeccompProfile.Type) + spType := workloadmeta.SeccompProfileType(securityContext.SeccompProfile.Type) seccompProfile = &workloadmeta.SeccompProfile{ Type: spType, @@ -392,7 +426,7 @@ func extractContainerSecurityContext(spec *kubelet.ContainerSpec) *workloadmeta. 
} } -func extractEnvFromSpec(envSpec []kubelet.EnvVar) map[string]string { +func extractEnvFromSpec(envSpec []corev1.EnvVar) map[string]string { env := make(map[string]string) mappingFunc := expansion.MappingFuncFor(env) @@ -433,20 +467,20 @@ func extractGPUVendor(gpuNamePrefix kubelet.ResourceName) string { return gpuVendor } -func extractResources(spec *kubelet.ContainerSpec) workloadmeta.ContainerResources { +func extractResources(spec *corev1.Container) workloadmeta.ContainerResources { resources := workloadmeta.ContainerResources{} - if cpuReq, found := spec.Resources.Requests[kubelet.ResourceCPU]; found { + if cpuReq, found := spec.Resources.Requests[corev1.ResourceCPU]; found { resources.CPURequest = pointer.Ptr(cpuReq.AsApproximateFloat64() * 100) // For 100Mi, AsApproximate returns 0.1, we return 10% } - if memoryReq, found := spec.Resources.Requests[kubelet.ResourceMemory]; found { + if memoryReq, found := spec.Resources.Requests[corev1.ResourceMemory]; found { resources.MemoryRequest = pointer.Ptr(uint64(memoryReq.Value())) } // extract GPU resource info from the possible GPU sources uniqueGPUVendor := make(map[string]bool) - resourceKeys := make([]kubelet.ResourceName, 0, len(spec.Resources.Requests)) + resourceKeys := make([]corev1.ResourceName, 0, len(spec.Resources.Requests)) for resourceName := range spec.Resources.Requests { resourceKeys = append(resourceKeys, resourceName) } @@ -471,7 +505,7 @@ func extractResources(spec *kubelet.ContainerSpec) workloadmeta.ContainerResourc return resources } -func findContainerSpec(name string, specs []kubelet.ContainerSpec) *kubelet.ContainerSpec { +func findContainerSpec(name string, specs []corev1.Container) *corev1.Container { for _, spec := range specs { if spec.Name == name { return &spec diff --git a/pkg/kubestatemetrics/builder/kubelet_pods.go b/pkg/kubestatemetrics/builder/kubelet_pods.go index c0a50018c110ae..5a21b34e1afa24 100644 --- a/pkg/kubestatemetrics/builder/kubelet_pods.go +++ b/pkg/kubestatemetrics/builder/kubelet_pods.go @@ -30,7 +30,7 @@ const ( // podWatcher is an interface for a component that watches for changes in pods type podWatcher interface { - PullChanges(ctx context.Context) ([]*kubelet.Pod, error) + PullChanges(ctx context.Context) ([]*corev1.Pod, error) Expire() ([]string, error) } @@ -108,14 +108,12 @@ func (kr *kubeletReflector) updateStores(ctx context.Context) error { } for _, pod := range pods { - if !kr.watchAllNamespaces && !slices.Contains(kr.namespaces, pod.Metadata.Namespace) { + if !kr.watchAllNamespaces && !slices.Contains(kr.namespaces, pod.Namespace) { continue } - kubePod := kubelet.ConvertKubeletPodToK8sPod(pod) - for _, store := range kr.stores { - err := store.Add(kubePod) + err := store.Add(pod) if err != nil { // log instead of returning error to continue updating other stores log.Warnf("Failed to add pod to store: %s", err) diff --git a/pkg/kubestatemetrics/builder/kubelet_pods_test.go b/pkg/kubestatemetrics/builder/kubelet_pods_test.go index a9020b21435497..910348e0f8c81f 100644 --- a/pkg/kubestatemetrics/builder/kubelet_pods_test.go +++ b/pkg/kubestatemetrics/builder/kubelet_pods_test.go @@ -17,17 +17,15 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - - "github.com/DataDog/datadog-agent/pkg/util/kubernetes/kubelet" ) type MockPodWatcher struct { mock.Mock } -func (m *MockPodWatcher) PullChanges(ctx context.Context) ([]*kubelet.Pod, error) { +func (m *MockPodWatcher) PullChanges(ctx context.Context) 
([]*corev1.Pod, error) {
 	args := m.Called(ctx)
-	return args.Get(0).([]*kubelet.Pod), args.Error(1)
+	return args.Get(0).([]*corev1.Pod), args.Error(1)
 }
 
 func (m *MockPodWatcher) Expire() ([]string, error) {
@@ -123,17 +121,15 @@ func TestUpdateStores_AddPods(t *testing.T) {
 
 			watcher := new(MockPodWatcher)
 
-			kubeletPod := &kubelet.Pod{
-				Metadata: kubelet.PodMetadata{
+			corePod := &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
 					Namespace: test.addedPodNamespace,
 					Name:      "test-pod",
 					UID:       "12345",
 				},
 			}
 
-			kubernetesPod := kubelet.ConvertKubeletPodToK8sPod(kubeletPod)
-
-			watcher.On("PullChanges", mock.Anything).Return([]*kubelet.Pod{kubeletPod}, nil)
+			watcher.On("PullChanges", mock.Anything).Return([]*corev1.Pod{corePod}, nil)
 			watcher.On("Expire").Return([]string{}, nil)
 
 			reflector := kubeletReflector{
@@ -152,7 +148,7 @@ func TestUpdateStores_AddPods(t *testing.T) {
 
 			if test.podShouldBeAdded {
 				for _, store := range stores {
-					store.AssertCalled(t, "Add", kubernetesPod)
+					store.AssertCalled(t, "Add", corePod)
 				}
 			} else {
 				for _, store := range stores {
@@ -196,7 +192,7 @@ func TestUpdateStores_HandleExpired(t *testing.T) {
 	}
 
 	watcher := new(MockPodWatcher)
-	watcher.On("PullChanges", mock.Anything).Return([]*kubelet.Pod{}, nil)
+	watcher.On("PullChanges", mock.Anything).Return([]*corev1.Pod{}, nil)
 	watcher.On("Expire").Return([]string{test.expiredUID}, nil)
 
 	reflector := kubeletReflector{
diff --git a/pkg/util/kubernetes/kubelet/kubelet.go b/pkg/util/kubernetes/kubelet/kubelet.go
index b1622e27bf2ea9..8cadece52026fd 100644
--- a/pkg/util/kubernetes/kubelet/kubelet.go
+++ b/pkg/util/kubernetes/kubelet/kubelet.go
@@ -23,6 +23,9 @@ import (
 	"github.com/DataDog/datadog-agent/pkg/util/log"
 	"github.com/DataDog/datadog-agent/pkg/util/retry"
 
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	clientsetscheme "k8s.io/client-go/kubernetes/scheme"
 	kubeletv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
 )
 
@@ -395,6 +398,34 @@ func (ku *KubeUtil) GetRawMetrics(ctx context.Context) ([]byte, error) {
 	return data, nil
 }
 
+// GetRawLocalPodList returns the unfiltered pod list from the kubelet
+func (ku *KubeUtil) GetRawLocalPodList(ctx context.Context) ([]*corev1.Pod, error) {
+	data, code, err := ku.QueryKubelet(ctx, kubeletPodPath)
+	if err != nil {
+		return nil, fmt.Errorf("error performing kubelet query %s%s: %s", ku.kubeletClient.kubeletURL, kubeletPodPath, err)
+	}
+	if code != http.StatusOK {
+		return nil, fmt.Errorf("unexpected status code %d on %s%s: %s", code, ku.kubeletClient.kubeletURL, kubeletPodPath, string(data))
+	}
+
+	podListData, err := runtime.Decode(clientsetscheme.Codecs.UniversalDecoder(corev1.SchemeGroupVersion), data)
+	if err != nil {
+		return nil, fmt.Errorf("unable to decode the pod list: %s", err)
+	}
+	podList, ok := podListData.(*corev1.PodList)
+	if !ok {
+		return nil, fmt.Errorf("pod list type assertion failed on %v", podListData)
+	}
+
+	// transform []corev1.Pod into []*corev1.Pod
+	pods := make([]*corev1.Pod, len(podList.Items))
+	for i := 0; i < len(pods); i++ {
+		pods[i] = &podList.Items[i]
+	}
+
+	return pods, nil
+}
+
 // IsPodReady return a bool if the Pod is ready
 func IsPodReady(pod *Pod) bool {
 	// static pods are always reported as Pending, so we make an exception there
@@ -417,6 +448,30 @@ func IsPodReady(pod *Pod) bool {
 	return false
 }
 
+// NewIsPodReady returns true if the given corev1.Pod is ready
+// TODO CONTP: eventually phase out the use of kubelet.Pod in favor of corev1.Pod
+// and remove the old IsPodReady function, replacing it with the new one
+func NewIsPodReady(pod *corev1.Pod) bool {
+	// static pods are always reported as Pending, so we make an exception there
+	if pod.Status.Phase == corev1.PodPending && newIsPodStatic(pod) {
+		return true
+	}
+
+	if pod.Status.Phase != corev1.PodRunning {
+		return false
+	}
+
+	if tolerate, ok := pod.Annotations[unreadyAnnotation]; ok && tolerate == "true" {
+		return true
+	}
+	for _, status := range pod.Status.Conditions {
+		if status.Type == corev1.PodReady && status.Status == corev1.ConditionTrue {
+			return true
+		}
+	}
+	return false
+}
+
 // isPodStatic identifies whether a pod is static or not based on an annotation
 // Static pods can be sent to the kubelet from files or an http endpoint.
 func isPodStatic(pod *Pod) bool {
@@ -425,3 +480,12 @@ func isPodStatic(pod *Pod) bool {
 	}
 	return false
 }
+
+// newIsPodStatic identifies whether a corev1.Pod is static or not based on an annotation
+// Static pods can be sent to the kubelet from files or an http endpoint.
+func newIsPodStatic(pod *corev1.Pod) bool {
+	if source, ok := pod.Annotations[configSourceAnnotation]; ok && (source == "file" || source == "http") {
+		return len(pod.Status.ContainerStatuses) == 0
+	}
+	return false
+}
diff --git a/pkg/util/kubernetes/kubelet/kubelet_common_test.go b/pkg/util/kubernetes/kubelet/kubelet_common_test.go
index 1d105dd9787033..4a476d893e377d 100644
--- a/pkg/util/kubernetes/kubelet/kubelet_common_test.go
+++ b/pkg/util/kubernetes/kubelet/kubelet_common_test.go
@@ -15,6 +15,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	corev1 "k8s.io/api/core/v1"
 )
 
 func TestGetMeta(t *testing.T) {
@@ -39,23 +40,23 @@ process_cpu_seconds_total 127923.04
 	assert.Equal(t, "process_cpu_seconds_total 127923.04", metric)
 }
 
-func loadPodsFixture(path string) ([]*Pod, error) {
+func loadPodsFixture(path string) ([]*corev1.Pod, error) {
 	raw, err := os.ReadFile(path)
 	if err != nil {
 		return nil, err
 	}
-	var podList PodList
+	var podList corev1.PodList
 	err = json.Unmarshal(raw, &podList)
 	if err != nil {
 		return nil, err
 	}
-	for _, pod := range podList.Items {
-		allContainers := make([]ContainerStatus, 0, len(pod.Status.InitContainers)+len(pod.Status.Containers))
-		allContainers = append(allContainers, pod.Status.InitContainers...)
-		allContainers = append(allContainers, pod.Status.Containers...)
- pod.Status.AllContainers = allContainers + + pods := make([]*corev1.Pod, len(podList.Items)) + for i := range podList.Items { + pods[i] = &podList.Items[i] } - return podList.Items, nil + + return pods, nil } func TestKubeContainerIDToTaggerEntityID(t *testing.T) { diff --git a/pkg/util/kubernetes/kubelet/kubelet_interface.go b/pkg/util/kubernetes/kubelet/kubelet_interface.go index cda27911131788..4f21f7bcb72c42 100644 --- a/pkg/util/kubernetes/kubelet/kubelet_interface.go +++ b/pkg/util/kubernetes/kubelet/kubelet_interface.go @@ -10,6 +10,7 @@ package kubelet import ( "context" + corev1 "k8s.io/api/core/v1" kubeletv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1" ) @@ -25,5 +26,6 @@ type KubeUtilInterface interface { QueryKubelet(ctx context.Context, path string) ([]byte, int, error) GetRawConnectionInfo() map[string]string GetRawMetrics(ctx context.Context) ([]byte, error) + GetRawLocalPodList(ctx context.Context) ([]*corev1.Pod, error) GetLocalStatsSummary(ctx context.Context) (*kubeletv1alpha1.Summary, error) } diff --git a/pkg/util/kubernetes/kubelet/kubelet_orchestrator.go b/pkg/util/kubernetes/kubelet/kubelet_orchestrator.go index 90c61f824daf84..678c3a0e06abce 100644 --- a/pkg/util/kubernetes/kubelet/kubelet_orchestrator.go +++ b/pkg/util/kubernetes/kubelet/kubelet_orchestrator.go @@ -9,12 +9,8 @@ package kubelet import ( "context" - "fmt" - "net/http" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - clientsetscheme "k8s.io/client-go/kubernetes/scheme" kubeletv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1" ) @@ -33,31 +29,3 @@ type KubeUtilInterface interface { GetRawLocalPodList(ctx context.Context) ([]*v1.Pod, error) GetLocalStatsSummary(ctx context.Context) (*kubeletv1alpha1.Summary, error) } - -// GetRawLocalPodList returns the unfiltered pod list from the kubelet -func (ku *KubeUtil) GetRawLocalPodList(ctx context.Context) ([]*v1.Pod, error) { - data, code, err := ku.QueryKubelet(ctx, kubeletPodPath) - if err != nil { - return nil, fmt.Errorf("error performing kubelet query %s%s: %s", ku.kubeletClient.kubeletURL, kubeletPodPath, err) - } - if code != http.StatusOK { - return nil, fmt.Errorf("unexpected status code %d on %s%s: %s", code, ku.kubeletClient.kubeletURL, kubeletPodPath, string(data)) - } - - podListData, err := runtime.Decode(clientsetscheme.Codecs.UniversalDecoder(v1.SchemeGroupVersion), data) - if err != nil { - return nil, fmt.Errorf("unable to decode the pod list: %s", err) - } - podList, ok := podListData.(*v1.PodList) - if !ok { - return nil, fmt.Errorf("pod list type assertion failed on %v", podListData) - } - - // transform []v1.Pod in []*v1.Pod - pods := make([]*v1.Pod, len(podList.Items)) - for i := 0; i < len(pods); i++ { - pods[i] = &podList.Items[i] - } - - return pods, nil -} diff --git a/pkg/util/kubernetes/kubelet/podwatcher.go b/pkg/util/kubernetes/kubelet/podwatcher.go index c3fca9a4cc4d77..e50a0efcc46948 100644 --- a/pkg/util/kubernetes/kubelet/podwatcher.go +++ b/pkg/util/kubernetes/kubelet/podwatcher.go @@ -15,6 +15,9 @@ import ( "sync" "time" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -56,9 +59,9 @@ func NewPodWatcher(expiryDuration time.Duration) (*PodWatcher, error) { // PullChanges pulls a new podList from the kubelet and returns Pod objects for // new / updated pods. Updated pods will be sent entirely, user must replace // previous info for these pods. 
-func (w *PodWatcher) PullChanges(ctx context.Context) ([]*Pod, error) {
-	var podList []*Pod
-	podList, err := w.kubeUtil.GetLocalPodList(ctx)
+func (w *PodWatcher) PullChanges(ctx context.Context) ([]*corev1.Pod, error) {
+	var podList []*corev1.Pod
+	podList, err := w.kubeUtil.GetRawLocalPodList(ctx)
 	if err != nil {
 		return podList, err
 	}
@@ -66,20 +69,20 @@ func (w *PodWatcher) PullChanges(ctx context.Context) ([]*Pod, error) {
 }
 
 // computeChanges is used by PullChanges, split for testing
-func (w *PodWatcher) computeChanges(podList []*Pod) ([]*Pod, error) {
+func (w *PodWatcher) computeChanges(podList []*corev1.Pod) ([]*corev1.Pod, error) {
 	now := time.Now()
-	var updatedPods []*Pod
+	var updatedPods []*corev1.Pod
 	w.Lock()
 	defer w.Unlock()
 	for _, pod := range podList {
-		podEntity := PodUIDToEntityName(pod.Metadata.UID)
+		podEntity := PodUIDToEntityName(string(pod.ObjectMeta.UID))
 		newPod := false
 		_, foundPod := w.lastSeen[podEntity]
 		if !foundPod {
-			w.tagsDigest[podEntity] = digestPodMeta(pod.Metadata)
-			w.oldPhase[podEntity] = pod.Status.Phase
+			w.tagsDigest[podEntity] = digestPodMeta(pod.ObjectMeta)
+			w.oldPhase[podEntity] = string(pod.Status.Phase)
 			newPod = true
 		}
 
@@ -88,10 +91,15 @@ func (w *PodWatcher) computeChanges(podList []*Pod) ([]*Pod, error) {
 
 		// Detect updated containers
 		updatedContainer := false
-		isPodReady := IsPodReady(pod)
+		isPodReady := NewIsPodReady(pod)
 
-		for _, container := range pod.Status.GetAllContainers() {
-			if container.IsPending() {
+		// Gather statuses into a fresh slice; appending directly to
+		// pod.Status.ContainerStatuses could write into its backing array.
+		allStatuses := make([]corev1.ContainerStatus, 0, len(pod.Status.ContainerStatuses)+len(pod.Status.InitContainerStatuses))
+		allStatuses = append(allStatuses, pod.Status.ContainerStatuses...)
+		allStatuses = append(allStatuses, pod.Status.InitContainerStatuses...)
+		for _, container := range allStatuses {
+			if container.ContainerID == "" {
 				// We don't check container readiness as init
 				// containers are never ready. We check if the
 				// container has an ID instead (has run or is
@@ -100,29 +108,29 @@ func (w *PodWatcher) computeChanges(podList []*Pod) ([]*Pod, error) {
 				continue
 			}
 
 			// new container are always sent ignoring the pod state
-			if _, found := w.lastSeen[container.ID]; !found {
+			if _, found := w.lastSeen[container.ContainerID]; !found {
 				updatedContainer = true
 			}
-			w.lastSeen[container.ID] = now
+			w.lastSeen[container.ContainerID] = now
 
 			// for existing containers, check whether the
 			// readiness has changed since last time
-			if oldReadiness, found := w.oldReadiness[container.ID]; !found || oldReadiness != isPodReady {
+			if oldReadiness, found := w.oldReadiness[container.ContainerID]; !found || oldReadiness != isPodReady {
 				// the pod has never been seen ready or was removed when
 				// reaching the unreadinessTimeout
 				updatedContainer = true
 			}
-			w.oldReadiness[container.ID] = isPodReady
+			w.oldReadiness[container.ContainerID] = isPodReady
 
 			if isPodReady {
-				w.lastSeenReady[container.ID] = now
+				w.lastSeenReady[container.ContainerID] = now
 			}
 		}
 
 		newLabelsOrAnnotations := false
 		newPhase := false
-		newTagsDigest := digestPodMeta(pod.Metadata)
+		newTagsDigest := digestPodMeta(pod.ObjectMeta)
 
 		// if the pod already existed, check whether tagsDigest or
 		// phase changed
@@ -132,8 +140,8 @@ func (w *PodWatcher) computeChanges(podList []*Pod) ([]*Pod, error) {
 			newLabelsOrAnnotations = true
 		}
 
-		if pod.Status.Phase != w.oldPhase[podEntity] {
-			w.oldPhase[podEntity] = pod.Status.Phase
+		if string(pod.Status.Phase) != w.oldPhase[podEntity] {
+			w.oldPhase[podEntity] = string(pod.Status.Phase)
 			newPhase = true
 		}
 	}
@@ -184,7 +192,7 @@ func (w *PodWatcher) Expire() ([]string, error) {
 // digestPodMeta returns a unique hash of pod labels
 // and annotations.
// it hashes labels then annotations and makes a single hash of both maps -func digestPodMeta(meta PodMetadata) string { +func digestPodMeta(meta metav1.ObjectMeta) string { h := fnv.New64() h.Write([]byte(digestMapValues(meta.Labels))) //nolint:errcheck h.Write([]byte(digestMapValues(meta.Annotations))) //nolint:errcheck diff --git a/pkg/util/kubernetes/kubelet/podwatcher_test.go b/pkg/util/kubernetes/kubelet/podwatcher_test.go index 43460d34c2aac0..9b06e14149a0ca 100644 --- a/pkg/util/kubernetes/kubelet/podwatcher_test.go +++ b/pkg/util/kubernetes/kubelet/podwatcher_test.go @@ -63,15 +63,15 @@ func (suite *PodwatcherTestSuite) TestPodWatcherComputeChanges() { changes, err = watcher.computeChanges(remainingPods) require.Nil(suite.T(), err) require.Len(suite.T(), changes, 4) - require.Equal(suite.T(), changes[0].Metadata.UID, remainingPods[0].Metadata.UID) + require.Equal(suite.T(), changes[0].ObjectMeta.UID, remainingPods[0].ObjectMeta.UID) // A new container ID in an existing pod should trigger - remainingPods[0].Status.Containers[0].ID = "testNewID" - remainingPods[0].Status.AllContainers[0].ID = "testNewID" + remainingPods[0].Status.ContainerStatuses[0].ContainerID = "testNewID" + // remainingPods[0].Status.AllContainers[0].ID = "testNewID" changes, err = watcher.computeChanges(remainingPods) require.Nil(suite.T(), err) require.Len(suite.T(), changes, 1) - require.Equal(suite.T(), changes[0].Metadata.UID, remainingPods[0].Metadata.UID) + require.Equal(suite.T(), changes[0].ObjectMeta.UID, remainingPods[0].ObjectMeta.UID) // Sending the same pod again with no change changes, err = watcher.computeChanges(remainingPods) @@ -91,10 +91,10 @@ func (suite *PodwatcherTestSuite) TestPodWatcherComputeChangesInConditions() { require.Len(suite.T(), changes, 6, fmt.Sprintf("%d", len(changes))) for _, po := range changes { // nginx pod is not ready but still detected by the podwatcher - if po.Metadata.Name == "nginx-99d8b564-4r4vq" { - require.False(suite.T(), IsPodReady(po)) + if po.ObjectMeta.Name == "nginx-99d8b564-4r4vq" { + require.False(suite.T(), NewIsPodReady(po)) } else { - require.True(suite.T(), IsPodReady(po)) + require.True(suite.T(), NewIsPodReady(po)) } } @@ -113,7 +113,7 @@ func (suite *PodwatcherTestSuite) TestPodWatcherComputeChangesInConditions() { require.Nil(suite.T(), err) require.Len(suite.T(), changes, 2) assert.Equal(suite.T(), "nginx", changes[0].Spec.Containers[0].Name) - require.True(suite.T(), IsPodReady(changes[0])) + require.True(suite.T(), NewIsPodReady(changes[0])) } func (suite *PodwatcherTestSuite) TestPodWatcherWithInitContainers() { @@ -137,7 +137,7 @@ func (suite *PodwatcherTestSuite) TestPodWatcherWithInitContainers() { require.Nil(suite.T(), err) require.Len(suite.T(), changes, 1) assert.Equal(suite.T(), "myapp-container", changes[0].Spec.Containers[0].Name) - require.True(suite.T(), IsPodReady(changes[0])) + require.True(suite.T(), NewIsPodReady(changes[0])) } func (suite *PodwatcherTestSuite) TestPodWatcherWithShortLivedContainers() { @@ -161,7 +161,7 @@ func (suite *PodwatcherTestSuite) TestPodWatcherWithShortLivedContainers() { require.Nil(suite.T(), err) require.Len(suite.T(), changes, 1) assert.Equal(suite.T(), "short-lived-container", changes[0].Spec.Containers[0].Name) - require.False(suite.T(), IsPodReady(changes[0])) + require.False(suite.T(), NewIsPodReady(changes[0])) } func (suite *PodwatcherTestSuite) TestPodWatcherReadinessChange() { @@ -188,7 +188,7 @@ func (suite *PodwatcherTestSuite) TestPodWatcherReadinessChange() { require.Nil(suite.T(), 
err) require.Len(suite.T(), changes, 1) assert.Equal(suite.T(), "redis-unready", changes[0].Spec.Containers[0].Name) - require.True(suite.T(), IsPodReady(changes[0])) + require.True(suite.T(), NewIsPodReady(changes[0])) expire, err = watcher.Expire() require.Nil(suite.T(), err) require.Len(suite.T(), expire, 0) @@ -255,7 +255,7 @@ func (suite *PodwatcherTestSuite) TestPodWatcherReadinessChange() { require.Nil(suite.T(), err) require.Len(suite.T(), changes, 1) assert.Equal(suite.T(), "redis-unready", changes[0].Spec.Containers[0].Name) - require.True(suite.T(), IsPodReady(changes[0])) + require.True(suite.T(), NewIsPodReady(changes[0])) expire, err = watcher.Expire() require.Nil(suite.T(), err) require.Len(suite.T(), expire, 0) @@ -360,7 +360,7 @@ func (suite *PodwatcherTestSuite) TestPodWatcherExpireWholePod() { // Remove last pods from the list, make sure we stop at the right one oldPod := sourcePods[5] - require.Contains(suite.T(), oldPod.Metadata.UID, "d91aa43c-0769-11e8-afcc-000c29dea4f6") + require.Contains(suite.T(), oldPod.ObjectMeta.UID, "d91aa43c-0769-11e8-afcc-000c29dea4f6") _, err = watcher.computeChanges(sourcePods[0:5]) require.Nil(suite.T(), err) @@ -425,7 +425,7 @@ func (suite *PodwatcherTestSuite) TestPodWatcherLabelsValueChange() { require.Nil(suite.T(), err) require.Len(suite.T(), changes, 2) - twoPods[0].Metadata.Labels["label1"] = "value1" + twoPods[0].ObjectMeta.Labels["label1"] = "value1" changes, err = watcher.computeChanges(twoPods) require.Nil(suite.T(), err) require.Len(suite.T(), changes, 1) @@ -434,18 +434,18 @@ func (suite *PodwatcherTestSuite) TestPodWatcherLabelsValueChange() { require.Nil(suite.T(), err) require.Len(suite.T(), changes, 0) - twoPods[0].Metadata.Labels["label1"] = "newvalue1" + twoPods[0].ObjectMeta.Labels["label1"] = "newvalue1" changes, err = watcher.computeChanges(twoPods) require.Nil(suite.T(), err) require.Len(suite.T(), changes, 1) - delete(twoPods[0].Metadata.Labels, "label1") + delete(twoPods[0].ObjectMeta.Labels, "label1") changes, err = watcher.computeChanges(twoPods) require.Nil(suite.T(), err) require.Len(suite.T(), changes, 1) - twoPods[0].Metadata.Labels["newlabel1"] = "newvalue1" - twoPods[1].Metadata.Labels["label1"] = "value1" + twoPods[0].ObjectMeta.Labels["newlabel1"] = "newvalue1" + twoPods[1].ObjectMeta.Labels["label1"] = "value1" changes, err = watcher.computeChanges(twoPods) require.Nil(suite.T(), err) require.Len(suite.T(), changes, 2) @@ -481,7 +481,7 @@ func (suite *PodwatcherTestSuite) TestPodWatcherAnnotationsValueChange() { require.Nil(suite.T(), err) require.Len(suite.T(), changes, 2) - twoPods[0].Metadata.Annotations["annotation1"] = "value1" + twoPods[0].ObjectMeta.Annotations["annotation1"] = "value1" changes, err = watcher.computeChanges(twoPods) require.Nil(suite.T(), err) require.Len(suite.T(), changes, 1) @@ -490,18 +490,18 @@ func (suite *PodwatcherTestSuite) TestPodWatcherAnnotationsValueChange() { require.Nil(suite.T(), err) require.Len(suite.T(), changes, 0) - twoPods[0].Metadata.Annotations["annotation1"] = "newvalue1" + twoPods[0].ObjectMeta.Annotations["annotation1"] = "newvalue1" changes, err = watcher.computeChanges(twoPods) require.Nil(suite.T(), err) require.Len(suite.T(), changes, 1) - delete(twoPods[0].Metadata.Annotations, "annotation1") + delete(twoPods[0].ObjectMeta.Annotations, "annotation1") changes, err = watcher.computeChanges(twoPods) require.Nil(suite.T(), err) require.Len(suite.T(), changes, 1) - twoPods[0].Metadata.Annotations["newannotation1"] = "newvalue1" - 
twoPods[1].Metadata.Annotations["annotation1"] = "value1" + twoPods[0].ObjectMeta.Annotations["newannotation1"] = "newvalue1" + twoPods[1].ObjectMeta.Annotations["annotation1"] = "value1" changes, err = watcher.computeChanges(twoPods) require.Nil(suite.T(), err) require.Len(suite.T(), changes, 2)
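
Note on the int64/int32 question raised in extractPodSecurityContext: corev1 carries RunAsUser/RunAsGroup/FSGroup as *int64, while the internal workloadmeta type is int32, so the conversion can silently wrap around. A minimal, self-contained sketch of the failure mode and one possible guard; int32OrZero is a hypothetical helper shown for illustration, not part of this patch:

package main

import (
	"fmt"
	"math"
)

// int32OrZero narrows only when the value fits in an int32 and falls
// back to 0 otherwise, instead of silently wrapping around.
func int32OrZero(v *int64) int32 {
	if v == nil || *v > math.MaxInt32 || *v < math.MinInt32 {
		return 0
	}
	return int32(*v)
}

func main() {
	uid := int64(math.MaxInt32) + 1 // e.g. a large RunAsUser value
	fmt.Println(int32(uid))         // -2147483648: silent wraparound
	fmt.Println(int32OrZero(&uid))  // 0: explicit, detectable fallback
}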
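For reference, a self-contained sketch of the readiness rules that NewIsPodReady and newIsPodStatic encode, exercised against plain corev1 fixtures. The two annotation keys below are assumptions made for this sketch; the patch itself reads the package's unexported configSourceAnnotation and unreadyAnnotation constants:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Assumed values for the unexported constants used by the patch.
const (
	configSourceAnnotation = "kubernetes.io/config.source"
	unreadyAnnotation      = "ad.datadoghq.com/tolerate-unready"
)

// isStatic mirrors newIsPodStatic: a pod sourced from a file or http
// manifest that has no container statuses yet is treated as static.
func isStatic(pod *corev1.Pod) bool {
	if source, ok := pod.Annotations[configSourceAnnotation]; ok && (source == "file" || source == "http") {
		return len(pod.Status.ContainerStatuses) == 0
	}
	return false
}

// isReady mirrors NewIsPodReady: pending static pods count as ready;
// otherwise the pod must be Running and either carry the
// tolerate-unready annotation or have a PodReady=True condition.
func isReady(pod *corev1.Pod) bool {
	if pod.Status.Phase == corev1.PodPending && isStatic(pod) {
		return true
	}
	if pod.Status.Phase != corev1.PodRunning {
		return false
	}
	if tolerate, ok := pod.Annotations[unreadyAnnotation]; ok && tolerate == "true" {
		return true
	}
	for _, cond := range pod.Status.Conditions {
		if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

func main() {
	staticPending := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Annotations: map[string]string{configSourceAnnotation: "file"},
		},
		Status: corev1.PodStatus{Phase: corev1.PodPending},
	}
	runningReady := &corev1.Pod{
		Status: corev1.PodStatus{
			Phase: corev1.PodRunning,
			Conditions: []corev1.PodCondition{
				{Type: corev1.PodReady, Status: corev1.ConditionTrue},
			},
		},
	}
	fmt.Println(isReady(staticPending)) // true: pending static pod
	fmt.Println(isReady(runningReady))  // true: Running with PodReady=True
}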