diff --git a/go.mod b/go.mod index 3e22819bb5..6eb3147b8f 100644 --- a/go.mod +++ b/go.mod @@ -23,11 +23,13 @@ require ( google.golang.org/protobuf v1.26.0-rc.1 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.21.2 + k8s.io/apiextensions-apiserver v0.21.2 k8s.io/apimachinery v0.21.2 k8s.io/client-go v0.21.2 k8s.io/klog/v2 v2.8.0 k8s.io/kubelet v0.0.0 k8s.io/kubernetes v1.21.2 + k8s.io/utils v0.0.0-20201110183641-67b214c5f920 sigs.k8s.io/yaml v1.2.0 ) diff --git a/test/e2e/topology_updater.go b/test/e2e/topology_updater.go new file mode 100644 index 0000000000..24a15d883e --- /dev/null +++ b/test/e2e/topology_updater.go @@ -0,0 +1,481 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1" + topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + + v1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/test/e2e/framework" + e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet" + e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + + "sigs.k8s.io/node-feature-discovery/test/e2e/utils" + testutils "sigs.k8s.io/node-feature-discovery/test/e2e/utils" +) + +const ( + defaultNamespace = "default" +) + +var _ = ginkgo.Describe("[kubernetes-sigs] Node topology updater", func() { + var ( + extClient *extclient.Clientset + topologyClient *topologyclientset.Clientset + crd *apiextensionsv1.CustomResourceDefinition + topologyUpdaterNode *v1.Node + workerNodes []v1.Node + kubeletConfig *kubeletconfig.KubeletConfiguration + namespace string + ) + + f := framework.NewDefaultFramework("node-topology-updater") + + ginkgo.BeforeEach(func() { + var err error + + if extClient == nil { + extClient, err = extclient.NewForConfig(f.ClientConfig()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + + if topologyClient == nil { + topologyClient, err = topologyclientset.NewForConfig(f.ClientConfig()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + + ginkgo.By("Creating the node resource topologies CRD") + crd, err = CreateNodeResourceTopologies(extClient) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = testutils.ConfigureRBAC(f.ClientSet, f.Namespace.Name) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + image := fmt.Sprintf("%s:%s", *dockerRepo, *dockerTag) + f.PodClient().CreateSync(testutils.NFDMasterPod(image, false)) + + // Create nfd-master service + masterService, err := testutils.CreateService(f.ClientSet, f.Namespace.Name) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Waiting for the nfd-master service to be up") + gomega.Expect(e2enetwork.WaitForService(f.ClientSet, f.Namespace.Name, masterService.Name, true, time.Second, 10*time.Second)).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Creating nfd-topology-updater daemonset") + topologyUpdaterDaemonSet := testutils.NFDTopologyUpdaterDaemonSet(fmt.Sprintf("%s:%s", *dockerRepo, *dockerTag), []string{}) + topologyUpdaterDaemonSet, err = f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).Create(context.TODO(), topologyUpdaterDaemonSet, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Waiting for daemonset pods to be ready") + gomega.Expect(e2epod.WaitForPodsReady(f.ClientSet, f.Namespace.Name, topologyUpdaterDaemonSet.Spec.Template.Labels["name"], 5)).NotTo(gomega.HaveOccurred()) + + label := labels.SelectorFromSet(map[string]string{"name": topologyUpdaterDaemonSet.Spec.Template.Labels["name"]}) + pods, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: label.String()}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods.Items).ToNot(gomega.BeEmpty()) + + topologyUpdaterNode, err = f.ClientSet.CoreV1().Nodes().Get(context.TODO(), pods.Items[0].Spec.NodeName, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + kubeletConfig, err = e2ekubelet.GetCurrentKubeletConfig(topologyUpdaterNode.Name, "", true) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + workerNodes, err = utils.GetWorkerNodes(f) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + namespace = f.Namespace.Name + + }) + + ginkgo.Context("with single nfd-master pod", func() { + ginkgo.It("should fill the node resource topologies CR with the data", func() { + nodeTopology := getNodeTopology(topologyClient, topologyUpdaterNode.Name, namespace) + isValid := isValidNodeTopology(nodeTopology, kubeletConfig) + gomega.Expect(isValid).To(gomega.BeTrue(), "received invalid topology: %v", nodeTopology) + }) + + ginkgo.It("it should not account for any cpus if a container doesn't request exclusive cpus (best effort QOS)", func() { + ginkgo.By("getting the initial topology information") + initialNodeTopo := getNodeTopology(topologyClient, topologyUpdaterNode.Name, namespace) + ginkgo.By("creating a pod consuming resources from the shared, non-exclusive CPU pool (best-effort QoS)") + sleeperPod := testutils.BestEffortSleeperPod() + + podMap := make(map[string]*v1.Pod) + pod := f.PodClient().CreateSync(sleeperPod) + podMap[pod.Name] = pod + defer testutils.DeletePodsAsync(f, podMap) + + cooldown := 30 * time.Second + ginkgo.By(fmt.Sprintf("getting the updated topology - sleeping for %v", cooldown)) + // the object, hance the resource version must NOT change, so we can only sleep + time.Sleep(cooldown) + ginkgo.By("checking the changes in the updated topology - expecting none") + finalNodeTopo := getNodeTopology(topologyClient, topologyUpdaterNode.Name, namespace) + + initialAllocRes := allocatableResourceListFromNodeResourceTopology(initialNodeTopo) + finalAllocRes := allocatableResourceListFromNodeResourceTopology(finalNodeTopo) + if len(initialAllocRes) == 0 || len(finalAllocRes) == 0 { + ginkgo.Fail(fmt.Sprintf("failed to find allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes)) + } + zoneName, resName, cmp, ok := cmpAllocatableResources(initialAllocRes, finalAllocRes) + framework.Logf("zone=%q resource=%q cmp=%v ok=%v", zoneName, resName, cmp, ok) + if !ok { + ginkgo.Fail(fmt.Sprintf("failed to compare allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes)) + } + + // This is actually a workaround. + // Depending on the (random, by design) order on which ginkgo runs the tests, a test which exclusively allocates CPUs may run before. + // We cannot (nor should) care about what runs before this test, but we know that this may happen. + // The proper solution is to wait for ALL the container requesting exclusive resources to be gone before to end the related test. + // To date, we don't yet have a clean way to wait for these pod (actually containers) to be completely gone + // (hence, releasing the exclusively allocated CPUs) before to end the test, so this test can run with some leftovers hanging around, + // which makes the accounting harder. And this is what we handle here. + isGreaterEqual := (cmp >= 0) + gomega.Expect(isGreaterEqual).To(gomega.BeTrue(), fmt.Sprintf("final allocatable resources not restored - cmp=%d initial=%v final=%v", cmp, initialAllocRes, finalAllocRes)) + }) + + ginkgo.It("it should not account for any cpus if a container doesn't request exclusive cpus (guaranteed QOS, nonintegral cpu request)", func() { + ginkgo.By("getting the initial topology information") + initialNodeTopo := getNodeTopology(topologyClient, topologyUpdaterNode.Name, namespace) + ginkgo.By("creating a pod consuming resources from the shared, non-exclusive CPU pool (guaranteed QoS, nonintegral request)") + sleeperPod := testutils.GuaranteedSleeperPod("500m") + + podMap := make(map[string]*v1.Pod) + pod := f.PodClient().CreateSync(sleeperPod) + podMap[pod.Name] = pod + defer testutils.DeletePodsAsync(f, podMap) + + cooldown := 30 * time.Second + ginkgo.By(fmt.Sprintf("getting the updated topology - sleeping for %v", cooldown)) + // the object, hance the resource version must NOT change, so we can only sleep + time.Sleep(cooldown) + ginkgo.By("checking the changes in the updated topology - expecting none") + finalNodeTopo := getNodeTopology(topologyClient, topologyUpdaterNode.Name, namespace) + + initialAllocRes := allocatableResourceListFromNodeResourceTopology(initialNodeTopo) + finalAllocRes := allocatableResourceListFromNodeResourceTopology(finalNodeTopo) + if len(initialAllocRes) == 0 || len(finalAllocRes) == 0 { + ginkgo.Fail(fmt.Sprintf("failed to find allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes)) + } + zoneName, resName, cmp, ok := cmpAllocatableResources(initialAllocRes, finalAllocRes) + framework.Logf("zone=%q resource=%q cmp=%v ok=%v", zoneName, resName, cmp, ok) + if !ok { + ginkgo.Fail(fmt.Sprintf("failed to compare allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes)) + } + + // This is actually a workaround. + // Depending on the (random, by design) order on which ginkgo runs the tests, a test which exclusively allocates CPUs may run before. + // We cannot (nor should) care about what runs before this test, but we know that this may happen. + // The proper solution is to wait for ALL the container requesting exclusive resources to be gone before to end the related test. + // To date, we don't yet have a clean way to wait for these pod (actually containers) to be completely gone + // (hence, releasing the exclusively allocated CPUs) before to end the test, so this test can run with some leftovers hanging around, + // which makes the accounting harder. And this is what we handle here. + isGreaterEqual := (cmp >= 0) + gomega.Expect(isGreaterEqual).To(gomega.BeTrue(), fmt.Sprintf("final allocatable resources not restored - cmp=%d initial=%v final=%v", cmp, initialAllocRes, finalAllocRes)) + }) + + ginkgo.It("it should account for containers requesting exclusive cpus", func() { + nodes, err := testutils.FilterNodesWithEnoughCores(workerNodes, "1000m") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + if len(nodes) < 1 { + ginkgo.Skip("not enough allocatable cores for this test") + } + + ginkgo.By("getting the initial topology information") + initialNodeTopo := getNodeTopology(topologyClient, topologyUpdaterNode.Name, namespace) + ginkgo.By("creating a pod consuming exclusive CPUs") + sleeperPod := testutils.GuaranteedSleeperPod("1000m") + + podMap := make(map[string]*v1.Pod) + pod := f.PodClient().CreateSync(sleeperPod) + podMap[pod.Name] = pod + defer testutils.DeletePodsAsync(f, podMap) + + ginkgo.By("getting the updated topology") + var finalNodeTopo *v1alpha1.NodeResourceTopology + gomega.Eventually(func() bool { + finalNodeTopo, err = topologyClient.TopologyV1alpha1().NodeResourceTopologies(namespace).Get(context.TODO(), topologyUpdaterNode.Name, metav1.GetOptions{}) + if err != nil { + framework.Logf("failed to get the node topology resource: %v", err) + return false + } + return finalNodeTopo.ObjectMeta.ResourceVersion != initialNodeTopo.ObjectMeta.ResourceVersion + }, time.Minute, 5*time.Second).Should(gomega.BeTrue(), "didn't get updated node topology info") + ginkgo.By("checking the changes in the updated topology") + + initialAllocRes := allocatableResourceListFromNodeResourceTopology(initialNodeTopo) + finalAllocRes := allocatableResourceListFromNodeResourceTopology(finalNodeTopo) + if len(initialAllocRes) == 0 || len(finalAllocRes) == 0 { + ginkgo.Fail(fmt.Sprintf("failed to find allocatable resources from node topology initial=%v final=%v", initialAllocRes, finalAllocRes)) + } + zoneName, resName, isLess := lessAllocatableResources(initialAllocRes, finalAllocRes) + framework.Logf("zone=%q resource=%q isLess=%v", zoneName, resName, isLess) + gomega.Expect(isLess).To(gomega.BeTrue(), fmt.Sprintf("final allocatable resources not decreased - initial=%v final=%v", initialAllocRes, finalAllocRes)) + }) + + }) + + ginkgo.JustAfterEach(func() { + err := testutils.DeconfigureRBAC(f.ClientSet, f.Namespace.Name) + if err != nil { + framework.Logf("failed to delete RBAC resources: %v", err) + } + + err = extClient.ApiextensionsV1().CustomResourceDefinitions().Delete(context.TODO(), crd.Name, metav1.DeleteOptions{}) + if err != nil { + framework.Logf("failed to delete node resources topologies CRD: %v", err) + } + }) +}) + +const nodeResourceTopologiesName = "noderesourcetopologies.topology.node.k8s.io" + +func newNodeResourceTopologies() *apiextensionsv1.CustomResourceDefinition { + return &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeResourceTopologiesName, + Annotations: map[string]string{ + "api-approved.kubernetes.io": "https://github.com/kubernetes/enhancements/pull/1870", + }, + }, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: "topology.node.k8s.io", + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: "noderesourcetopologies", + Singular: "noderesourcetopology", + ShortNames: []string{ + "node-res-topo", + }, + Kind: "NodeResourceTopology", + }, + Scope: "Namespaced", + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v1alpha1", + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "topologyPolicies": { + Type: "array", + Items: &apiextensionsv1.JSONSchemaPropsOrArray{ + Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, + }, + }, + }, + Served: true, + Storage: true, + }, + }, + }, + } +} + +func CreateNodeResourceTopologies(extClient extclient.Interface) (*apiextensionsv1.CustomResourceDefinition, error) { + crd, err := extClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), nodeResourceTopologiesName, metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + return nil, err + } + + if err == nil { + return crd, nil + } + + crd, err = extClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.TODO(), newNodeResourceTopologies(), metav1.CreateOptions{}) + if err != nil { + return nil, err + } + + return crd, nil +} + +func getNodeTopology(topologyClient *topologyclientset.Clientset, nodeName, namespace string) *v1alpha1.NodeResourceTopology { + var nodeTopology *v1alpha1.NodeResourceTopology + var err error + gomega.EventuallyWithOffset(1, func() bool { + nodeTopology, err = topologyClient.TopologyV1alpha1().NodeResourceTopologies(namespace).Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + framework.Logf("failed to get the node topology resource: %v", err) + return false + } + return true + }, time.Minute, 5*time.Second).Should(gomega.BeTrue()) + return nodeTopology +} + +func isValidNodeTopology(nodeTopology *v1alpha1.NodeResourceTopology, kubeletConfig *kubeletconfig.KubeletConfiguration) bool { + if nodeTopology == nil || len(nodeTopology.TopologyPolicies) == 0 { + framework.Logf("failed to get topology policy from the node topology resource") + return false + } + + if nodeTopology.TopologyPolicies[0] != (*kubeletConfig).TopologyManagerPolicy { + return false + } + + if nodeTopology.Zones == nil || len(nodeTopology.Zones) == 0 { + framework.Logf("failed to get topology zones from the node topology resource") + return false + } + + foundNodes := 0 + for _, zone := range nodeTopology.Zones { + // TODO constant not in the APIs + if !strings.HasPrefix(strings.ToUpper(zone.Type), "NODE") { + continue + } + foundNodes++ + + if !isValidCostList(zone.Name, zone.Costs) { + return false + } + + if !isValidResourceList(zone.Name, zone.Resources) { + return false + } + } + return foundNodes > 0 +} + +func isValidCostList(zoneName string, costs v1alpha1.CostList) bool { + if len(costs) == 0 { + framework.Logf("failed to get topology costs for zone %q from the node topology resource", zoneName) + return false + } + + // TODO cross-validate zone names + for _, cost := range costs { + if cost.Name == "" || cost.Value < 0 { + framework.Logf("malformed cost %v for zone %q", cost, zoneName) + } + } + return true +} + +func isValidResourceList(zoneName string, resources v1alpha1.ResourceInfoList) bool { + if len(resources) == 0 { + framework.Logf("failed to get topology resources for zone %q from the node topology resource", zoneName) + return false + } + foundCpu := false + for _, resource := range resources { + // TODO constant not in the APIs + if strings.ToUpper(resource.Name) == "CPU" { + foundCpu = true + } + allocatable, ok1 := resource.Allocatable.AsInt64() + capacity, ok2 := resource.Capacity.AsInt64() + if (!ok1 || !ok2) || ((allocatable < 0 || capacity < 0) || (capacity < allocatable)) { + framework.Logf("malformed resource %v for zone %q", resource, zoneName) + return false + } + } + return foundCpu +} + +func getNamespaceName() string { + if nsName, ok := os.LookupEnv("E2E_NAMESPACE_NAME"); ok { + return nsName + } + return defaultNamespace +} + +func allocatableResourceListFromNodeResourceTopology(nodeTopo *v1alpha1.NodeResourceTopology) map[string]v1.ResourceList { + allocRes := make(map[string]v1.ResourceList) + for _, zone := range nodeTopo.Zones { + if zone.Type != "Node" { + continue + } + resList := make(v1.ResourceList) + for _, res := range zone.Resources { + resList[v1.ResourceName(res.Name)] = res.Allocatable.DeepCopy() + } + if len(resList) == 0 { + continue + } + allocRes[zone.Name] = resList + } + return allocRes +} + +func lessAllocatableResources(expected, got map[string]v1.ResourceList) (string, string, bool) { + zoneName, resName, cmp, ok := cmpAllocatableResources(expected, got) + if !ok { + framework.Logf("-> cmp failed (not ok)") + return "", "", false + } + if cmp < 0 { + return zoneName, resName, true + } + framework.Logf("-> cmp failed (value=%d)", cmp) + return "", "", false +} + +func cmpAllocatableResources(expected, got map[string]v1.ResourceList) (string, string, int, bool) { + if len(got) != len(expected) { + framework.Logf("-> expected=%v (len=%d) got=%v (len=%d)", expected, len(expected), got, len(got)) + return "", "", 0, false + } + for expZoneName, expResList := range expected { + gotResList, ok := got[expZoneName] + if !ok { + return expZoneName, "", 0, false + } + if resName, cmp, ok := cmpResourceList(expResList, gotResList); !ok || cmp != 0 { + return expZoneName, resName, cmp, ok + } + } + return "", "", 0, true +} + +func cmpResourceList(expected, got v1.ResourceList) (string, int, bool) { + if len(got) != len(expected) { + framework.Logf("-> expected=%v (len=%d) got=%v (len=%d)", expected, len(expected), got, len(got)) + return "", 0, false + } + for expResName, expResQty := range expected { + gotResQty, ok := got[expResName] + if !ok { + return string(expResName), 0, false + } + if cmp := gotResQty.Cmp(expResQty); cmp != 0 { + framework.Logf("-> resource=%q cmp=%d expected=%v got=%v", expResName, cmp, expResQty, gotResQty) + return string(expResName), cmp, true + } + } + return "", 0, true +} diff --git a/test/e2e/utils/node.go b/test/e2e/utils/node.go new file mode 100644 index 0000000000..a313b43bd0 --- /dev/null +++ b/test/e2e/utils/node.go @@ -0,0 +1,87 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/kubernetes/test/e2e/framework" +) + +const ( + // RoleWorker contains the worker role + RoleWorker = "worker" +) + +const ( + // LabelRole contains the key for the role label + LabelRole = "node-role.kubernetes.io" + // LabelHostname contains the key for the hostname label + LabelHostname = "kubernetes.io/hostname" +) + +// GetWorkerNodes returns all nodes labeled as worker +func GetWorkerNodes(f *framework.Framework) ([]v1.Node, error) { + return GetNodesByRole(f, RoleWorker) +} + +// GetByRole returns all nodes with the specified role +func GetNodesByRole(f *framework.Framework, role string) ([]v1.Node, error) { + selector, err := labels.Parse(fmt.Sprintf("%s/%s=", LabelRole, role)) + if err != nil { + return nil, err + } + return GetNodesBySelector(f, selector) +} + +// GetBySelector returns all nodes with the specified selector +func GetNodesBySelector(f *framework.Framework, selector labels.Selector) ([]v1.Node, error) { + nodes, err := f.ClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + return nil, err + } + return nodes.Items, nil +} + +// FilterNodesWithEnoughCores returns all nodes with at least the amount of given CPU allocatable +func FilterNodesWithEnoughCores(nodes []v1.Node, cpuAmount string) ([]v1.Node, error) { + requestCpu := resource.MustParse(cpuAmount) + framework.Logf("checking request %v on %d nodes", requestCpu, len(nodes)) + + resNodes := []v1.Node{} + for _, node := range nodes { + availCpu, ok := node.Status.Allocatable[v1.ResourceCPU] + if !ok || availCpu.IsZero() { + return nil, fmt.Errorf("node %q has no allocatable CPU", node.Name) + } + + if availCpu.Cmp(requestCpu) < 1 { + framework.Logf("node %q available cpu %v requested cpu %v", node.Name, availCpu, requestCpu) + continue + } + + framework.Logf("node %q has enough resources, cluster OK", node.Name) + resNodes = append(resNodes, node) + } + + return resNodes, nil +} diff --git a/test/e2e/utils/pod.go b/test/e2e/utils/pod.go index cd17fba0d6..aa64877317 100644 --- a/test/e2e/utils/pod.go +++ b/test/e2e/utils/pod.go @@ -18,15 +18,93 @@ package utils import ( "flag" + "sync" + + "github.com/onsi/ginkgo" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/utils/pointer" ) var pullIfNotPresent = flag.Bool("nfd.pull-if-not-present", false, "Pull Images if not present - not always") +const ( + CentosImage = "quay.io/centos/centos:8" +) + +func GuaranteedSleeperPod(cpuLimit string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "sleeper-gu-pod", + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + v1.Container{ + Name: "sleeper-gu-cnt", + Image: CentosImage, + // 1 hour (or >= 1h in general) is "forever" for our purposes + Command: []string{"/bin/sleep", "1h"}, + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + // we use 1 core because that's the minimal meaningful quantity + v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpuLimit), + // any random reasonable amount is fine + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100Mi"), + }, + }, + }, + }, + }, + } +} + +func BestEffortSleeperPod() *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "sleeper-be-pod", + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + v1.Container{ + Name: "sleeper-be-cnt", + Image: CentosImage, + // 1 hour (or >= 1h in general) is "forever" for our purposes + Command: []string{"/bin/sleep", "1h"}, + }, + }, + }, + } +} + +func DeletePodsAsync(f *framework.Framework, podMap map[string]*v1.Pod) { + var wg sync.WaitGroup + for _, pod := range podMap { + wg.Add(1) + go func(podNS, podName string) { + defer ginkgo.GinkgoRecover() + defer wg.Done() + + DeletePodSyncByName(f, podName) + }(pod.Namespace, pod.Name) + } + wg.Wait() +} + +func DeletePodSyncByName(f *framework.Framework, podName string) { + gp := int64(0) + delOpts := metav1.DeleteOptions{ + GracePeriodSeconds: &gp, + } + f.PodClient().DeleteSync(podName, delOpts, framework.DefaultPodDeletionTimeout) +} + // NFDMasterPod provide NFD master pod definition func NFDMasterPod(image string, onMasterNode bool) *v1.Pod { p := &v1.Pod{ @@ -91,6 +169,12 @@ func NFDWorkerDaemonSet(image string, extraArgs []string) *appsv1.DaemonSet { return newDaemonSet("nfd-worker", podSpec) } +// NFDTopologyUpdaterDaemonSet provides the NFD daemon set topology updater +func NFDTopologyUpdaterDaemonSet(image string, extraArgs []string) *appsv1.DaemonSet { + podSpec := nfdTopologyUpdaterPodSpec(image, extraArgs) + return newDaemonSet("nfd-topology-updater", podSpec) +} + // newDaemonSet provide the new daemon set func newDaemonSet(name string, podSpec *v1.PodSpec) *appsv1.DaemonSet { return &appsv1.DaemonSet{ @@ -182,7 +266,89 @@ func nfdWorkerPodSpec(image string, extraArgs []string) *v1.PodSpec { }, }, } +} +func nfdTopologyUpdaterPodSpec(image string, extraArgs []string) *v1.PodSpec { + return &v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "node-topology-updater", + Image: image, + ImagePullPolicy: pullPolicy(), + Command: []string{"nfd-topology-updater"}, + Args: append([]string{ + "--kubelet-config-file=/podresources/config.yaml", + "--podresources-socket=unix:///podresources/kubelet.sock", + "--sleep-interval=3s", + "--watch-namespace=rte", + "--server=nfd-master-e2e:8080", + }, extraArgs...), + Env: []v1.EnvVar{ + { + Name: "NODE_NAME", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "spec.nodeName", + }, + }, + }, + }, + SecurityContext: &v1.SecurityContext{ + Capabilities: &v1.Capabilities{ + Drop: []v1.Capability{"ALL"}, + }, + RunAsUser: pointer.Int64Ptr(0), + ReadOnlyRootFilesystem: pointer.BoolPtr(true), + AllowPrivilegeEscalation: pointer.BoolPtr(false), + }, + VolumeMounts: []v1.VolumeMount{ + { + Name: "kubelet-podresources-conf", + MountPath: "/podresources/config.yaml", + }, + { + Name: "kubelet-podresources-sock", + MountPath: "/podresources/kubelet.sock", + }, + { + Name: "host-sys", + MountPath: "/host/sys", + }, + }, + }, + }, + ServiceAccountName: "nfd-master-e2e", + DNSPolicy: v1.DNSClusterFirstWithHostNet, + Volumes: []v1.Volume{ + { + Name: "kubelet-podresources-conf", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/var/lib/kubelet/config.yaml", + Type: newHostPathType(v1.HostPathFile), + }, + }, + }, + { + Name: "kubelet-podresources-sock", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/var/lib/kubelet/pod-resources/kubelet.sock", + Type: newHostPathType(v1.HostPathSocket), + }, + }, + }, + { + Name: "host-sys", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/sys", + Type: newHostPathType(v1.HostPathDirectory), + }, + }, + }, + }, + } } func newHostPathType(typ v1.HostPathType) *v1.HostPathType {