diff --git a/cluster-autoscaler/simulator/basic_cluster_snapshot.go b/cluster-autoscaler/simulator/basic_cluster_snapshot.go
index 1c8b30a83dfb..94eca6b38e1b 100644
--- a/cluster-autoscaler/simulator/basic_cluster_snapshot.go
+++ b/cluster-autoscaler/simulator/basic_cluster_snapshot.go
@@ -31,7 +31,8 @@ type BasicClusterSnapshot struct {
 }
 
 type internalBasicSnapshotData struct {
-	nodeInfoMap map[string]*schedulerframework.NodeInfo
+	nodeInfoMap        map[string]*schedulerframework.NodeInfo
+	pvcNamespacePodMap map[string]map[string]bool
 }
 
 func (data *internalBasicSnapshotData) listNodeInfos() ([]*schedulerframework.NodeInfo, error) {
@@ -71,9 +72,52 @@ func (data *internalBasicSnapshotData) getNodeInfo(nodeName string) (*schedulerf
 	return nil, errNodeNotFound
 }
 
+func (data *internalBasicSnapshotData) isPVCUsedByPods(key string) bool {
+	if v, found := data.pvcNamespacePodMap[key]; found && v != nil && len(v) > 0 {
+		return true
+	}
+	return false
+}
+
+func (data *internalBasicSnapshotData) addPvcUsedByPod(pod *apiv1.Pod) {
+	if pod == nil {
+		return
+	}
+	nameSpace := pod.GetNamespace()
+	for _, volume := range pod.Spec.Volumes {
+		if volume.PersistentVolumeClaim == nil {
+			continue
+		}
+		k := schedulerframework.GetNamespacedName(nameSpace, volume.PersistentVolumeClaim.ClaimName)
+		_, found := data.pvcNamespacePodMap[k]
+		if !found {
+			data.pvcNamespacePodMap[k] = make(map[string]bool)
+		}
+		data.pvcNamespacePodMap[k][pod.GetName()] = true
+	}
+}
+
+func (data *internalBasicSnapshotData) removePvcUsedByPod(pod *apiv1.Pod) {
+	if pod == nil {
+		return
+	}
+
+	nameSpace := pod.GetNamespace()
+	for _, volume := range pod.Spec.Volumes {
+		if volume.PersistentVolumeClaim == nil {
+			continue
+		}
+		k := schedulerframework.GetNamespacedName(nameSpace, volume.PersistentVolumeClaim.ClaimName)
+		if _, found := data.pvcNamespacePodMap[k]; found {
+			delete(data.pvcNamespacePodMap[k], pod.GetName())
+		}
+	}
+}
+
 func newInternalBasicSnapshotData() *internalBasicSnapshotData {
 	return &internalBasicSnapshotData{
-		nodeInfoMap: make(map[string]*schedulerframework.NodeInfo),
+		nodeInfoMap:        make(map[string]*schedulerframework.NodeInfo),
+		pvcNamespacePodMap: make(map[string]map[string]bool),
 	}
 }
 
@@ -82,8 +126,16 @@ func (data *internalBasicSnapshotData) clone() *internalBasicSnapshotData {
 	for k, v := range data.nodeInfoMap {
 		clonedNodeInfoMap[k] = v.Clone()
 	}
+	clonedPvcNamespaceNodeMap := make(map[string]map[string]bool)
+	for k, v := range data.pvcNamespacePodMap {
+		clonedPvcNamespaceNodeMap[k] = make(map[string]bool)
+		for k1, v1 := range v {
+			clonedPvcNamespaceNodeMap[k][k1] = v1
+		}
+	}
 	return &internalBasicSnapshotData{
-		nodeInfoMap: clonedNodeInfoMap,
+		nodeInfoMap:        clonedNodeInfoMap,
+		pvcNamespacePodMap: clonedPvcNamespaceNodeMap,
 	}
 }
 
@@ -110,6 +162,9 @@ func (data *internalBasicSnapshotData) removeNode(nodeName string) error {
 	if _, found := data.nodeInfoMap[nodeName]; !found {
 		return errNodeNotFound
 	}
+	for _, pod := range data.nodeInfoMap[nodeName].Pods {
+		data.removePvcUsedByPod(pod.Pod)
+	}
 	delete(data.nodeInfoMap, nodeName)
 	return nil
 }
@@ -119,6 +174,7 @@ func (data *internalBasicSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
 		return errNodeNotFound
 	}
 	data.nodeInfoMap[nodeName].AddPod(pod)
+	data.addPvcUsedByPod(pod)
 	return nil
 }
 
@@ -129,8 +185,10 @@ func (data *internalBasicSnapshotData) removePod(namespace, podName, nodeName st
 	}
 	for _, podInfo := range nodeInfo.Pods {
 		if podInfo.Pod.Namespace == namespace && podInfo.Pod.Name == podName {
+			data.removePvcUsedByPod(podInfo.Pod)
 			err := nodeInfo.RemovePod(podInfo.Pod)
 			if err != nil {
+				data.addPvcUsedByPod(podInfo.Pod)
 				return fmt.Errorf("cannot remove pod; %v", err)
 			}
 			return nil
@@ -191,6 +249,11 @@ func (snapshot *BasicClusterSnapshot) RemovePod(namespace, podName, nodeName str
 	return snapshot.getInternalData().removePod(namespace, podName, nodeName)
 }
 
+// IsPVCUsedByPods returns if the pvc is used by any pod
+func (snapshot *BasicClusterSnapshot) IsPVCUsedByPods(key string) bool {
+	return snapshot.getInternalData().isPVCUsedByPods(key)
+}
+
 // Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert()
 // Forking already forked snapshot is not allowed and will result with an error.
 func (snapshot *BasicClusterSnapshot) Fork() error {
@@ -261,5 +324,5 @@ func (snapshot *basicClusterSnapshotNodeLister) Get(nodeName string) (*scheduler
 
 // Returns the IsPVCUsedByPods in a given key.
 func (snapshot *basicClusterSnapshotStorageLister) IsPVCUsedByPods(key string) bool {
-	return false
+	return (*BasicClusterSnapshot)(snapshot).getInternalData().isPVCUsedByPods(key)
 }
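The basic snapshot's bookkeeping above is a reverse index: pvcNamespacePodMap maps a <namespace>/<claim name> key to the set of pod names referencing that claim, so a PVC counts as used exactly while that set is non-empty. The same invariant explains why removePod re-adds the entry when nodeInfo.RemovePod fails. A minimal sketch of those semantics with plain maps (illustrative only, not code from this patch):

package main

import "fmt"

// pvcIndex models pvcNamespacePodMap: "<namespace>/<claim>" -> set of pod names.
type pvcIndex map[string]map[string]bool

// add records that pod references the claim identified by key.
func (idx pvcIndex) add(key, pod string) {
	if idx[key] == nil {
		idx[key] = map[string]bool{}
	}
	idx[key][pod] = true
}

// remove drops pod from the claim's reference set; deleting from a nil map is a no-op.
func (idx pvcIndex) remove(key, pod string) { delete(idx[key], pod) }

// used reports whether any pod still references the claim.
func (idx pvcIndex) used(key string) bool { return len(idx[key]) > 0 }

func main() {
	idx := pvcIndex{}
	idx.add("default/claim1", "pod1")
	idx.add("default/claim1", "pod2")
	idx.remove("default/claim1", "pod1")
	fmt.Println(idx.used("default/claim1")) // true: pod2 still references claim1
	idx.remove("default/claim1", "pod2")
	fmt.Println(idx.used("default/claim1")) // false: the reference set is empty
}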
diff --git a/cluster-autoscaler/simulator/cluster_snapshot.go b/cluster-autoscaler/simulator/cluster_snapshot.go
index 41b71348495b..5ce91285dd46 100644
--- a/cluster-autoscaler/simulator/cluster_snapshot.go
+++ b/cluster-autoscaler/simulator/cluster_snapshot.go
@@ -39,6 +39,8 @@ type ClusterSnapshot interface {
 	RemovePod(namespace string, podName string, nodeName string) error
 	// AddNodeWithPods adds a node and set of pods to be scheduled to this node to the snapshot.
 	AddNodeWithPods(node *apiv1.Node, pods []*apiv1.Pod) error
+	// IsPVCUsedByPods returns if the pvc is used by any pod, key = <namespace>/<pvc name>
+	IsPVCUsedByPods(key string) bool
 
 	// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert()
 	// Forking already forked snapshot is not allowed and will result with an error.
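With the method on the ClusterSnapshot interface, simulation code can ask whether a claim is referenced anywhere in the snapshot without rescanning pods. A hedged usage sketch — the helper and its name are hypothetical, not part of this patch; it relies only on methods the interface is shown to have (Fork, Revert, RemovePod, IsPVCUsedByPods) and on the <namespace>/<pvc name> key format noted in the interface comment:

// claimStillUsedAfterRemoval reports whether claimName would still be referenced
// by some pod in the snapshot once the given pod is removed. Hypothetical helper.
func claimStillUsedAfterRemoval(snapshot ClusterSnapshot, namespace, podName, nodeName, claimName string) (bool, error) {
	if err := snapshot.Fork(); err != nil {
		return false, err
	}
	defer snapshot.Revert() // roll the trial removal back
	if err := snapshot.RemovePod(namespace, podName, nodeName); err != nil {
		return false, err
	}
	return snapshot.IsPVCUsedByPods(namespace + "/" + claimName), nil
}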
"k8s.io/autoscaler/cluster-autoscaler/utils/test" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/stretchr/testify/assert" @@ -464,3 +463,129 @@ func TestNodeAlreadyExists(t *testing.T) { } } } + +func TestPVCUsedByPods(t *testing.T) { + node := BuildTestNode("node", 10, 10) + pod1 := BuildTestPod("pod1", 10, 10) + pod1.Spec.NodeName = node.Name + pod1.Spec.Volumes = []apiv1.Volume{ + { + Name: "v1", + VolumeSource: apiv1.VolumeSource{ + PersistentVolumeClaim: &apiv1.PersistentVolumeClaimVolumeSource{ + ClaimName: "claim1", + }, + }, + }, + } + pod2 := BuildTestPod("pod2", 10, 10) + pod2.Spec.NodeName = node.Name + pod2.Spec.Volumes = []apiv1.Volume{ + { + Name: "v1", + VolumeSource: apiv1.VolumeSource{ + PersistentVolumeClaim: &apiv1.PersistentVolumeClaimVolumeSource{ + ClaimName: "claim1", + }, + }, + }, + { + Name: "v1", + VolumeSource: apiv1.VolumeSource{ + PersistentVolumeClaim: &apiv1.PersistentVolumeClaimVolumeSource{ + ClaimName: "claim2", + }, + }, + }, + } + nonPvcPod := BuildTestPod("pod1", 10, 10) + nonPvcPod.Spec.NodeName = node.Name + nonPvcPod.Spec.Volumes = []apiv1.Volume{ + { + Name: "v1", + VolumeSource: apiv1.VolumeSource{ + NFS: &apiv1.NFSVolumeSource{ + Server: "", + Path: "", + ReadOnly: false, + }, + }, + }, + } + testcase := []struct { + desc string + node *apiv1.Node + pods []*apiv1.Pod + claimName string + exists bool + isRemoveCheck bool + removePod string + existsAfterRemove bool + }{ + { + desc: "pvc new pod with volume fetch", + node: node, + pods: []*apiv1.Pod{pod1}, + claimName: "claim1", + exists: true, + isRemoveCheck: false, + }, + { + desc: "pvc new pod with incorrect volume fetch", + node: node, + pods: []*apiv1.Pod{pod1}, + claimName: "incorrect-claim", + exists: false, + isRemoveCheck: false, + }, + { + desc: "new pod with non-pvc volume fetch", + node: node, + pods: []*apiv1.Pod{nonPvcPod}, + claimName: "incorrect-claim", + exists: false, + isRemoveCheck: false, + }, + { + desc: "pvc new pod with delete volume fetch", + node: node, + pods: []*apiv1.Pod{pod1}, + claimName: "claim1", + exists: true, + isRemoveCheck: true, + removePod: "pod1", + existsAfterRemove: false, + }, + { + desc: "pvc two pods with duplicated volume, delete one pod, fetch", + node: node, + pods: []*apiv1.Pod{pod1, pod2}, + claimName: "claim1", + exists: true, + isRemoveCheck: true, + removePod: "pod1", + existsAfterRemove: true, + }, + } + + for _, snapshotFactory := range snapshots { + for _, tc := range testcase { + t.Run(tc.desc, func(t *testing.T) { + snapshot := snapshotFactory() + err := snapshot.AddNodeWithPods(tc.node, tc.pods) + assert.NoError(t, err) + + volumeExists := snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) + assert.Equal(t, tc.exists, volumeExists) + + if tc.isRemoveCheck { + err = snapshot.RemovePod("default", tc.removePod, "node") + assert.NoError(t, err) + + volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) + assert.Equal(t, tc.existsAfterRemove, volumeExists) + } + }) + } + } +} diff --git a/cluster-autoscaler/simulator/delegating_shared_lister.go b/cluster-autoscaler/simulator/delegating_shared_lister.go index 23e782f0a081..92c263a8db27 100644 --- a/cluster-autoscaler/simulator/delegating_shared_lister.go +++ b/cluster-autoscaler/simulator/delegating_shared_lister.go @@ -40,6 +40,7 @@ func (lister *DelegatingSchedulerSharedLister) NodeInfos() schedulerframework.No return lister.delegate.NodeInfos() } +// StorageInfos returns a 
diff --git a/cluster-autoscaler/simulator/delegating_shared_lister.go b/cluster-autoscaler/simulator/delegating_shared_lister.go
index 23e782f0a081..92c263a8db27 100644
--- a/cluster-autoscaler/simulator/delegating_shared_lister.go
+++ b/cluster-autoscaler/simulator/delegating_shared_lister.go
@@ -40,6 +40,7 @@ func (lister *DelegatingSchedulerSharedLister) NodeInfos() schedulerframework.No
 	return lister.delegate.NodeInfos()
 }
 
+// StorageInfos returns a StorageInfoLister
 func (lister *DelegatingSchedulerSharedLister) StorageInfos() schedulerframework.StorageInfoLister {
 	return lister.delegate.StorageInfos()
 }
diff --git a/cluster-autoscaler/simulator/delta_cluster_snapshot.go b/cluster-autoscaler/simulator/delta_cluster_snapshot.go
index 9b4d187c6f17..2d08434f6175 100644
--- a/cluster-autoscaler/simulator/delta_cluster_snapshot.go
+++ b/cluster-autoscaler/simulator/delta_cluster_snapshot.go
@@ -57,6 +57,7 @@ type internalDeltaSnapshotData struct {
 	nodeInfoList                     []*schedulerframework.NodeInfo
 	havePodsWithAffinity             []*schedulerframework.NodeInfo
 	havePodsWithRequiredAntiAffinity []*schedulerframework.NodeInfo
+	pvcNamespaceMap                  map[string]int
 }
 
 func newInternalDeltaSnapshotData() *internalDeltaSnapshotData {
@@ -180,6 +181,8 @@ func (data *internalDeltaSnapshotData) clearCaches() {
 func (data *internalDeltaSnapshotData) clearPodCaches() {
 	data.havePodsWithAffinity = nil
 	data.havePodsWithRequiredAntiAffinity = nil
+	// TODO: update the cache when adding/removing pods instead of invalidating the whole cache
+	data.pvcNamespaceMap = nil
 }
 
 func (data *internalDeltaSnapshotData) removeNode(nodeName string) error {
@@ -273,6 +276,21 @@ func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName strin
 	return nil
 }
 
+func (data *internalDeltaSnapshotData) isPVCUsedByPods(key string) bool {
+	if data.pvcNamespaceMap != nil {
+		return data.pvcNamespaceMap[key] > 0
+	}
+	nodeInfos := data.getNodeInfoList()
+	pvcNamespaceMap := make(map[string]int)
+	for _, v := range nodeInfos {
+		for k, i := range v.PVCRefCounts {
+			pvcNamespaceMap[k] += i
+		}
+	}
+	data.pvcNamespaceMap = pvcNamespaceMap
+	return data.pvcNamespaceMap[key] > 0
+}
+
 func (data *internalDeltaSnapshotData) fork() *internalDeltaSnapshotData {
 	forkedData := newInternalDeltaSnapshotData()
 	forkedData.baseData = data
@@ -353,7 +371,7 @@ func (snapshot *deltaSnapshotNodeLister) Get(nodeName string) (*schedulerframewo
 
 // IsPVCUsedByPods returns if PVC is used by pods
 func (snapshot *deltaSnapshotStorageLister) IsPVCUsedByPods(key string) bool {
-	return false
+	return (*DeltaClusterSnapshot)(snapshot).IsPVCUsedByPods(key)
 }
 
 func (snapshot *DeltaClusterSnapshot) getNodeInfo(nodeName string) (*schedulerframework.NodeInfo, error) {
@@ -420,6 +438,11 @@ func (snapshot *DeltaClusterSnapshot) RemovePod(namespace, podName, nodeName str
 	return snapshot.data.removePod(namespace, podName, nodeName)
 }
 
+// IsPVCUsedByPods returns if the pvc is used by any pod
+func (snapshot *DeltaClusterSnapshot) IsPVCUsedByPods(key string) bool {
+	return snapshot.data.isPVCUsedByPods(key)
+}
+
 // Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert()
 // Forking already forked snapshot is not allowed and will result with an error.
 // Time: O(1)
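Design note on the delta implementation: unlike the basic snapshot, which updates its PVC index eagerly on every pod add/remove, the delta snapshot leans on the scheduler framework's per-node NodeInfo.PVCRefCounts and materializes a cluster-wide count map lazily on the first IsPVCUsedByPods query, then invalidates it wholesale in clearPodCaches (hence the TODO above). A minimal sketch of that memoize-and-invalidate pattern, independent of the scheduler types (names here are illustrative, not from the patch):

package main

import "fmt"

// lazyCounts rebuilds an aggregate map on demand and discards it on change,
// mirroring how internalDeltaSnapshotData.isPVCUsedByPods handles pvcNamespaceMap.
type lazyCounts struct {
	perNode map[string]map[string]int // node -> claim key -> refcount (stand-in for PVCRefCounts)
	cache   map[string]int            // nil means "needs rebuilding"
}

func (l *lazyCounts) used(key string) bool {
	if l.cache == nil { // first query since the last invalidation: aggregate now
		l.cache = map[string]int{}
		for _, counts := range l.perNode {
			for k, n := range counts {
				l.cache[k] += n
			}
		}
	}
	return l.cache[key] > 0
}

// invalidate plays the role of clearPodCaches: throw the aggregate away.
func (l *lazyCounts) invalidate() { l.cache = nil }

func main() {
	l := &lazyCounts{perNode: map[string]map[string]int{
		"node1": {"default/claim1": 2},
	}}
	fmt.Println(l.used("default/claim1")) // true; the cache is built here
	delete(l.perNode, "node1")
	fmt.Println(l.used("default/claim1")) // still true: the cache is stale until invalidated
	l.invalidate()
	fmt.Println(l.used("default/claim1")) // false after the rebuild
}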