Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid concurrent processing of same PVC #6

Merged
merged 1 commit into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/kubernetes-csi/external-resizer/pkg/resizer"
"github.com/kubernetes-csi/external-resizer/pkg/util"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -145,6 +145,19 @@ func (ctrl *resizeController) updatePVC(oldObj, newObj interface{}) {
// in-tree resizer name to CSI driver name.
if newSize.Cmp(oldSize) > 0 || newResizerName != oldResizerName {
ctrl.addPVC(newObj)
} else {
// PVC's size not changed, so this Update event maybe caused by:
//
// 1. Administrators or users introduce other changes(such as add labels, modify annotations, etc.)
// unrelated to volume resize.
// 2. Informer resynced the PVC and send this Update event without any changes.
//
// If it is case 1, we can just discard this event. If case 2, we need to put it into the queue to
// perform a resync operation.
if newPVC.ResourceVersion == oldPVC.ResourceVersion {
// This is case 2.
ctrl.addPVC(newObj)
}
}
}

Expand Down Expand Up @@ -366,12 +379,13 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
}

func (ctrl *resizeController) markPVCResizeFinished(pvc *v1.PersistentVolumeClaim, newSize resource.Quantity) error {
func (ctrl *resizeController) markPVCResizeFinished(
pvc *v1.PersistentVolumeClaim,
newSize resource.Quantity) error {
newPVC := pvc.DeepCopy()
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
_, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
if err != nil {
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
return err
}
Expand All @@ -392,15 +406,14 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{pvcCondition})
_, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
if err != nil {

if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
return err
}

klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
util.FileSystemResizeRequired, "Require file system resize of volume on node")

return err
return nil
}
38 changes: 36 additions & 2 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"regexp"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -96,7 +98,7 @@ func PatchPVCStatus(
oldPVC *v1.PersistentVolumeClaim,
newPVC *v1.PersistentVolumeClaim,
kubeClient kubernetes.Interface) (*v1.PersistentVolumeClaim, error) {
patchBytes, err := getPatchData(oldPVC, newPVC)
patchBytes, err := getPVCPatchData(oldPVC, newPVC)
if err != nil {
return nil, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", PVCKey(oldPVC), err)
}
Expand All @@ -108,6 +110,38 @@ func PatchPVCStatus(
return updatedClaim, nil
}

func getPVCPatchData(oldPVC, newPVC *v1.PersistentVolumeClaim) ([]byte, error) {
patchBytes, err := getPatchData(oldPVC, newPVC)
if err != nil {
return patchBytes, err
}

patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
if err != nil {
return nil, fmt.Errorf("apply ResourceVersion to patch data failed: %v", err)
}
return patchBytes, nil
}

func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
var patchMap map[string]interface{}
err := json.Unmarshal(patchBytes, &patchMap)
if err != nil {
return nil, fmt.Errorf("error unmarshalling patch with %v", err)
}
u := unstructured.Unstructured{Object: patchMap}
a, err := meta.Accessor(&u)
if err != nil {
return nil, fmt.Errorf("error creating accessor with %v", err)
}
a.SetResourceVersion(resourceVersion)
versionBytes, err := json.Marshal(patchMap)
if err != nil {
return nil, fmt.Errorf("error marshalling json patch with %v", err)
}
return versionBytes, nil
}

// UpdatePVCapacity updates PVC capacity with requested size.
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error {
newPV := pv.DeepCopy()
Expand All @@ -130,7 +164,7 @@ func getPatchData(oldObj, newObj interface{}) ([]byte, error) {
}
newData, err := json.Marshal(newObj)
if err != nil {
return nil, fmt.Errorf("mashal new object failed: %v", err)
return nil, fmt.Errorf("marshal new object failed: %v", err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldObj)
if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package util

import (
"encoding/json"
"testing"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetPVCPatchData(t *testing.T) {
for i, c := range []struct {
OldPVC *v1.PersistentVolumeClaim
}{
{&v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{&v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}},
} {
newPVC := c.OldPVC.DeepCopy()
newPVC.Status.Conditions = append(newPVC.Status.Conditions,
v1.PersistentVolumeClaimCondition{Type: VolumeResizing, Status: v1.ConditionTrue})
patchBytes, err := getPVCPatchData(c.OldPVC, newPVC)
if err != nil {
t.Errorf("Case %d: Get patch data failed: %v", i, err)
}

var patchMap map[string]interface{}
err = json.Unmarshal(patchBytes, &patchMap)
if err != nil {
t.Errorf("Case %d: unmarshalling json patch failed: %v", i, err)
}

metadata, exist := patchMap["metadata"].(map[string]interface{})
if !exist {
t.Errorf("Case %d: ResourceVersion should exist in patch data", i)
}
resourceVersion := metadata["resourceVersion"].(string)
if resourceVersion != c.OldPVC.ResourceVersion {
t.Errorf("Case %d: ResourceVersion should be %s, got %s",
i, c.OldPVC.ResourceVersion, resourceVersion)
}
}
}