From 974829ff7c449b90d399e24ab588bf5d870e3b0d Mon Sep 17 00:00:00 2001
From: Tuomas Katila
Date: Wed, 8 Mar 2023 13:59:23 +0200
Subject: [PATCH] gpu: try to fetch PodList from kubelet API

In large clusters and with resource management enabled, the load that
the GPU plugins place on the api-server can become heavy. With this
change the plugin fetches pod listings from the kubelet API and uses
the api-server only as a backup. Any error other than a timeout also
moves the logic back to using the api-server.

Signed-off-by: Tuomas Katila
---
 .../rm/gpu_plugin_resource_manager.go         | 162 +++++++++++++++--
 .../rm/gpu_plugin_resource_manager_test.go    |  19 ++
 .../gpu_plugin/base/intel-gpu-plugin.yaml     |   4 +
 .../add-kubelet-crt-mount.yaml                |  17 ++
 .../gpu-manager-role.yaml                     |   4 +-
 .../fractional_resources/kustomization.yaml   |   1 +
 .../operator/rbac/gpu_manager_role.yaml       |   7 +
 deployments/operator/rbac/role.yaml           |   7 +
 pkg/controllers/gpu/controller.go             |   2 +
 pkg/controllers/gpu/controller_test.go        |  10 ++
 pkg/controllers/reconciler.go                 |   1 +
 11 files changed, 219 insertions(+), 15 deletions(-)
 create mode 100644 deployments/gpu_plugin/overlays/fractional_resources/add-kubelet-crt-mount.yaml

diff --git a/cmd/gpu_plugin/rm/gpu_plugin_resource_manager.go b/cmd/gpu_plugin/rm/gpu_plugin_resource_manager.go
index 817dcad19..94a66968b 100644
--- a/cmd/gpu_plugin/rm/gpu_plugin_resource_manager.go
+++ b/cmd/gpu_plugin/rm/gpu_plugin_resource_manager.go
@@ -17,7 +17,13 @@ package rm
 import (
 	"context"
 	"crypto/rand"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"io"
 	"math/big"
+	"net"
+	"net/http"
 	"os"
 	"sort"
 	"strconv"
@@ -37,6 +43,7 @@ import (
 	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 	podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	"k8s.io/utils/strings/slices"
 )
 
 const (
@@ -49,6 +56,13 @@ const (
 	grpcAddress    = "unix:///var/lib/kubelet/pod-resources/kubelet.sock"
 	grpcBufferSize = 4 * 1024 * 1024
 	grpcTimeout    = 5 * time.Second
+
+	kubeletAPITimeout    = 5 * time.Second
+	kubeletAPIMaxRetries = 5
+	kubeletHTTPSCertPath = "/var/lib/kubelet/pki/kubelet.crt"
+	// The token path is incorrectly detected as a hardcoded credential.
+	//nolint:gosec
+	serviceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
 )
 
 // Errors.
@@ -102,12 +116,14 @@ type resourceManager struct {
 	prGetClientFunc  getClientFunc
 	assignments      map[string]podAssignmentDetails // pod name -> assignment details
 	nodeName         string
+	hostIP           string
 	skipID           string
 	fullResourceName string
 	retryTimeout     time.Duration
 	cleanupInterval  time.Duration
 	mutex            sync.RWMutex // for devTree updates during scan
 	cleanupMutex     sync.RWMutex // for assignment details during cleanup
+	useKubelet       bool
 }
 
 // NewDeviceInfo creates a new DeviceInfo.
@@ -137,6 +153,7 @@ func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error
 
 	rm := resourceManager{
 		nodeName:         os.Getenv("NODE_NAME"),
+		hostIP:           os.Getenv("HOST_IP"),
 		clientset:        clientset,
 		skipID:           skipID,
 		fullResourceName: fullResourceName,
@@ -144,10 +161,22 @@ func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error
 		assignments:      make(map[string]podAssignmentDetails),
 		retryTimeout:     1 * time.Second,
 		cleanupInterval:  20 * time.Minute,
+		useKubelet:       true,
 	}
 
 	klog.Info("GPU device plugin resource manager enabled")
 
+	// Try listing pods once to detect whether the kubelet API works.
+	_, err = rm.listPodsFromKubelet()
+
+	if err != nil {
+		klog.V(2).Info("Not using Kubelet API")
+
+		rm.useKubelet = false
+	} else {
+		klog.V(2).Info("Using Kubelet API")
+	}
+
 	go func() {
 		getRandDuration := func() time.Duration {
 			cleanupIntervalSeconds := int(rm.cleanupInterval.Seconds())
@@ -167,10 +196,7 @@ func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error
 		// Gather both running and pending pods. It might happen that
 		// cleanup is triggered between GetPreferredAllocation and Allocate
 		// and it would remove the assignment data for the soon-to-be allocated pod.
-		running := rm.listPodsOnNodeWithState(string(v1.PodRunning))
-		for podName, podItem := range rm.listPodsOnNodeWithState(string(v1.PodPending)) {
-			running[podName] = podItem
-		}
+		running := rm.listPodsOnNodeWithStates([]string{string(v1.PodRunning), string(v1.PodPending)})
 
 		func() {
 			rm.cleanupMutex.Lock()
@@ -201,20 +227,127 @@ func getPodResourceKey(res *podresourcesv1.PodResources) string {
 	return res.Namespace + "&" + res.Name
 }
 
-func (rm *resourceManager) listPodsOnNodeWithState(state string) map[string]*v1.Pod {
-	pods := make(map[string]*v1.Pod)
-
-	selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName +
-		",status.phase=" + state)
+func (rm *resourceManager) listPodsFromAPIServer() (*v1.PodList, error) {
+	selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName)
 
 	if err != nil {
-		return pods
+		return &v1.PodList{}, err
 	}
 
+	klog.V(4).Info("Requesting pods from API server")
+
 	podList, err := rm.clientset.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
 		FieldSelector: selector.String(),
 	})
+
+	if err != nil {
+		klog.Error("pod listing failed:", err)
+
+		return &v1.PodList{}, err
+	}
+
+	return podList, nil
+}
+
+// +kubebuilder:rbac:groups="",resources=nodes/proxy,verbs=list;get
+
+func (rm *resourceManager) listPodsFromKubelet() (*v1.PodList, error) {
+	var podList v1.PodList
+
+	token, err := os.ReadFile(serviceAccountTokenPath)
+	if err != nil {
+		klog.Warning("Failed to read token for kubelet API access: ", err)
+
+		return &podList, err
+	}
+
+	kubeletCert, err := os.ReadFile(kubeletHTTPSCertPath)
+	if err != nil {
+		klog.Warning("Failed to read kubelet cert: ", err)
+
+		return &podList, err
+	}
+
+	certPool := x509.NewCertPool()
+	certPool.AppendCertsFromPEM(kubeletCert)
+
+	// There is no official documentation for the kubelet API. There is a blog post:
+	// https://www.deepnetwork.com/blog/2020/01/13/kubelet-api.html
+	// And a tool to work with the API:
+	// https://github.com/cyberark/kubeletctl
+
+	kubeletURL := "https://" + rm.hostIP + ":10250/pods"
+	req, _ := http.NewRequestWithContext(context.Background(), "GET", kubeletURL, nil)
+	req.Header.Set("Authorization", "Bearer "+string(token))
+
+	tr := &http.Transport{
+		TLSClientConfig: &tls.Config{
+			MinVersion: tls.VersionTLS12,
+			RootCAs:    certPool,
+			ServerName: rm.nodeName,
+		},
+	}
+	client := &http.Client{
+		Timeout:   kubeletAPITimeout,
+		Transport: tr,
+	}
+
+	klog.V(4).Infof("Requesting pods from kubelet (%s)", kubeletURL)
+
+	resp, err := client.Do(req)
+	if err != nil {
+		klog.Warning("Failed to read pods from kubelet API: ", err)
+
+		return &podList, err
+	}
+
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		klog.Warning("Failed to read http response body: ", err)
+
+		return &podList, err
+	}
+
+	err = json.Unmarshal(body, &podList)
+	if err != nil {
+		klog.Warning("Failed to unmarshal PodList from response: ", err)
+
+		return &podList, err
+	}
+
+	return &podList, nil
+}
+
+func (rm *resourceManager) listPods() (*v1.PodList, error) {
+	// Use the kubelet API as long as it keeps providing listings within the retry limit.
+	if rm.useKubelet {
+		var neterr net.Error
+
+		for i := 0; i < kubeletAPIMaxRetries; i++ {
+			if podList, err := rm.listPodsFromKubelet(); err == nil {
+				return podList, nil
+			} else if errors.As(err, &neterr) && neterr.Timeout() {
+				continue
+			}
+
+			// A non-timeout error: break to stop using the kubelet API.
+			break
+		}
+
+		klog.Warning("Stopping Kubelet API use due to error/timeout")
+
+		rm.useKubelet = false
+	}
+
+	return rm.listPodsFromAPIServer()
+}
+
+func (rm *resourceManager) listPodsOnNodeWithStates(states []string) map[string]*v1.Pod {
+	pods := make(map[string]*v1.Pod)
+
+	podList, err := rm.listPods()
 	if err != nil {
 		klog.Error("pod listing failed:", err)
 
@@ -222,8 +355,11 @@ func (rm *resourceManager) listPodsOnNodeWithState(state string) map[string]*v1.
 	}
 
 	for i := range podList.Items {
-		key := getPodKey(&podList.Items[i])
-		pods[key] = &podList.Items[i]
+		phase := string(podList.Items[i].Status.Phase)
+		if slices.Contains(states, phase) {
+			key := getPodKey(&podList.Items[i])
+			pods[key] = &podList.Items[i]
+		}
 	}
 
 	return pods
@@ -528,7 +664,7 @@ func (rm *resourceManager) findAllocationPodCandidate() (*podCandidate, error) {
 
 // getNodePendingGPUPods returns a map of pod names -> pods that are pending and use the gpu.
 func (rm *resourceManager) getNodePendingGPUPods() (map[string]*v1.Pod, error) {
-	pendingPods := rm.listPodsOnNodeWithState(string(v1.PodPending))
+	pendingPods := rm.listPodsOnNodeWithStates([]string{string(v1.PodPending)})
 
 	for podName, pod := range pendingPods {
 		if numGPUUsingContainers(pod, rm.fullResourceName) == 0 {
diff --git a/cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go b/cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go
index 5e161e3d2..cc5e6f542 100644
--- a/cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go
+++ b/cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go
@@ -105,6 +105,7 @@ func newMockResourceManager(pods []v1.Pod) ResourceManager {
 		fullResourceName: "gpu.intel.com/i915",
 		assignments:      make(map[string]podAssignmentDetails),
 		retryTimeout:     1 * time.Millisecond,
+		useKubelet:       false,
 	}
 
 	deviceInfoMap := NewDeviceInfoMap()
@@ -168,6 +169,9 @@ func TestGetPreferredFractionalAllocation(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 
 	gpuLessTestPod := v1.Pod{
@@ -326,6 +330,9 @@ func TestCreateFractionalResourceResponse(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 	unAnnotatedTestPod := *properTestPod.DeepCopy()
 	unAnnotatedTestPod.ObjectMeta.Annotations = nil
@@ -458,6 +465,9 @@ func TestCreateFractionalResourceResponseWithOneCardTwoTiles(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 
 	properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
@@ -521,6 +531,9 @@ func TestCreateFractionalResourceResponseWithTwoCardsOneTile(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 
 	properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
@@ -589,6 +602,9 @@ func TestCreateFractionalResourceResponseWithThreeCardsTwoTiles(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 
 	properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
@@ -664,6 +680,9 @@ func TestCreateFractionalResourceResponseWithMultipleContainersTileEach(t *testi
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 
 	properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
diff --git a/deployments/gpu_plugin/base/intel-gpu-plugin.yaml b/deployments/gpu_plugin/base/intel-gpu-plugin.yaml
index 425479600..94180d405 100644
--- a/deployments/gpu_plugin/base/intel-gpu-plugin.yaml
+++ b/deployments/gpu_plugin/base/intel-gpu-plugin.yaml
@@ -32,6 +32,10 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: spec.nodeName
+        - name: HOST_IP
+          valueFrom:
+            fieldRef:
+              fieldPath: status.hostIP
         image: intel/intel-gpu-plugin:devel
         imagePullPolicy: IfNotPresent
         securityContext:
diff --git a/deployments/gpu_plugin/overlays/fractional_resources/add-kubelet-crt-mount.yaml b/deployments/gpu_plugin/overlays/fractional_resources/add-kubelet-crt-mount.yaml
new file mode 100644
index 000000000..ff4afafa8
--- /dev/null
+++ b/deployments/gpu_plugin/overlays/fractional_resources/add-kubelet-crt-mount.yaml
@@ -0,0 +1,17 @@
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  name: intel-gpu-plugin
+spec:
+  template:
+    spec:
+      containers:
+      - name: intel-gpu-plugin
+        volumeMounts:
+        - name: kubeletcrt
+          mountPath: /var/lib/kubelet/pki/kubelet.crt
+      volumes:
+      - name: kubeletcrt
+        hostPath:
+          path: /var/lib/kubelet/pki/kubelet.crt
+          type: FileOrCreate
diff --git a/deployments/gpu_plugin/overlays/fractional_resources/gpu-manager-role.yaml b/deployments/gpu_plugin/overlays/fractional_resources/gpu-manager-role.yaml
index 17bca25fa..61db88233 100644
--- a/deployments/gpu_plugin/overlays/fractional_resources/gpu-manager-role.yaml
+++ b/deployments/gpu_plugin/overlays/fractional_resources/gpu-manager-role.yaml
@@ -4,5 +4,5 @@ metadata:
   name: gpu-manager-role
 rules:
 - apiGroups: [""]
-  resources: ["pods"]
-  verbs: ["list"]
+  resources: ["pods", "nodes/proxy"]
+  verbs: ["list", "get"]
diff --git a/deployments/gpu_plugin/overlays/fractional_resources/kustomization.yaml b/deployments/gpu_plugin/overlays/fractional_resources/kustomization.yaml
index 702053d01..85d0d920f 100644
--- a/deployments/gpu_plugin/overlays/fractional_resources/kustomization.yaml
+++ b/deployments/gpu_plugin/overlays/fractional_resources/kustomization.yaml
@@ -9,3 +9,4 @@ patches:
   - path: add-podresource-mount.yaml
   - path: add-args.yaml
   - path: add-nodeselector-intel-gpu.yaml
+  - path: add-kubelet-crt-mount.yaml
diff --git a/deployments/operator/rbac/gpu_manager_role.yaml b/deployments/operator/rbac/gpu_manager_role.yaml
index 691a47290..8c6790913 100644
--- a/deployments/operator/rbac/gpu_manager_role.yaml
+++ b/deployments/operator/rbac/gpu_manager_role.yaml
@@ -5,6 +5,13 @@ metadata:
   creationTimestamp: null
   name: gpu-manager-role
 rules:
+- apiGroups:
+  - ""
+  resources:
+  - nodes/proxy
+  verbs:
+  - get
+  - list
 - apiGroups:
   - ""
   resources:
diff --git a/deployments/operator/rbac/role.yaml b/deployments/operator/rbac/role.yaml
index 14495bc98..2c18d55d1 100644
--- a/deployments/operator/rbac/role.yaml
+++ b/deployments/operator/rbac/role.yaml
@@ -5,6 +5,13 @@ metadata:
   creationTimestamp: null
   name: manager-role
 rules:
+- apiGroups:
+  - ""
+  resources:
+  - nodes/proxy
+  verbs:
+  - get
+  - list
 - apiGroups:
   - ""
   resources:
diff --git a/pkg/controllers/gpu/controller.go b/pkg/controllers/gpu/controller.go
index 79c59b3e3..fa8ac6178 100644
--- a/pkg/controllers/gpu/controller.go
+++ b/pkg/controllers/gpu/controller.go
@@ -152,6 +152,8 @@ func (c *controller) NewDaemonSet(rawObj client.Object) *apps.DaemonSet {
 		daemonSet.Spec.Template.Spec.ServiceAccountName = serviceAccountName
 		addVolumeIfMissing(&daemonSet.Spec.Template.Spec, "podresources", "/var/lib/kubelet/pod-resources", v1.HostPathDirectory)
 		addVolumeMountIfMissing(&daemonSet.Spec.Template.Spec, "podresources", "/var/lib/kubelet/pod-resources")
+		addVolumeIfMissing(&daemonSet.Spec.Template.Spec, "kubeletcrt", "/var/lib/kubelet/pki/kubelet.crt", v1.HostPathFileOrCreate)
+		addVolumeMountIfMissing(&daemonSet.Spec.Template.Spec, "kubeletcrt", "/var/lib/kubelet/pki/kubelet.crt")
 	}
 
 	return daemonSet
diff --git a/pkg/controllers/gpu/controller_test.go b/pkg/controllers/gpu/controller_test.go
index 622fa54d5..9d4db1f04 100644
--- a/pkg/controllers/gpu/controller_test.go
+++ b/pkg/controllers/gpu/controller_test.go
@@ -74,6 +74,14 @@ func (c *controller) newDaemonSetExpected(rawObj client.Object) *apps.DaemonSet
 							},
 						},
 					},
+					{
+						Name: "HOST_IP",
+						ValueFrom: &v1.EnvVarSource{
+							FieldRef: &v1.ObjectFieldSelector{
+								FieldPath: "status.hostIP",
+							},
+						},
+					},
 				},
 				Args:  getPodArgs(devicePlugin),
 				Image: devicePlugin.Spec.Image,
@@ -145,6 +153,8 @@ func (c *controller) newDaemonSetExpected(rawObj client.Object) *apps.DaemonSet
 		daemonSet.Spec.Template.Spec.ServiceAccountName = serviceAccountName
 		addVolumeIfMissing(&daemonSet.Spec.Template.Spec, "podresources", "/var/lib/kubelet/pod-resources", v1.HostPathDirectory)
 		addVolumeMountIfMissing(&daemonSet.Spec.Template.Spec, "podresources", "/var/lib/kubelet/pod-resources")
+		addVolumeIfMissing(&daemonSet.Spec.Template.Spec, "kubeletcrt", "/var/lib/kubelet/pki/kubelet.crt", v1.HostPathFileOrCreate)
+		addVolumeMountIfMissing(&daemonSet.Spec.Template.Spec, "kubeletcrt", "/var/lib/kubelet/pki/kubelet.crt")
 	}
 
 	return &daemonSet
diff --git a/pkg/controllers/reconciler.go b/pkg/controllers/reconciler.go
index 52e2f1b4e..267e99c24 100644
--- a/pkg/controllers/reconciler.go
+++ b/pkg/controllers/reconciler.go
@@ -73,6 +73,7 @@ func GetDevicePluginCount(pluginKind string) int {
 // +kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=get;list;watch;create;delete
 // +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings,verbs=get;list;watch;create;delete
 // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
+// +kubebuilder:rbac:groups="",resources=nodes/proxy,verbs=get;list
 // +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=create
 // +kubebuilder:rbac:groups=security.openshift.io,resources=securitycontextconstraints,resourceNames=privileged,verbs=use
 // +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,resourceNames=d1c7b6d5.intel.com,verbs=get;update
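
Note (not part of the patch): to manually verify the kubelet API access
pattern the patch relies on, a minimal standalone Go sketch is below. It
assumes the same environment the patch sets up: NODE_NAME and HOST_IP
injected into the pod, the kubelet serving cert mounted at
/var/lib/kubelet/pki/kubelet.crt, the service-account token at its default
path, and the kubelet read port 10250. Run it inside the plugin pod; the
program name and printed output format are arbitrary.

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"time"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// The service-account token authenticates the request to the kubelet.
	token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
	if err != nil {
		panic(err)
	}

	// Trust the kubelet's self-signed serving cert explicitly.
	kubeletCert, err := os.ReadFile("/var/lib/kubelet/pki/kubelet.crt")
	if err != nil {
		panic(err)
	}

	certPool := x509.NewCertPool()
	certPool.AppendCertsFromPEM(kubeletCert)

	client := &http.Client{
		Timeout: 5 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				MinVersion: tls.VersionTLS12,
				RootCAs:    certPool,
				// The cert is issued for the node name while the request
				// goes to the host IP, so set ServerName explicitly.
				ServerName: os.Getenv("NODE_NAME"),
			},
		},
	}

	kubeletURL := "https://" + os.Getenv("HOST_IP") + ":10250/pods"

	req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, kubeletURL, nil)
	if err != nil {
		panic(err)
	}

	req.Header.Set("Authorization", "Bearer "+string(token))

	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}

	// The endpoint returns a plain v1.PodList of the pods on this node.
	var podList v1.PodList
	if err := json.Unmarshal(body, &podList); err != nil {
		panic(err)
	}

	for i := range podList.Items {
		fmt.Println(podList.Items[i].Namespace + "/" + podList.Items[i].Name)
	}
}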