Skip to content

Commit

Permalink
add support for isPVC check in basic_cluster_snapshot.go and delta_cl…
Browse files Browse the repository at this point in the history
…uster_snapshot.go

- added a comment in delegating_shared_lister.go
  • Loading branch information
jayantjain93 committed Jul 14, 2022
1 parent f508ee1 commit 746de9b
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 7 deletions.
71 changes: 67 additions & 4 deletions cluster-autoscaler/simulator/basic_cluster_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type BasicClusterSnapshot struct {
}

type internalBasicSnapshotData struct {
nodeInfoMap map[string]*schedulerframework.NodeInfo
nodeInfoMap map[string]*schedulerframework.NodeInfo
pvcNamespacePodMap map[string]map[string]bool
}

func (data *internalBasicSnapshotData) listNodeInfos() ([]*schedulerframework.NodeInfo, error) {
Expand Down Expand Up @@ -71,9 +72,52 @@ func (data *internalBasicSnapshotData) getNodeInfo(nodeName string) (*schedulerf
return nil, errNodeNotFound
}

func (data *internalBasicSnapshotData) isPVCUsedByPods(key string) bool {
if v, found := data.pvcNamespacePodMap[key]; found && v != nil && len(v) > 0 {
return true
}
return false
}

func (data *internalBasicSnapshotData) addPvcUsedByPod(pod *apiv1.Pod) {
if pod == nil {
return
}
nameSpace := pod.GetNamespace()
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}
k := schedulerframework.GetNamespacedName(nameSpace, volume.PersistentVolumeClaim.ClaimName)
_, found := data.pvcNamespacePodMap[k]
if !found {
data.pvcNamespacePodMap[k] = make(map[string]bool)
}
data.pvcNamespacePodMap[k][pod.GetName()] = true
}
}

func (data *internalBasicSnapshotData) removePvcUsedByPod(pod *apiv1.Pod) {
if pod == nil {
return
}

nameSpace := pod.GetNamespace()
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}
k := schedulerframework.GetNamespacedName(nameSpace, volume.PersistentVolumeClaim.ClaimName)
if _, found := data.pvcNamespacePodMap[k]; found {
delete(data.pvcNamespacePodMap[k], pod.GetName())
}
}
}

func newInternalBasicSnapshotData() *internalBasicSnapshotData {
return &internalBasicSnapshotData{
nodeInfoMap: make(map[string]*schedulerframework.NodeInfo),
nodeInfoMap: make(map[string]*schedulerframework.NodeInfo),
pvcNamespacePodMap: make(map[string]map[string]bool),
}
}

Expand All @@ -82,8 +126,16 @@ func (data *internalBasicSnapshotData) clone() *internalBasicSnapshotData {
for k, v := range data.nodeInfoMap {
clonedNodeInfoMap[k] = v.Clone()
}
clonedPvcNamespaceNodeMap := make(map[string]map[string]bool)
for k, v := range data.pvcNamespacePodMap {
clonedPvcNamespaceNodeMap[k] = make(map[string]bool)
for k1, v1 := range v {
clonedPvcNamespaceNodeMap[k][k1] = v1
}
}
return &internalBasicSnapshotData{
nodeInfoMap: clonedNodeInfoMap,
nodeInfoMap: clonedNodeInfoMap,
pvcNamespacePodMap: clonedPvcNamespaceNodeMap,
}
}

Expand All @@ -110,6 +162,9 @@ func (data *internalBasicSnapshotData) removeNode(nodeName string) error {
if _, found := data.nodeInfoMap[nodeName]; !found {
return errNodeNotFound
}
for _, pod := range data.nodeInfoMap[nodeName].Pods {
data.removePvcUsedByPod(pod.Pod)
}
delete(data.nodeInfoMap, nodeName)
return nil
}
Expand All @@ -119,6 +174,7 @@ func (data *internalBasicSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
return errNodeNotFound
}
data.nodeInfoMap[nodeName].AddPod(pod)
data.addPvcUsedByPod(pod)
return nil
}

Expand All @@ -129,8 +185,10 @@ func (data *internalBasicSnapshotData) removePod(namespace, podName, nodeName st
}
for _, podInfo := range nodeInfo.Pods {
if podInfo.Pod.Namespace == namespace && podInfo.Pod.Name == podName {
data.removePvcUsedByPod(podInfo.Pod)
err := nodeInfo.RemovePod(podInfo.Pod)
if err != nil {
data.addPvcUsedByPod(podInfo.Pod)
return fmt.Errorf("cannot remove pod; %v", err)
}
return nil
Expand Down Expand Up @@ -191,6 +249,11 @@ func (snapshot *BasicClusterSnapshot) RemovePod(namespace, podName, nodeName str
return snapshot.getInternalData().removePod(namespace, podName, nodeName)
}

// IsPVCUsedByPods returns if the pvc is used by any pod
func (snapshot *BasicClusterSnapshot) IsPVCUsedByPods(key string) bool {
return snapshot.getInternalData().isPVCUsedByPods(key)
}

// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert()
// Forking already forked snapshot is not allowed and will result with an error.
func (snapshot *BasicClusterSnapshot) Fork() error {
Expand Down Expand Up @@ -261,5 +324,5 @@ func (snapshot *basicClusterSnapshotNodeLister) Get(nodeName string) (*scheduler

// Returns the IsPVCUsedByPods in a given key.
func (snapshot *basicClusterSnapshotStorageLister) IsPVCUsedByPods(key string) bool {
return false
return (*BasicClusterSnapshot)(snapshot).getInternalData().isPVCUsedByPods(key)
}
2 changes: 2 additions & 0 deletions cluster-autoscaler/simulator/cluster_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type ClusterSnapshot interface {
RemovePod(namespace string, podName string, nodeName string) error
// AddNodeWithPods adds a node and set of pods to be scheduled to this node to the snapshot.
AddNodeWithPods(node *apiv1.Node, pods []*apiv1.Pod) error
// IsPVCUsedByPods returns if the pvc is used by any pod, key = <namespace>/<pvc_name>
IsPVCUsedByPods(key string) bool

// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert()
// Forking already forked snapshot is not allowed and will result with an error.
Expand Down
129 changes: 127 additions & 2 deletions cluster-autoscaler/simulator/cluster_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
"testing"
"time"

. "k8s.io/autoscaler/cluster-autoscaler/utils/test"

apiv1 "k8s.io/api/core/v1"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -464,3 +463,129 @@ func TestNodeAlreadyExists(t *testing.T) {
}
}
}

func TestPVCUsedByPods(t *testing.T) {
node := BuildTestNode("node", 10, 10)
pod1 := BuildTestPod("pod1", 10, 10)
pod1.Spec.NodeName = node.Name
pod1.Spec.Volumes = []apiv1.Volume{
{
Name: "v1",
VolumeSource: apiv1.VolumeSource{
PersistentVolumeClaim: &apiv1.PersistentVolumeClaimVolumeSource{
ClaimName: "claim1",
},
},
},
}
pod2 := BuildTestPod("pod2", 10, 10)
pod2.Spec.NodeName = node.Name
pod2.Spec.Volumes = []apiv1.Volume{
{
Name: "v1",
VolumeSource: apiv1.VolumeSource{
PersistentVolumeClaim: &apiv1.PersistentVolumeClaimVolumeSource{
ClaimName: "claim1",
},
},
},
{
Name: "v1",
VolumeSource: apiv1.VolumeSource{
PersistentVolumeClaim: &apiv1.PersistentVolumeClaimVolumeSource{
ClaimName: "claim2",
},
},
},
}
nonPvcPod := BuildTestPod("pod1", 10, 10)
nonPvcPod.Spec.NodeName = node.Name
nonPvcPod.Spec.Volumes = []apiv1.Volume{
{
Name: "v1",
VolumeSource: apiv1.VolumeSource{
NFS: &apiv1.NFSVolumeSource{
Server: "",
Path: "",
ReadOnly: false,
},
},
},
}
testcase := []struct {
desc string
node *apiv1.Node
pods []*apiv1.Pod
claimName string
exists bool
isRemoveCheck bool
removePod string
existsAfterRemove bool
}{
{
desc: "pvc new pod with volume fetch",
node: node,
pods: []*apiv1.Pod{pod1},
claimName: "claim1",
exists: true,
isRemoveCheck: false,
},
{
desc: "pvc new pod with incorrect volume fetch",
node: node,
pods: []*apiv1.Pod{pod1},
claimName: "incorrect-claim",
exists: false,
isRemoveCheck: false,
},
{
desc: "new pod with non-pvc volume fetch",
node: node,
pods: []*apiv1.Pod{nonPvcPod},
claimName: "incorrect-claim",
exists: false,
isRemoveCheck: false,
},
{
desc: "pvc new pod with delete volume fetch",
node: node,
pods: []*apiv1.Pod{pod1},
claimName: "claim1",
exists: true,
isRemoveCheck: true,
removePod: "pod1",
existsAfterRemove: false,
},
{
desc: "pvc two pods with duplicated volume, delete one pod, fetch",
node: node,
pods: []*apiv1.Pod{pod1, pod2},
claimName: "claim1",
exists: true,
isRemoveCheck: true,
removePod: "pod1",
existsAfterRemove: true,
},
}

for _, snapshotFactory := range snapshots {
for _, tc := range testcase {
t.Run(tc.desc, func(t *testing.T) {
snapshot := snapshotFactory()
err := snapshot.AddNodeWithPods(tc.node, tc.pods)
assert.NoError(t, err)

volumeExists := snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName))
assert.Equal(t, tc.exists, volumeExists)

if tc.isRemoveCheck {
err = snapshot.RemovePod("default", tc.removePod, "node")
assert.NoError(t, err)

volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName))
assert.Equal(t, tc.existsAfterRemove, volumeExists)
}
})
}
}
}
1 change: 1 addition & 0 deletions cluster-autoscaler/simulator/delegating_shared_lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (lister *DelegatingSchedulerSharedLister) NodeInfos() schedulerframework.No
return lister.delegate.NodeInfos()
}

// StorageInfos returns a StorageInfoLister
func (lister *DelegatingSchedulerSharedLister) StorageInfos() schedulerframework.StorageInfoLister {
return lister.delegate.StorageInfos()
}
Expand Down
25 changes: 24 additions & 1 deletion cluster-autoscaler/simulator/delta_cluster_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type internalDeltaSnapshotData struct {
nodeInfoList []*schedulerframework.NodeInfo
havePodsWithAffinity []*schedulerframework.NodeInfo
havePodsWithRequiredAntiAffinity []*schedulerframework.NodeInfo
pvcNamespaceMap map[string]int
}

func newInternalDeltaSnapshotData() *internalDeltaSnapshotData {
Expand Down Expand Up @@ -180,6 +181,8 @@ func (data *internalDeltaSnapshotData) clearCaches() {
func (data *internalDeltaSnapshotData) clearPodCaches() {
data.havePodsWithAffinity = nil
data.havePodsWithRequiredAntiAffinity = nil
// TODO: update the cache when adding/removing pods instead of invalidating the whole cache
data.pvcNamespaceMap = nil
}

func (data *internalDeltaSnapshotData) removeNode(nodeName string) error {
Expand Down Expand Up @@ -273,6 +276,21 @@ func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName strin
return nil
}

func (data *internalDeltaSnapshotData) isPVCUsedByPods(key string) bool {
if data.pvcNamespaceMap != nil {
return data.pvcNamespaceMap[key] > 0
}
nodeInfos := data.getNodeInfoList()
pvcNamespaceMap := make(map[string]int)
for _, v := range nodeInfos {
for k, i := range v.PVCRefCounts {
pvcNamespaceMap[k] += i
}
}
data.pvcNamespaceMap = pvcNamespaceMap
return data.pvcNamespaceMap[key] > 0
}

func (data *internalDeltaSnapshotData) fork() *internalDeltaSnapshotData {
forkedData := newInternalDeltaSnapshotData()
forkedData.baseData = data
Expand Down Expand Up @@ -353,7 +371,7 @@ func (snapshot *deltaSnapshotNodeLister) Get(nodeName string) (*schedulerframewo

// IsPVCUsedByPods returns if PVC is used by pods
func (snapshot *deltaSnapshotStorageLister) IsPVCUsedByPods(key string) bool {
return false
return (*DeltaClusterSnapshot)(snapshot).IsPVCUsedByPods(key)
}

func (snapshot *DeltaClusterSnapshot) getNodeInfo(nodeName string) (*schedulerframework.NodeInfo, error) {
Expand Down Expand Up @@ -420,6 +438,11 @@ func (snapshot *DeltaClusterSnapshot) RemovePod(namespace, podName, nodeName str
return snapshot.data.removePod(namespace, podName, nodeName)
}

// IsPVCUsedByPods returns if the pvc is used by any pod
func (snapshot *DeltaClusterSnapshot) IsPVCUsedByPods(key string) bool {
return snapshot.data.isPVCUsedByPods(key)
}

// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert()
// Forking already forked snapshot is not allowed and will result with an error.
// Time: O(1)
Expand Down

0 comments on commit 746de9b

Please sign in to comment.