WIP: Save PV with exponential backoff #32

Closed
wants to merge 1 commit into from
105 changes: 56 additions & 49 deletions controller/controller.go
@@ -158,6 +158,9 @@ type ProvisionController struct {

// Map UID -> *PVC with all claims that may be provisioned in the background.
claimsInProgress sync.Map

// Map PVC UID -> *PV that's waiting to be saved
unsavedVolumes sync.Map
}

const (
@@ -925,7 +928,14 @@ func (ctrl *ProvisionController) syncClaim(obj interface{}) (ProvisioningState,

if ctrl.shouldProvision(claim) {
startTime := time.Now()
status, err := ctrl.provisionClaimOperation(claim)

var status ProvisioningState
var err error
if pv := ctrl.hasUnsavedVolume(claim); pv != nil {
status, err = ctrl.saveVolume(claim, pv)
} else {
status, err = ctrl.provisionClaimOperation(claim)
}
ctrl.updateProvisionStats(claim, err, startTime)
return status, err
}
@@ -1202,60 +1212,36 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annClass, claimClass)
}

glog.Info(logOperation(operation, "succeeded"))

// Reset any exponential backoff for the claim and start from zero
ctrl.claimQueue.Forget(claim)
return ctrl.saveVolume(claim, volume)
}

func (ctrl *ProvisionController) saveVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) (ProvisioningState, error) {
// Try to create the PV object several times
var lastSaveError error
err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
glog.Info(logOperation(operation, "trying to save persistentvolume %q", volume.Name))
if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
glog.Info(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
} else {
glog.Info(logOperation(operation, "persistentvolume %q saved", volume.Name))
}
return true, nil
}
// Save failed, try again after a while.
glog.Info(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
lastSaveError = err
return false, nil
})
operation := fmt.Sprintf("saving PV %s for claim %s", volume.Name, claimToClaimKey(claim))
glog.Info(logOperation(operation, "starting"))

if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)

var lastDeleteError error
err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
if err = ctrl.provisioner.Delete(volume); err == nil {
// Delete succeeded
glog.Info(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
return true, nil
}
// Delete failed, try again after a while.
glog.Info(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
lastDeleteError = err
return false, nil
})
_, err := ctrl.client.CoreV1().PersistentVolumes().Create(volume)
if err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
ctrl.forgetUnsavedVolume(claim)
if err != nil {
// Delete failed several times. There is an orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
glog.Info(logOperation(operation, "volume already exists, reusing"))
} else {
glog.Info(logOperation(operation, "volume saved"))
}
} else {
msg := fmt.Sprintf("Successfully provisioned volume %s", volume.Name)
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "ProvisioningSucceeded", msg)
return ProvisioningFinished, nil
}

glog.Info(logOperation(operation, "succeeded"))
return ProvisioningFinished, nil
// Save failed, try again after a while.
glog.Info(logOperation(operation, "failed: %v", err))
ctrl.storeUnsavedVolume(claim, volume)

// Return ProvisioningInBackground so the provisioner does not forget the claim if the user deletes it.
return ProvisioningInBackground, err
}

// deleteVolumeOperation attempts to delete the volume backing the given
@@ -1458,3 +1444,24 @@ func (ctrl *ProvisionController) supportsBlock() bool {
}
return false
}

func (ctrl *ProvisionController) storeUnsavedVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) {
ctrl.unsavedVolumes.Store(string(claim.UID), volume)
}

func (ctrl *ProvisionController) forgetUnsavedVolume(claim *v1.PersistentVolumeClaim) {
ctrl.unsavedVolumes.Delete(string(claim.UID))
}

func (ctrl *ProvisionController) hasUnsavedVolume(claim *v1.PersistentVolumeClaim) *v1.PersistentVolume {
obj, ok := ctrl.unsavedVolumes.Load(string(claim.UID))
if !ok {
return nil
}

pv, ok := obj.(*v1.PersistentVolume)
if !ok {
return nil
}
return pv
}
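Editor's note (a sketch, not part of this PR): the retry now relies on the controller's rate-limited work queue rather than an in-process wait.ExponentialBackoff loop — saveVolume returns ProvisioningInBackground so the claim is requeued, and claimQueue.Forget(claim) resets the per-item backoff once the save succeeds. Below is a minimal standalone illustration of that workqueue behaviour, using the same 15ms/1000ms limiter values the test passes to RateLimiter(); the queue key is made up for illustration.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Per-item exponential backoff: 15ms base delay, capped at 1s.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(15*time.Millisecond, 1000*time.Millisecond)
	queue := workqueue.NewRateLimitingQueue(limiter)

	// Simulate a claim whose PV save keeps failing: each AddRateLimited
	// re-enqueues the key after a delay that doubles per failure.
	queue.Add("default/claim-1")
	for i := 0; i < 5; i++ {
		key, _ := queue.Get()
		fmt.Printf("attempt %d, requeues so far: %d\n", i+1, queue.NumRequeues(key))
		queue.Done(key)           // this processing round is over (it failed)
		queue.AddRateLimited(key) // retry later with exponential backoff
	}

	// Once the save finally succeeds, Forget resets the backoff to zero,
	// mirroring ctrl.claimQueue.Forget(claim) in the diff above.
	key, _ := queue.Get()
	queue.Forget(key)
	queue.Done(key)
}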
54 changes: 50 additions & 4 deletions controller/controller_test.go
@@ -40,11 +40,12 @@ import (
testclient "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

const (
resyncPeriod = 100 * time.Millisecond
resyncPeriod = 1000 * time.Millisecond
sharedResyncPeriod = 1 * time.Second
defaultServerVersion = "v1.5.0"
)
@@ -55,6 +56,9 @@ func init() {

// TODO clean this up, e.g. remove redundant params (provisionerName: "foo.bar/baz")
func TestController(t *testing.T) {
// Counter of reactor calls, in case a test needs it. Reset to zero before each test starts.
var callCounter int

tests := []struct {
name string
objs []runtime.Object
@@ -66,6 +70,7 @@
reaction testclient.ReactionFunc
expectedVolumes []v1.PersistentVolume
expectedClaimsInProgress []string
expectedUnsavedVolumes []string
serverVersion string
}{
{
@@ -167,7 +172,32 @@
reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake error")
},
expectedVolumes: []v1.PersistentVolume(nil),
expectedVolumes: []v1.PersistentVolume(nil),
expectedClaimsInProgress: []string{"uid-1-1"},
expectedUnsavedVolumes: []string{"uid-1-1"},
},
{
name: "try to provision for claim-1, but fail to save the pv object once and succeed second time",
objs: []runtime.Object{
newBetaStorageClass("class-22", "foo.bar/baz"),
newClaim("claim-22", "uid-1-1", "class-22", "foo.bar/baz", "", nil),
},
provisionerName: "foo.bar/baz",
provisioner: newTestProvisioner(),
verbs: []string{"create"},
reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
callCounter++
if callCounter == 1 {
// The first call fails
return true, nil, errors.New("fake error")
}
// Any other call succeeds
return false, nil, nil
},
expectedVolumes: []v1.PersistentVolume{
*newProvisionedVolume(newBetaStorageClass("class-22", "foo.bar/baz"), newClaim("claim-22", "uid-1-1", "class-22", "foo.bar/baz", "", nil)),
},
expectedUnsavedVolumes: []string{},
},
{
name: "try to delete volume-1 but fail to delete the pv object",
@@ -309,6 +339,9 @@
},
}
for _, test := range tests {
klog.Infof("starting test %s", test.name)
callCounter = 0

client := fake.NewSimpleClientset(test.objs...)
if len(test.verbs) != 0 {
for _, v := range test.verbs {
@@ -351,9 +384,20 @@
})
expectedClaimsInProgress := sets.NewString(test.expectedClaimsInProgress...)
if !claimsInProgress.Equal(expectedClaimsInProgress) {
t.Errorf("expected claimsInProgres: %+v, got %+v", expectedClaimsInProgress.List(), claimsInProgress.List())
t.Errorf("test %q: expected claimsInProgres: %+v, got %+v", test.name, expectedClaimsInProgress.List(), claimsInProgress.List())
}

unsavedVolumes := sets.NewString()
ctrl.unsavedVolumes.Range(func(key, value interface{}) bool {
unsavedVolumes.Insert(key.(string))
return true
})
expectedUnsavedVolumes := sets.NewString(test.expectedUnsavedVolumes...)
if !unsavedVolumes.Equal(expectedUnsavedVolumes) {
t.Errorf("test %q: expected unsavedVolumes: %+v, got %+v", test.name, expectedUnsavedVolumes.List(), unsavedVolumes.List())
}
close(stopCh)
klog.Infof("finished test %s", test.name)
}
}

@@ -835,7 +879,9 @@ func newTestProvisionController(
CreateProvisionedPVInterval(10*time.Millisecond),
LeaseDuration(2*resyncPeriod),
RenewDeadline(resyncPeriod),
RetryPeriod(resyncPeriod/2))
RetryPeriod(resyncPeriod/2),
RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(15*time.Millisecond, 1000*time.Millisecond)),
)
return ctrl
}

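Editor's note on the test fixture (a sketch, not from the test file): the "fail to save once, then succeed" case is driven by a reactor prepended to the fake clientset, which intercepts the first PV create and lets later ones fall through to the default reactor. A minimal standalone version of that pattern follows; the PV stub and names are illustrative, and it assumes the same pre-1.18 client-go Create signature used throughout this diff.

package main

import (
	"errors"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	testclient "k8s.io/client-go/testing"
)

func main() {
	client := fake.NewSimpleClientset()

	// Fail the first PV create, let every later one fall through to the
	// default reactor, which stores the object and returns success.
	calls := 0
	client.PrependReactor("create", "persistentvolumes",
		func(action testclient.Action) (bool, runtime.Object, error) {
			calls++
			if calls == 1 {
				return true, nil, errors.New("fake error")
			}
			return false, nil, nil
		})

	pv := &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "pvc-uid-1-1"}}

	_, err := client.CoreV1().PersistentVolumes().Create(pv)
	fmt.Println("first attempt: ", err) // fake error
	_, err = client.CoreV1().PersistentVolumes().Create(pv)
	fmt.Println("second attempt:", err) // <nil>
}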