Skip to content

Commit

Permalink
avoid concurrent processing of same PVC
Browse files Browse the repository at this point in the history
  • Loading branch information
mlmhl committed May 12, 2019
1 parent c398721 commit 4832ee0
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 18 deletions.
84 changes: 69 additions & 15 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"sync"
"time"

"github.com/kubernetes-csi/external-resizer/pkg/resizer"
Expand Down Expand Up @@ -57,6 +58,7 @@ type resizeController struct {
pvSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcSynced cache.InformerSynced
lastProcessedPVCVersions *pvcVersionSet
}

// NewResizeController returns a ResizeController.
Expand Down Expand Up @@ -88,6 +90,7 @@ func NewResizeController(
pvcSynced: pvcInformer.Informer().HasSynced,
claimQueue: claimQueue,
eventRecorder: eventRecorder,
lastProcessedPVCVersions: newPVCVersionSet(),
}

// Add a resync period as the PVC's request size can be resized again when we handling
Expand Down Expand Up @@ -123,12 +126,21 @@ func (ctrl *resizeController) updatePVC(oldObj, newObj interface{}) {
newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]

// We perform additional checks to avoid double processing of PVCs, as we will also receive Update event when:
// 1. Administrator or users may introduce other changes(such as add labels, modify annotations, etc.)
// unrelated to volume resize.
// 2. Informer will resync and send Update event periodically without any changes.
if newSize.Cmp(oldSize) > 0 {
ctrl.addPVC(newObj)
} else {
// PVC's size not changed, so this Update event maybe caused by:
//
// 1. Administrators or users introduce other changes(such as add labels, modify annotations, etc.)
// unrelated to volume resize.
// 2. Informer resynced the PVC and send this Update event without any changes.
//
// If it is case 1, we can just discard this event. If case 2, we need to put it into the queue to
// perform a resync operation.
if newPVC.ResourceVersion == oldPVC.ResourceVersion {
// This is case 2.
ctrl.addPVC(newObj)
}
}
}

Expand All @@ -138,6 +150,8 @@ func (ctrl *resizeController) deletePVC(obj interface{}) {
return
}
ctrl.claimQueue.Forget(objKey)
// Delete according version cache to release memory.
ctrl.lastProcessedPVCVersions.Delete(objKey)
}

func getPVCKey(obj interface{}) (string, error) {
Expand Down Expand Up @@ -210,6 +224,16 @@ func (ctrl *resizeController) syncPVC(key string) error {
return err
}

// If pvc's ResourceVersion is equal to last processed ResourceVersion, it means this is a resync event,
// so we need to check PVC's version when patch its status to avoid double process on same PVC.
// More details see: https://github.com/kubernetes-csi/external-resizer/issues/4.
needVersionCheck := ctrl.lastProcessedPVCVersions.Get(key) == pvc.ResourceVersion
if needVersionCheck {
klog.V(5).Infof("Apply version check for PVC %s/%s due to resync event", key, pvc.ResourceVersion)
} else {
ctrl.lastProcessedPVCVersions.Set(key, pvc.ResourceVersion)
}

if !ctrl.pvcNeedResize(pvc) {
klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc))
return nil
Expand All @@ -230,7 +254,7 @@ func (ctrl *resizeController) syncPVC(key string) error {
return nil
}

return ctrl.resizePVC(pvc, pv)
return ctrl.resizePVC(pvc, pv, needVersionCheck)
}

// pvcNeedResize returns true is a pvc requests a resize operation.
Expand Down Expand Up @@ -280,8 +304,8 @@ func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1
// 1. Mark pvc as resizing.
// 2. Resize the volume and the pv object.
// 3. Mark pvc as resizing finished(no error, no need to resize fs), need resizing fs or resize failed.
func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc); err != nil {
func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, needVersionCheck bool) error {
if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc, needVersionCheck); err != nil {
klog.Errorf("Mark pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
return err
} else if updatedPVC != nil {
Expand All @@ -300,10 +324,10 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe

if fsResizeRequired {
// Resize volume succeeded and need to resize file system by kubelet, mark it as file system resizing required.
return ctrl.markPVCAsFSResizeRequired(pvc)
return ctrl.markPVCAsFSResizeRequired(pvc, needVersionCheck)
}
// Resize volume succeeded and no need to resize file system by kubelet, mark it as resizing finished.
return ctrl.markPVCResizeFinished(pvc, newSize)
return ctrl.markPVCResizeFinished(pvc, newSize, needVersionCheck)
}()

if err != nil {
Expand Down Expand Up @@ -337,7 +361,7 @@ func (ctrl *resizeController) resizeVolume(
return newSize, fsResizeRequired, nil
}

func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeClaim, needVersionCheck bool) (*v1.PersistentVolumeClaim, error) {
// Mark PVC as Resize Started
progressCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimResizing,
Expand All @@ -347,14 +371,17 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{progressCondition})
return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient, needVersionCheck)
}

func (ctrl *resizeController) markPVCResizeFinished(pvc *v1.PersistentVolumeClaim, newSize resource.Quantity) error {
func (ctrl *resizeController) markPVCResizeFinished(
pvc *v1.PersistentVolumeClaim,
newSize resource.Quantity,
needVersionCheck bool) error {
newPVC := pvc.DeepCopy()
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
_, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
_, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient, needVersionCheck)
if err != nil {
klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
return err
Expand All @@ -366,7 +393,7 @@ func (ctrl *resizeController) markPVCResizeFinished(pvc *v1.PersistentVolumeClai
return nil
}

func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim, needVersionCheck bool) error {
pvcCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
Expand All @@ -376,7 +403,7 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{pvcCondition})
_, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
_, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient, needVersionCheck)
if err != nil {
klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
return err
Expand All @@ -388,3 +415,30 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume

return err
}

func newPVCVersionSet() *pvcVersionSet {
return &pvcVersionSet{versions: make(map[string]string)}
}

type pvcVersionSet struct {
lock sync.Mutex
versions map[string]string
}

func (s *pvcVersionSet) Set(key, version string) {
s.lock.Lock()
defer s.lock.Unlock()
s.versions[key] = version
}

func (s *pvcVersionSet) Get(key string) string {
s.lock.Lock()
defer s.lock.Unlock()
return s.versions[key]
}

func (s *pvcVersionSet) Delete(key string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.versions, key)
}
41 changes: 38 additions & 3 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"regexp"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -95,8 +97,9 @@ func MergeResizeConditionsOfPVC(oldConditions, newConditions []v1.PersistentVolu
func PatchPVCStatus(
oldPVC *v1.PersistentVolumeClaim,
newPVC *v1.PersistentVolumeClaim,
kubeClient kubernetes.Interface) (*v1.PersistentVolumeClaim, error) {
patchBytes, err := getPatchData(oldPVC, newPVC)
kubeClient kubernetes.Interface,
needCheckVersion bool) (*v1.PersistentVolumeClaim, error) {
patchBytes, err := getPVCPatchData(oldPVC, newPVC, needCheckVersion)
if err != nil {
return nil, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", PVCKey(oldPVC), err)
}
Expand All @@ -108,6 +111,38 @@ func PatchPVCStatus(
return updatedClaim, nil
}

func getPVCPatchData(oldPVC, newPVC *v1.PersistentVolumeClaim, needCheckVersion bool) ([]byte, error) {
patchBytes, err := getPatchData(oldPVC, newPVC)
if err != nil || !needCheckVersion {
return patchBytes, err
}

patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
if err != nil {
return nil, fmt.Errorf("apply ResourceVersion to patch data failed: %v", err)
}
return patchBytes, nil
}

func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
var patchMap map[string]interface{}
err := json.Unmarshal(patchBytes, &patchMap)
if err != nil {
return nil, fmt.Errorf("error unmarshalling patch with %v", err)
}
u := unstructured.Unstructured{Object: patchMap}
a, err := meta.Accessor(&u)
if err != nil {
return nil, fmt.Errorf("error creating accessor with %v", err)
}
a.SetResourceVersion(resourceVersion)
versionBytes, err := json.Marshal(patchMap)
if err != nil {
return nil, fmt.Errorf("error marshalling json patch with %v", err)
}
return versionBytes, nil
}

// UpdatePVCapacity updates PVC capacity with requested size.
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error {
newPV := pv.DeepCopy()
Expand All @@ -130,7 +165,7 @@ func getPatchData(oldObj, newObj interface{}) ([]byte, error) {
}
newData, err := json.Marshal(newObj)
if err != nil {
return nil, fmt.Errorf("mashal new object failed: %v", err)
return nil, fmt.Errorf("marshal new object failed: %v", err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldObj)
if err != nil {
Expand Down
49 changes: 49 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package util

import (
"encoding/json"
"testing"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetPVCPatchData(t *testing.T) {
for i, c := range []struct {
NeedVersionCheck bool
OldPVC *v1.PersistentVolumeClaim
}{
{false, &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{true, &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}},
} {
newPVC := c.OldPVC.DeepCopy()
newPVC.Status.Conditions = append(newPVC.Status.Conditions,
v1.PersistentVolumeClaimCondition{Type: VolumeResizing, Status: v1.ConditionTrue})
patchBytes, err := getPVCPatchData(c.OldPVC, newPVC, c.NeedVersionCheck)
if err != nil {
t.Errorf("Case %d: Get patch data failed: %v", i, err)
}

var patchMap map[string]interface{}
err = json.Unmarshal(patchBytes, &patchMap)
if err != nil {
t.Errorf("Case %d: unmarshalling json patch failed: %v", i, err)
}

metadata, exist := patchMap["metadata"].(map[string]interface{})
if c.NeedVersionCheck {
if !exist {
t.Errorf("Case %d: ResourceVersion should exist in patch data", i)
}
resourceVersion := metadata["resourceVersion"].(string)
if resourceVersion != c.OldPVC.ResourceVersion {
t.Errorf("Case %d: ResourceVersion should be %s, got %s",
i, c.OldPVC.ResourceVersion, resourceVersion)
}
} else {
if exist {
t.Errorf("Case %d: metadata shouldn't exist in patch data", i)
}
}
}
}

0 comments on commit 4832ee0

Please sign in to comment.