Skip to content

Commit

Permalink
Add Delete Volume Finalizer
Browse files Browse the repository at this point in the history
This PR adds Finalizer on the snapshot source PVC to prevent it
from being deleted when a snapshot is being created from it.
  • Loading branch information
xing-yang committed Apr 26, 2019
1 parent ac33959 commit a36f9c8
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 6 deletions.
2 changes: 1 addition & 1 deletion deploy/kubernetes/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
Expand Down
171 changes: 168 additions & 3 deletions pkg/controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"reflect"
sysruntime "runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -53,6 +54,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/slice"
)

// This is a unit test framework for snapshot controller.
Expand Down Expand Up @@ -110,7 +112,8 @@ type controllerTest struct {
// List of expected CSI list snapshot calls
expectedListCalls []listCall
// Function to call as the test.
test testCall
test testCall
expectSuccess bool
}

type testCall func(ctrl *csiSnapshotController, reactor *snapshotReactor, test controllerTest) error
Expand Down Expand Up @@ -177,6 +180,11 @@ func withContentFinalizer(content *crdv1.VolumeSnapshotContent) *crdv1.VolumeSna
return content
}

func withPVCFinalizer(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
pvc.ObjectMeta.Finalizers = append(pvc.ObjectMeta.Finalizers, PVCFinalizer)
return pvc
}

// React is a callback called by fake kubeClient from the controller.
// In other words, every snapshot/content change performed by the controller ends
// here.
Expand Down Expand Up @@ -331,6 +339,32 @@ func (r *snapshotReactor) React(action core.Action) (handled bool, ret runtime.O
klog.V(4).Infof("GetClaim: claim %s not found", name)
return true, nil, fmt.Errorf("cannot find claim %s", name)

case action.Matches("update", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)

// Check and bump object version
storedClaim, found := r.claims[claim.Name]
if found {
storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion)
requestedVer, _ := strconv.Atoi(claim.ResourceVersion)
if storedVer != requestedVer {
return true, obj, errVersionConflict
}
// Don't modify the existing object
claim = claim.DeepCopy()
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("cannot update claim %s: claim not found", claim.Name)
}

// Store the updated object to appropriate places.
r.claims[claim.Name] = claim
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated claim %s", claim.Name)
return true, claim, nil

case action.Matches("get", "storageclasses"):
name := action.(core.GetAction).GetName()
storageClass, found := r.storageClasses[name]
Expand Down Expand Up @@ -550,6 +584,9 @@ func (r *snapshotReactor) syncAll() {
for _, v := range r.contents {
r.changedObjects = append(r.changedObjects, v)
}
for _, pvc := range r.claims {
r.changedObjects = append(r.changedObjects, pvc)
}
r.changedSinceLastSync = 0
}

Expand Down Expand Up @@ -699,6 +736,7 @@ func newSnapshotReactor(kubeClient *kubefake.Clientset, client *fake.Clientset,
client.AddReactor("delete", "volumesnapshotcontents", reactor.React)
client.AddReactor("delete", "volumesnapshots", reactor.React)
kubeClient.AddReactor("get", "persistentvolumeclaims", reactor.React)
kubeClient.AddReactor("update", "persistentvolumeclaims", reactor.React)
kubeClient.AddReactor("get", "persistentvolumes", reactor.React)
kubeClient.AddReactor("get", "storageclasses", reactor.React)
kubeClient.AddReactor("get", "secrets", reactor.React)
Expand Down Expand Up @@ -746,6 +784,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
ctrl.contentListerSynced = alwaysReady
ctrl.snapshotListerSynced = alwaysReady
ctrl.classListerSynced = alwaysReady
ctrl.pvcListerSynced = alwaysReady

return ctrl, nil
}
Expand Down Expand Up @@ -845,7 +884,7 @@ func newSnapshotArray(name, className, boundToContent, snapshotUID, claimName st
}

// newClaim returns a new claim with given attributes
func newClaim(name, claimUID, capacity, boundToVolume string, phase v1.PersistentVolumeClaimPhase, class *string) *v1.PersistentVolumeClaim {
func newClaim(name, claimUID, capacity, boundToVolume string, phase v1.PersistentVolumeClaimPhase, class *string, bFinalizer bool) *v1.PersistentVolumeClaim {
claim := v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand Down Expand Up @@ -877,14 +916,25 @@ func newClaim(name, claimUID, capacity, boundToVolume string, phase v1.Persisten
claim.Status.Capacity = claim.Spec.Resources.Requests
}

if bFinalizer {
return withPVCFinalizer(&claim)
}
return &claim
}

// newClaimArray returns array with a single claim that would be returned by
// newClaim() with the same parameters.
func newClaimArray(name, claimUID, capacity, boundToVolume string, phase v1.PersistentVolumeClaimPhase, class *string) []*v1.PersistentVolumeClaim {
return []*v1.PersistentVolumeClaim{
newClaim(name, claimUID, capacity, boundToVolume, phase, class),
newClaim(name, claimUID, capacity, boundToVolume, phase, class, false),
}
}

// newClaimArrayFinalizer returns array with a single claim that would be returned by
// newClaim() with the same parameters plus finalizer.
func newClaimArrayFinalizer(name, claimUID, capacity, boundToVolume string, phase v1.PersistentVolumeClaimPhase, class *string) []*v1.PersistentVolumeClaim {
return []*v1.PersistentVolumeClaim{
newClaim(name, claimUID, capacity, boundToVolume, phase, class, true),
}
}

Expand Down Expand Up @@ -961,6 +1011,14 @@ func testSyncContent(ctrl *csiSnapshotController, reactor *snapshotReactor, test
return ctrl.syncContent(test.initialContents[0])
}

func testAddPVCFinalizer(ctrl *csiSnapshotController, reactor *snapshotReactor, test controllerTest) error {
return ctrl.ensureSnapshotSourceFinalizer(test.initialSnapshots[0])
}

func testRemovePVCFinalizer(ctrl *csiSnapshotController, reactor *snapshotReactor, test controllerTest) error {
return ctrl.checkandRemoveSnapshotSourceFinalizer(test.initialSnapshots[0])
}

var (
classEmpty string
classGold = "gold"
Expand Down Expand Up @@ -1097,6 +1155,113 @@ func runSyncTests(t *testing.T, tests []controllerTest, snapshotClasses []*crdv1
}
}

// This tests ensureSnapshotSourceFinalizer and checkandRemoveSnapshotSourceFinalizer
func runPVCFinalizerTests(t *testing.T, tests []controllerTest, snapshotClasses []*crdv1.VolumeSnapshotClass) {
snapshotscheme.AddToScheme(scheme.Scheme)
for _, test := range tests {
klog.V(4).Infof("starting test %q", test.name)

// Initialize the controller
kubeClient := &kubefake.Clientset{}
client := &fake.Clientset{}

ctrl, err := newTestController(kubeClient, client, nil, t, test)
if err != nil {
t.Fatalf("Test %q construct persistent content failed: %v", test.name, err)
}

reactor := newSnapshotReactor(kubeClient, client, ctrl, nil, nil, test.errors)
for _, snapshot := range test.initialSnapshots {
ctrl.snapshotStore.Add(snapshot)
reactor.snapshots[snapshot.Name] = snapshot
}
for _, content := range test.initialContents {
if ctrl.isDriverMatch(test.initialContents[0]) {
ctrl.contentStore.Add(content)
reactor.contents[content.Name] = content
}
}

pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, claim := range test.initialClaims {
reactor.claims[claim.Name] = claim
pvcIndexer.Add(claim)
}
ctrl.pvcLister = corelisters.NewPersistentVolumeClaimLister(pvcIndexer)

for _, volume := range test.initialVolumes {
reactor.volumes[volume.Name] = volume
}
for _, storageClass := range test.initialStorageClasses {
reactor.storageClasses[storageClass.Name] = storageClass
}
for _, secret := range test.initialSecrets {
reactor.secrets[secret.Name] = secret
}

// Inject classes into controller via a custom lister.
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, class := range snapshotClasses {
indexer.Add(class)
}
ctrl.classLister = storagelisters.NewVolumeSnapshotClassLister(indexer)

// Run the tested functions
err = test.test(ctrl, reactor, test)
if err != nil {
t.Errorf("Test %q failed: %v", test.name, err)
}

// Verify PVCFinalizer tests results
evaluatePVCFinalizerTests(ctrl, reactor, test, t)
}
}

// Evaluate PVCFinalizer tests results
func evaluatePVCFinalizerTests(ctrl *csiSnapshotController, reactor *snapshotReactor, test controllerTest, t *testing.T) {
// Evaluate results
bHasPVCFinalizer := false
name := sysruntime.FuncForPC(reflect.ValueOf(test.test).Pointer()).Name()
index := strings.LastIndex(name, ".")
if index == -1 {
t.Errorf("Test %q: failed to test finalizer - invalid test call name [%s]", test.name, name)
return
}
names := []rune(name)
funcName := string(names[index+1 : len(name)])
klog.V(4).Infof("test %q: PVCFinalizer test func name: [%s]", test.name, funcName)

if funcName == "testAddPVCFinalizer" {
for _, pvc := range reactor.claims {
if test.initialClaims[0].Name == pvc.Name {
if !slice.ContainsString(test.initialClaims[0].ObjectMeta.Finalizers, PVCFinalizer, nil) && slice.ContainsString(pvc.ObjectMeta.Finalizers, PVCFinalizer, nil) {
klog.V(4).Infof("test %q succeeded. PVCFinalizer is added to PVC %s", test.name, pvc.Name)
bHasPVCFinalizer = true
}
break
}
}
if test.expectSuccess && !bHasPVCFinalizer {
t.Errorf("Test %q: failed to add finalizer to PVC %s", test.name, test.initialClaims[0].Name)
}
}
bHasPVCFinalizer = true
if funcName == "testRemovePVCFinalizer" {
for _, pvc := range reactor.claims {
if test.initialClaims[0].Name == pvc.Name {
if slice.ContainsString(test.initialClaims[0].ObjectMeta.Finalizers, PVCFinalizer, nil) && !slice.ContainsString(pvc.ObjectMeta.Finalizers, PVCFinalizer, nil) {
klog.V(4).Infof("test %q succeeded. PVCFinalizer is removed from PVC %s", test.name, pvc.Name)
bHasPVCFinalizer = false
}
break
}
}
if test.expectSuccess && bHasPVCFinalizer {
t.Errorf("Test %q: failed to remove finalizer from PVC %s", test.name, test.initialClaims[0].Name)
}
}
}

func getSize(size int64) *resource.Quantity {
return resource.NewQuantity(size, resource.BinarySI)
}
Expand Down
Loading

0 comments on commit a36f9c8

Please sign in to comment.