From 4832ee06d51ca65be89be4c40acc422f4e2ea187 Mon Sep 17 00:00:00 2001 From: mlmhl Date: Thu, 10 Jan 2019 19:18:37 +0800 Subject: [PATCH] avoid concurrent processing of same PVC --- pkg/controller/controller.go | 84 +++++++++++++++++++++++++++++------- pkg/util/util.go | 41 ++++++++++++++++-- pkg/util/util_test.go | 49 +++++++++++++++++++++ 3 files changed, 156 insertions(+), 18 deletions(-) create mode 100644 pkg/util/util_test.go diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 3ac3f71e7..b640a308c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "sync" "time" "github.com/kubernetes-csi/external-resizer/pkg/resizer" @@ -57,6 +58,7 @@ type resizeController struct { pvSynced cache.InformerSynced pvcLister corelisters.PersistentVolumeClaimLister pvcSynced cache.InformerSynced + lastProcessedPVCVersions *pvcVersionSet } // NewResizeController returns a ResizeController. @@ -88,6 +90,7 @@ func NewResizeController( pvcSynced: pvcInformer.Informer().HasSynced, claimQueue: claimQueue, eventRecorder: eventRecorder, + lastProcessedPVCVersions: newPVCVersionSet(), } // Add a resync period as the PVC's request size can be resized again when we handling @@ -123,12 +126,21 @@ func (ctrl *resizeController) updatePVC(oldObj, newObj interface{}) { newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage] oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage] - // We perform additional checks to avoid double processing of PVCs, as we will also receive Update event when: - // 1. Administrator or users may introduce other changes(such as add labels, modify annotations, etc.) - // unrelated to volume resize. - // 2. Informer will resync and send Update event periodically without any changes. if newSize.Cmp(oldSize) > 0 { ctrl.addPVC(newObj) + } else { + // PVC's size not changed, so this Update event maybe caused by: + // + // 1. Administrators or users introduce other changes(such as add labels, modify annotations, etc.) + // unrelated to volume resize. + // 2. Informer resynced the PVC and send this Update event without any changes. + // + // If it is case 1, we can just discard this event. If case 2, we need to put it into the queue to + // perform a resync operation. + if newPVC.ResourceVersion == oldPVC.ResourceVersion { + // This is case 2. + ctrl.addPVC(newObj) + } } } @@ -138,6 +150,8 @@ func (ctrl *resizeController) deletePVC(obj interface{}) { return } ctrl.claimQueue.Forget(objKey) + // Delete according version cache to release memory. + ctrl.lastProcessedPVCVersions.Delete(objKey) } func getPVCKey(obj interface{}) (string, error) { @@ -210,6 +224,16 @@ func (ctrl *resizeController) syncPVC(key string) error { return err } + // If pvc's ResourceVersion is equal to last processed ResourceVersion, it means this is a resync event, + // so we need to check PVC's version when patch its status to avoid double process on same PVC. + // More details see: https://github.com/kubernetes-csi/external-resizer/issues/4. + needVersionCheck := ctrl.lastProcessedPVCVersions.Get(key) == pvc.ResourceVersion + if needVersionCheck { + klog.V(5).Infof("Apply version check for PVC %s/%s due to resync event", key, pvc.ResourceVersion) + } else { + ctrl.lastProcessedPVCVersions.Set(key, pvc.ResourceVersion) + } + if !ctrl.pvcNeedResize(pvc) { klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc)) return nil @@ -230,7 +254,7 @@ func (ctrl *resizeController) syncPVC(key string) error { return nil } - return ctrl.resizePVC(pvc, pv) + return ctrl.resizePVC(pvc, pv, needVersionCheck) } // pvcNeedResize returns true is a pvc requests a resize operation. @@ -280,8 +304,8 @@ func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1 // 1. Mark pvc as resizing. // 2. Resize the volume and the pv object. // 3. Mark pvc as resizing finished(no error, no need to resize fs), need resizing fs or resize failed. -func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error { - if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc); err != nil { +func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, needVersionCheck bool) error { + if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc, needVersionCheck); err != nil { klog.Errorf("Mark pvc %q as resizing failed: %v", util.PVCKey(pvc), err) return err } else if updatedPVC != nil { @@ -300,10 +324,10 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe if fsResizeRequired { // Resize volume succeeded and need to resize file system by kubelet, mark it as file system resizing required. - return ctrl.markPVCAsFSResizeRequired(pvc) + return ctrl.markPVCAsFSResizeRequired(pvc, needVersionCheck) } // Resize volume succeeded and no need to resize file system by kubelet, mark it as resizing finished. - return ctrl.markPVCResizeFinished(pvc, newSize) + return ctrl.markPVCResizeFinished(pvc, newSize, needVersionCheck) }() if err != nil { @@ -337,7 +361,7 @@ func (ctrl *resizeController) resizeVolume( return newSize, fsResizeRequired, nil } -func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) { +func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeClaim, needVersionCheck bool) (*v1.PersistentVolumeClaim, error) { // Mark PVC as Resize Started progressCondition := v1.PersistentVolumeClaimCondition{ Type: v1.PersistentVolumeClaimResizing, @@ -347,14 +371,17 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl newPVC := pvc.DeepCopy() newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions, []v1.PersistentVolumeClaimCondition{progressCondition}) - return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient) + return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient, needVersionCheck) } -func (ctrl *resizeController) markPVCResizeFinished(pvc *v1.PersistentVolumeClaim, newSize resource.Quantity) error { +func (ctrl *resizeController) markPVCResizeFinished( + pvc *v1.PersistentVolumeClaim, + newSize resource.Quantity, + needVersionCheck bool) error { newPVC := pvc.DeepCopy() newPVC.Status.Capacity[v1.ResourceStorage] = newSize newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{}) - _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient) + _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient, needVersionCheck) if err != nil { klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err) return err @@ -366,7 +393,7 @@ func (ctrl *resizeController) markPVCResizeFinished(pvc *v1.PersistentVolumeClai return nil } -func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error { +func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim, needVersionCheck bool) error { pvcCondition := v1.PersistentVolumeClaimCondition{ Type: v1.PersistentVolumeClaimFileSystemResizePending, Status: v1.ConditionTrue, @@ -376,7 +403,7 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume newPVC := pvc.DeepCopy() newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions, []v1.PersistentVolumeClaimCondition{pvcCondition}) - _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient) + _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient, needVersionCheck) if err != nil { klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err) return err @@ -388,3 +415,30 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume return err } + +func newPVCVersionSet() *pvcVersionSet { + return &pvcVersionSet{versions: make(map[string]string)} +} + +type pvcVersionSet struct { + lock sync.Mutex + versions map[string]string +} + +func (s *pvcVersionSet) Set(key, version string) { + s.lock.Lock() + defer s.lock.Unlock() + s.versions[key] = version +} + +func (s *pvcVersionSet) Get(key string) string { + s.lock.Lock() + defer s.lock.Unlock() + return s.versions[key] +} + +func (s *pvcVersionSet) Delete(key string) { + s.lock.Lock() + defer s.lock.Unlock() + delete(s.versions, key) +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 829be56fe..c1c5ff9da 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -22,7 +22,9 @@ import ( "regexp" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes" @@ -95,8 +97,9 @@ func MergeResizeConditionsOfPVC(oldConditions, newConditions []v1.PersistentVolu func PatchPVCStatus( oldPVC *v1.PersistentVolumeClaim, newPVC *v1.PersistentVolumeClaim, - kubeClient kubernetes.Interface) (*v1.PersistentVolumeClaim, error) { - patchBytes, err := getPatchData(oldPVC, newPVC) + kubeClient kubernetes.Interface, + needCheckVersion bool) (*v1.PersistentVolumeClaim, error) { + patchBytes, err := getPVCPatchData(oldPVC, newPVC, needCheckVersion) if err != nil { return nil, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", PVCKey(oldPVC), err) } @@ -108,6 +111,38 @@ func PatchPVCStatus( return updatedClaim, nil } +func getPVCPatchData(oldPVC, newPVC *v1.PersistentVolumeClaim, needCheckVersion bool) ([]byte, error) { + patchBytes, err := getPatchData(oldPVC, newPVC) + if err != nil || !needCheckVersion { + return patchBytes, err + } + + patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion) + if err != nil { + return nil, fmt.Errorf("apply ResourceVersion to patch data failed: %v", err) + } + return patchBytes, nil +} + +func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) { + var patchMap map[string]interface{} + err := json.Unmarshal(patchBytes, &patchMap) + if err != nil { + return nil, fmt.Errorf("error unmarshalling patch with %v", err) + } + u := unstructured.Unstructured{Object: patchMap} + a, err := meta.Accessor(&u) + if err != nil { + return nil, fmt.Errorf("error creating accessor with %v", err) + } + a.SetResourceVersion(resourceVersion) + versionBytes, err := json.Marshal(patchMap) + if err != nil { + return nil, fmt.Errorf("error marshalling json patch with %v", err) + } + return versionBytes, nil +} + // UpdatePVCapacity updates PVC capacity with requested size. func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error { newPV := pv.DeepCopy() @@ -130,7 +165,7 @@ func getPatchData(oldObj, newObj interface{}) ([]byte, error) { } newData, err := json.Marshal(newObj) if err != nil { - return nil, fmt.Errorf("mashal new object failed: %v", err) + return nil, fmt.Errorf("marshal new object failed: %v", err) } patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldObj) if err != nil { diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go new file mode 100644 index 000000000..2e386421a --- /dev/null +++ b/pkg/util/util_test.go @@ -0,0 +1,49 @@ +package util + +import ( + "encoding/json" + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestGetPVCPatchData(t *testing.T) { + for i, c := range []struct { + NeedVersionCheck bool + OldPVC *v1.PersistentVolumeClaim + }{ + {false, &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, + {true, &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}}, + } { + newPVC := c.OldPVC.DeepCopy() + newPVC.Status.Conditions = append(newPVC.Status.Conditions, + v1.PersistentVolumeClaimCondition{Type: VolumeResizing, Status: v1.ConditionTrue}) + patchBytes, err := getPVCPatchData(c.OldPVC, newPVC, c.NeedVersionCheck) + if err != nil { + t.Errorf("Case %d: Get patch data failed: %v", i, err) + } + + var patchMap map[string]interface{} + err = json.Unmarshal(patchBytes, &patchMap) + if err != nil { + t.Errorf("Case %d: unmarshalling json patch failed: %v", i, err) + } + + metadata, exist := patchMap["metadata"].(map[string]interface{}) + if c.NeedVersionCheck { + if !exist { + t.Errorf("Case %d: ResourceVersion should exist in patch data", i) + } + resourceVersion := metadata["resourceVersion"].(string) + if resourceVersion != c.OldPVC.ResourceVersion { + t.Errorf("Case %d: ResourceVersion should be %s, got %s", + i, c.OldPVC.ResourceVersion, resourceVersion) + } + } else { + if exist { + t.Errorf("Case %d: metadata shouldn't exist in patch data", i) + } + } + } +}