From 6243f97fd8713ece964ae6385635e43840f2b1ef Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?=
Date: Thu, 21 Nov 2024 18:14:36 +0100
Subject: [PATCH] CA: add DRA object handling logic to PredicateSnapshot

All added logic is behind the DRA flag guard, so this should be a no-op
if the flag is disabled.
---
 .../predicate/predicate_snapshot.go           | 117 +++++++++++++++++-
 1 file changed, 114 insertions(+), 3 deletions(-)

diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
index 8e9c39c20074..082ea665d4d2 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
@@ -17,9 +17,13 @@ limitations under the License.
 package predicate
 
 import (
+	"fmt"
+
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+	drautils "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
+	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
 // PredicateSnapshot implements ClusterSnapshot on top of a SnapshotBase by using
@@ -46,6 +50,9 @@ func (s *PredicateSnapshot) GetNodeInfo(nodeName string) (*framework.NodeInfo, e
 	if err != nil {
 		return nil, err
 	}
+	if s.draEnabled {
+		return s.SnapshotBase.DraSnapshot().WrapSchedulerNodeInfo(schedNodeInfo)
+	}
 	return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil
 }
 
@@ -57,26 +64,69 @@ func (s *PredicateSnapshot) ListNodeInfos() ([]*framework.NodeInfo, error) {
 	}
 	var result []*framework.NodeInfo
 	for _, schedNodeInfo := range schedNodeInfos {
-		result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil))
+		if s.draEnabled {
+			nodeInfo, err := s.SnapshotBase.DraSnapshot().WrapSchedulerNodeInfo(schedNodeInfo)
+			if err != nil {
+				return nil, err
+			}
+			result = append(result, nodeInfo)
+		} else {
+			result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil))
+		}
 	}
 	return result, nil
 }
 
 // AddNodeInfo adds the provided internal NodeInfo to the snapshot.
 func (s *PredicateSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo) error {
+	if s.draEnabled && len(nodeInfo.LocalResourceSlices) > 0 {
+		err := s.SnapshotBase.DraSnapshot().AddNodeResourceSlices(nodeInfo.Node().Name, nodeInfo.LocalResourceSlices)
+		if err != nil {
+			return fmt.Errorf("couldn't add ResourceSlices to DRA snapshot: %v", err)
+		}
+
+		for _, podInfo := range nodeInfo.Pods() {
+			err := s.SnapshotBase.DraSnapshot().AddClaims(podInfo.NeededResourceClaims)
+			if err != nil {
+				return fmt.Errorf("couldn't add ResourceClaims to DRA snapshot: %v", err)
+			}
+		}
+	}
+
 	return s.SnapshotBase.AddSchedulerNodeInfo(nodeInfo.ToScheduler())
 }
 
 // RemoveNodeInfo removes a NodeInfo matching the provided nodeName from the snapshot.
 func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error {
+	if s.draEnabled {
+		nodeInfo, err := s.GetNodeInfo(nodeName)
+		if err != nil {
+			return err
+		}
+
+		s.SnapshotBase.DraSnapshot().RemoveNodeResourceSlices(nodeName)
+
+		for _, pod := range nodeInfo.Pods() {
+			s.SnapshotBase.DraSnapshot().RemovePodClaims(pod.Pod)
+		}
+	}
+
 	return s.SnapshotBase.RemoveSchedulerNodeInfo(nodeName)
 }
 
 // SchedulePod adds pod to the snapshot and schedules it to given node.
 func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	if _, _, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
+	node, cycleState, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
+	if schedErr != nil {
 		return schedErr
 	}
+
+	if s.draEnabled {
+		if err := s.handleResourceClaimModifications(pod, node, cycleState); err != nil {
+			return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
+		}
+	}
+
 	if err := s.ForceAddPod(pod, nodeName); err != nil {
 		return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
 	}
@@ -85,10 +135,17 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) cluster
 
 // SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function.
 func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
-	node, _, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
+	node, cycleState, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
 	if schedErr != nil {
 		return "", schedErr
 	}
+
+	if s.draEnabled {
+		if err := s.handleResourceClaimModifications(pod, node, cycleState); err != nil {
+			return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
+		}
+	}
+
 	if err := s.ForceAddPod(pod, node.Name); err != nil {
 		return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
 	}
@@ -97,6 +154,28 @@ func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNode
 
 // UnschedulePod removes the given Pod from the given Node inside the snapshot.
 func (s *PredicateSnapshot) UnschedulePod(namespace string, podName string, nodeName string) error {
+	if s.draEnabled {
+		nodeInfo, err := s.GetNodeInfo(nodeName)
+		if err != nil {
+			return err
+		}
+
+		var foundPod *apiv1.Pod
+		for _, pod := range nodeInfo.Pods() {
+			if pod.Namespace == namespace && pod.Name == podName {
+				foundPod = pod.Pod
+				break
+			}
+		}
+		if foundPod == nil {
+			return fmt.Errorf("pod %s/%s not found on node %s", namespace, podName, nodeName)
+		}
+
+		if err := s.SnapshotBase.DraSnapshot().UnreservePodClaims(foundPod); err != nil {
+			return err
+		}
+	}
+
 	return s.ForceRemovePod(namespace, podName, nodeName)
 }
 
@@ -105,3 +184,35 @@ func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clu
 	_, _, err := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
 	return err
 }
+
+func (s *PredicateSnapshot) handleResourceClaimModifications(pod *apiv1.Pod, node *apiv1.Node, postFilterState *schedulerframework.CycleState) error {
+	if len(pod.Spec.ResourceClaims) == 0 {
+		return nil
+	}
+	// We need to run the scheduler Reserve phase to allocate the appropriate ResourceClaims in the DRA snapshot. The allocations are
+	// actually computed and cached in the Filter phase, and Reserve only grabs them from the cycle state. So this should be quick, but
+	// it needs the cycle state from after running the Filter phase.
+	err := s.pluginRunner.RunReserveOnNode(pod, node.Name, postFilterState)
+	if err != nil {
+		return fmt.Errorf("error while trying to run Reserve on node %s for pod %s/%s: %v", node.Name, pod.Namespace, pod.Name, err)
+	}
+
+	// The pod isn't added to the ReservedFor field of the claim during the Reserve phase (it happens later, in PreBind). We can just do it
+	// manually here. This shouldn't fail: it can only fail if ReservedFor is already at max length, and that is checked during the Filter phase.
+	err = s.SnapshotBase.DraSnapshot().ReservePodClaims(pod)
+	if err != nil {
+		return fmt.Errorf("couldn't add pod reservations to claims, this shouldn't happen: %v", err)
+	}
+
+	// Verify that all needed claims are tracked in the DRA snapshot, allocated, and available on the Node.
+	claims, err := s.SnapshotBase.DraSnapshot().PodClaims(pod)
+	if err != nil {
+		return fmt.Errorf("couldn't obtain pod %s/%s claims: %v", pod.Namespace, pod.Name, err)
+	}
+	for _, claim := range claims {
+		if available, err := drautils.ClaimAvailableOnNode(claim, node); err != nil || !available {
+			return fmt.Errorf("pod %s/%s needs claim %s to schedule, but it isn't available on node %s (allocated: %v, available: %v, err: %v)", pod.Namespace, pod.Name, claim.Name, node.Name, drautils.ClaimAllocated(claim), available, err)
+		}
+	}
+	return nil
+}
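
Usage sketch (not part of the patch): the snippet below only illustrates how the DRA-enabled path added above is meant to behave. SchedulePod runs the Filter phase, then Reserve, then records the claim reservation in the DRA snapshot before force-adding the pod, and UnschedulePod releases the reservation again. newDRAEnabledSnapshotForTest is a hypothetical test helper standing in for however a PredicateSnapshot with the DRA flag enabled gets constructed; the pod literal exists only to show a claim-referencing pod.

package predicate_test

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func ExampleScheduleDRAPod() {
	// Hypothetical helper: builds a PredicateSnapshot with draEnabled set to true
	// and a node "node-1" whose ResourceSlices can satisfy the claim below.
	snapshot := newDRAEnabledSnapshotForTest()

	claimName := "gpu-claim"
	pod := &apiv1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "gpu-pod", Namespace: "default"},
		Spec: apiv1.PodSpec{
			// Because the pod references a ResourceClaim, SchedulePod will run the
			// Reserve phase and call ReservePodClaims on the DRA snapshot.
			ResourceClaims: []apiv1.PodResourceClaim{{Name: "gpu", ResourceClaimName: &claimName}},
		},
	}

	// Filter -> Reserve -> claim reservation -> ForceAddPod, all inside SchedulePod.
	if schedErr := snapshot.SchedulePod(pod, "node-1"); schedErr != nil {
		fmt.Printf("scheduling failed: %v\n", schedErr)
		return
	}

	// UnschedulePod also releases the pod's claim reservations in the DRA snapshot.
	if err := snapshot.UnschedulePod(pod.Namespace, pod.Name, "node-1"); err != nil {
		fmt.Printf("unscheduling failed: %v\n", err)
	}
}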