diff --git a/controller/controller.go b/controller/controller.go
index dc04731..505379a 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -136,6 +136,7 @@ type ProvisionController struct {
 	createProvisionedPVBackoff    *wait.Backoff
 	createProvisionedPVRetryCount int
 	createProvisionedPVInterval   time.Duration
+	createProvisionerPVLimiter    workqueue.RateLimiter
 
 	failedProvisionThreshold, failedDeleteThreshold int
 
@@ -158,6 +159,8 @@ type ProvisionController struct {
 
 	// Map UID -> *PVC with all claims that may be provisioned in the background.
 	claimsInProgress sync.Map
+
+	volumeStore VolumeStore
 }
 
 const (
@@ -245,6 +248,7 @@ func ExponentialBackOffOnError(exponentialBackOffOnError bool) func(*ProvisionCo
 
 // CreateProvisionedPVRetryCount is the number of retries when we create a PV
 // object for a provisioned volume. Defaults to 5.
+// If the PV is not saved after the given number of retries, the corresponding storage asset (volume) is deleted!
 func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*ProvisionController) error {
 	return func(c *ProvisionController) error {
 		if c.HasRun() {
@@ -253,6 +257,9 @@ func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*Prov
 		if c.createProvisionedPVBackoff != nil {
 			return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
 		}
+		if c.createProvisionerPVLimiter != nil {
+			return fmt.Errorf("CreateProvisionedPVRetryCount cannot be used together with CreateProvisionedPVLimiter")
+		}
 		c.createProvisionedPVRetryCount = createProvisionedPVRetryCount
 		return nil
 	}
@@ -260,6 +267,7 @@ func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*Prov
 
 // CreateProvisionedPVInterval is the interval between retries when we create a
 // PV object for a provisioned volume. Defaults to 10 seconds.
+// If the PV is not saved after the given number of retries, the corresponding storage asset (volume) is deleted!
 func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func(*ProvisionController) error {
 	return func(c *ProvisionController) error {
 		if c.HasRun() {
@@ -268,6 +276,9 @@ func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func
 		if c.createProvisionedPVBackoff != nil {
 			return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
 		}
+		if c.createProvisionerPVLimiter != nil {
+			return fmt.Errorf("CreateProvisionedPVInterval cannot be used together with CreateProvisionedPVLimiter")
+		}
 		c.createProvisionedPVInterval = createProvisionedPVInterval
 		return nil
 	}
@@ -275,6 +286,7 @@ func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func
 
 // CreateProvisionedPVBackoff is the configuration of exponential backoff between retries when we create a
 // PV object for a provisioned volume. Defaults to linear backoff, 10 seconds 5 times.
+// If the PV is not saved after the given number of retries, the corresponding storage asset (volume) is deleted!
 // Only one of CreateProvisionedPVInterval+CreateProvisionedPVRetryCount or CreateProvisionedPVBackoff
 // can be used.
 func CreateProvisionedPVBackoff(backoff wait.Backoff) func(*ProvisionController) error {
@@ -288,11 +300,40 @@ func CreateProvisionedPVBackoff(backoff wait.Backoff) func(*ProvisionController)
 		if c.createProvisionedPVInterval != 0 {
 			return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
 		}
+		if c.createProvisionerPVLimiter != nil {
+			return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVLimiter")
+		}
 		c.createProvisionedPVBackoff = &backoff
 		return nil
 	}
 }
 
+// CreateProvisionedPVLimiter is the configuration of the rate limiter for the queue of unsaved PersistentVolumes.
+// If set, PVs that fail to be saved to the Kubernetes API server will be re-enqueued to a separate workqueue
+// with this limiter and retried until they are saved to the API server. There is no limit on the number of retries.
+// The main difference from the other CreateProvisionedPV* options is that the storage asset is never deleted
+// and the controller keeps trying to save the PV to the API server indefinitely.
+// This option cannot be used together with CreateProvisionedPVBackoff, CreateProvisionedPVInterval
+// or CreateProvisionedPVRetryCount.
+func CreateProvisionedPVLimiter(limiter workqueue.RateLimiter) func(*ProvisionController) error {
+	return func(c *ProvisionController) error {
+		if c.HasRun() {
+			return errRuntime
+		}
+		if c.createProvisionedPVRetryCount != 0 {
+			return fmt.Errorf("CreateProvisionedPVLimiter cannot be used together with CreateProvisionedPVRetryCount")
+		}
+		if c.createProvisionedPVInterval != 0 {
+			return fmt.Errorf("CreateProvisionedPVLimiter cannot be used together with CreateProvisionedPVInterval")
+		}
+		if c.createProvisionedPVBackoff != nil {
+			return fmt.Errorf("CreateProvisionedPVLimiter cannot be used together with CreateProvisionedPVBackoff")
+		}
+		c.createProvisionerPVLimiter = limiter
+		return nil
+	}
+}
+
 // FailedProvisionThreshold is the threshold for max number of retries on
 // failures of Provision. Set to 0 to retry indefinitely. Defaults to 15.
 func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionController) error {
@@ -533,20 +574,27 @@ func NewProvisionController(
 	controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
 	controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")
 
-	if controller.createProvisionedPVBackoff == nil {
-		// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
-		if controller.createProvisionedPVInterval == 0 {
-			controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
-		}
-		if controller.createProvisionedPVRetryCount == 0 {
-			controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
-		}
-		controller.createProvisionedPVBackoff = &wait.Backoff{
-			Duration: controller.createProvisionedPVInterval,
-			Factor:   1, // linear backoff
-			Steps:    controller.createProvisionedPVRetryCount,
-			Cap:      controller.createProvisionedPVInterval,
+	if controller.createProvisionerPVLimiter != nil {
+		glog.V(2).Infof("Saving PVs to API server in background")
+		controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter)
+	} else {
+		if controller.createProvisionedPVBackoff == nil {
+			// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
+			if controller.createProvisionedPVInterval == 0 {
+				controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
+			}
+			if controller.createProvisionedPVRetryCount == 0 {
+				controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
+			}
+			controller.createProvisionedPVBackoff = &wait.Backoff{
+				Duration: controller.createProvisionedPVInterval,
+				Factor:   1, // linear backoff
+				Steps:    controller.createProvisionedPVRetryCount,
+				//Cap:      controller.createProvisionedPVInterval,
+			}
 		}
+		glog.V(2).Infof("Using blocking saving of PVs to API server")
+		controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller)
 	}
 
 	informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)
@@ -607,7 +655,6 @@ func NewProvisionController(
 		}
 	}
 	controller.classes = controller.classInformer.GetStore()
-
 	return controller
 }
 
@@ -730,6 +777,8 @@ func (ctrl *ProvisionController) Run(_ <-chan struct{}) {
 		select {}
 	}
 
+	go ctrl.volumeStore.Run(context.TODO(), DefaultThreadiness)
+
 	if ctrl.leaderElection {
 		rl, err := resourcelock.New("endpoints",
 			ctrl.leaderElectionNamespace,
@@ -925,7 +974,10 @@ func (ctrl *ProvisionController) syncClaim(obj interface{}) (ProvisioningState,
 
 	if ctrl.shouldProvision(claim) {
 		startTime := time.Now()
-		status, err := ctrl.provisionClaimOperation(claim)
+
+		var status ProvisioningState
+		var err error
+		status, err = ctrl.provisionClaimOperation(claim)
 		ctrl.updateProvisionStats(claim, err, startTime)
 		return status, err
 	}
@@ -1202,59 +1254,11 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
 		metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annClass, claimClass)
 	}
 
-	// Try to create the PV object several times
-	var lastSaveError error
-	err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
-		glog.Info(logOperation(operation, "trying to save persistentvolume %q", volume.Name))
-		if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
-			// Save succeeded.
-			if err != nil {
-				glog.Info(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
-			} else {
-				glog.Info(logOperation(operation, "persistentvolume %q saved", volume.Name))
-			}
-			return true, nil
-		}
-		// Save failed, try again after a while.
-		glog.Info(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
-		lastSaveError = err
-		return false, nil
-	})
+	glog.Info(logOperation(operation, "succeeded"))
 
-	if err != nil {
-		// Save failed. Now we have a storage asset outside of Kubernetes,
-		// but we don't have appropriate PV object for it.
-		// Emit some event here and try to delete the storage asset several
-		// times.
-		strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
-		glog.Error(logOperation(operation, strerr))
-		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)
-
-		var lastDeleteError error
-		err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
-			if err = ctrl.provisioner.Delete(volume); err == nil {
-				// Delete succeeded
-				glog.Info(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
-				return true, nil
-			}
-			// Delete failed, try again after a while.
-			glog.Info(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
-			lastDeleteError = err
-			return false, nil
-		})
-		if err != nil {
-			// Delete failed several times. There is an orphaned volume and there
-			// is nothing we can do about it.
-			strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
-			glog.Error(logOperation(operation, strerr))
-			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
-		}
-	} else {
-		msg := fmt.Sprintf("Successfully provisioned volume %s", volume.Name)
-		ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "ProvisioningSucceeded", msg)
+	if err := ctrl.volumeStore.StoreVolume(claim, volume); err != nil {
+		return ProvisioningFinished, err
 	}
-
-	glog.Info(logOperation(operation, "succeeded"))
 
 	return ProvisioningFinished, nil
 }
diff --git a/controller/controller_test.go b/controller/controller_test.go
index c7af851..4de4414 100644
--- a/controller/controller_test.go
+++ b/controller/controller_test.go
@@ -309,6 +309,7 @@ func TestController(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
+		klog.Infof("starting %s", test.name)
 		client := fake.NewSimpleClientset(test.objs...)
 		if len(test.verbs) != 0 {
 			for _, v := range test.verbs {
diff --git a/controller/volume_store.go b/controller/volume_store.go
new file mode 100644
index 0000000..232326d
--- /dev/null
+++ b/controller/volume_store.go
@@ -0,0 +1,242 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"k8s.io/client-go/tools/record"
+
+	"k8s.io/api/core/v1"
+	apierrs "k8s.io/apimachinery/pkg/api/errors"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog"
+)
+
+// VolumeStore is an interface that's used to save PersistentVolumes to the API server.
+// Implementations of the interface add a custom error recovery policy.
+// A volume is added via StoreVolume(); it's enough to store the volume only once.
+// It is not possible to remove a volume, even when the corresponding PVC is deleted
+// and the PV is no longer necessary. The PV will always be created.
+// If the corresponding PVC is deleted, the PV will then be deleted by Kubernetes using
+// the standard deletion procedure. It saves us some code here.
+type VolumeStore interface {
+	// StoreVolume makes sure a volume is saved to the Kubernetes API server.
+	// If no error is returned, the caller can assume that the PV was saved or
+	// is being saved in the background.
+	// If an error is returned, no PV was saved and the corresponding PVC needs
+	// to be re-queued (so the whole provisioning needs to be done again).
+	StoreVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error
+
+	// Run starts any background goroutines needed by the implementation of the interface.
+	Run(ctx context.Context, threadiness int)
+}
+
+// queueStore is an implementation of VolumeStore that retries saving
+// PVs to the API server using a workqueue running in its own goroutine(s).
+// After a failed save, a volume is re-queued with exponential backoff.
+type queueStore struct {
+	client kubernetes.Interface
+	queue  workqueue.RateLimitingInterface
+
+	volumes sync.Map
+}
+
+var _ VolumeStore = &queueStore{}
+
+// NewVolumeStoreQueue returns a VolumeStore that saves PVs to the API server in the
+// background, using a workqueue with the given rate limiter to retry failed saves.
+func NewVolumeStoreQueue(
+	client kubernetes.Interface,
+	limiter workqueue.RateLimiter,
+) VolumeStore {
+
+	return &queueStore{
+		client: client,
+		queue:  workqueue.NewNamedRateLimitingQueue(limiter, "unsavedpvs"),
+	}
+}
+
+func (q *queueStore) StoreVolume(_ *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error {
+	if err := q.doSaveVolume(volume); err != nil {
+		q.volumes.Store(volume.Name, volume)
+		q.queue.Add(volume.Name)
+		klog.Errorf("Failed to save volume %s: %s", volume.Name, err)
+	}
+	// Consume any error, this Store will retry in the background.
+	return nil
+}
+
+func (q *queueStore) Run(ctx context.Context, threadiness int) {
+	klog.Infof("Starting save volume queue")
+	defer q.queue.ShutDown()
+
+	for i := 0; i < threadiness; i++ {
+		go wait.Until(q.saveVolumeWorker, time.Second, ctx.Done())
+	}
+	<-ctx.Done()
+	klog.Infof("Stopped save volume queue")
+}
+
+func (q *queueStore) saveVolumeWorker() {
+	for q.processNextWorkItem() {
+	}
+}
+
+func (q *queueStore) processNextWorkItem() bool {
+	obj, shutdown := q.queue.Get()
+	defer q.queue.Done(obj)
+
+	if shutdown {
+		return false
+	}
+
+	var volumeName string
+	var ok bool
+	if volumeName, ok = obj.(string); !ok {
+		q.queue.Forget(obj)
+		utilruntime.HandleError(fmt.Errorf("expected string in save workqueue but got %#v", obj))
+		return true
+	}
+
+	volumeObj, found := q.volumes.Load(volumeName)
+	if !found {
+		q.queue.Forget(volumeName)
+		utilruntime.HandleError(fmt.Errorf("did not find saved volume %s", volumeName))
+		return true
+	}
+
+	volume, ok := volumeObj.(*v1.PersistentVolume)
+	if !ok {
+		q.queue.Forget(volumeName)
+		utilruntime.HandleError(fmt.Errorf("saved object is not a volume: %+v", volumeObj))
+		return true
+	}
+
+	if err := q.doSaveVolume(volume); err != nil {
+		q.queue.AddRateLimited(volumeName)
+		utilruntime.HandleError(err)
+		klog.V(5).Infof("Volume %s enqueued", volume.Name)
+		return true
+	}
+	q.volumes.Delete(volumeName)
+	q.queue.Forget(volumeName)
+	return true
+}
+
+func (q *queueStore) doSaveVolume(volume *v1.PersistentVolume) error {
+	klog.V(5).Infof("Saving volume %s", volume.Name)
+	_, err := q.client.CoreV1().PersistentVolumes().Create(volume)
+	if err == nil || apierrs.IsAlreadyExists(err) {
+		klog.V(5).Infof("Volume %s saved", volume.Name)
+		return nil
+	}
+	return fmt.Errorf("error saving volume %s: %s", volume.Name, err)
+}
+
+// backoffStore is an implementation of VolumeStore that blocks and tries to save
+// a volume to the API server with configurable backoff. If saving fails,
+// StoreVolume() eventually deletes the storage asset and returns an appropriate
+// error code.
+type backoffStore struct {
+	client        kubernetes.Interface
+	eventRecorder record.EventRecorder
+	backoff       *wait.Backoff
+	ctrl          *ProvisionController
+}
+
+var _ VolumeStore = &backoffStore{}
+
+// NewBackoffStore returns a VolumeStore that blocks and retries saving a PV to the
+// API server with the given backoff, deleting the storage asset when it gives up.
+func NewBackoffStore(client kubernetes.Interface,
+	eventRecorder record.EventRecorder,
+	backoff *wait.Backoff,
+	ctrl *ProvisionController,
+) VolumeStore {
+	return &backoffStore{
+		client:        client,
+		eventRecorder: eventRecorder,
+		backoff:       backoff,
+		ctrl:          ctrl,
+	}
+}
+
+func (b *backoffStore) StoreVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error {
+	// Try to create the PV object several times
+	var lastSaveError error
+	err := wait.ExponentialBackoff(*b.backoff, func() (bool, error) {
+		klog.Infof("Trying to save persistentvolume %q", volume.Name)
+		var err error
+		if _, err = b.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
+			// Save succeeded.
+			if err != nil {
+				klog.Infof("persistentvolume %q already exists, reusing", volume.Name)
+			} else {
+				klog.Infof("persistentvolume %q saved", volume.Name)
+			}
+			return true, nil
+		}
+		// Save failed, try again after a while.
+		klog.Infof("Failed to save persistentvolume %q: %v", volume.Name, err)
+		lastSaveError = err
+		return false, nil
+	})
+
+	if err == nil {
+		// Save succeeded
+		msg := fmt.Sprintf("Successfully provisioned volume %s", volume.Name)
+		b.eventRecorder.Event(claim, v1.EventTypeNormal, "ProvisioningSucceeded", msg)
+		return nil
+	}
+
+	// Save failed. Now we have a storage asset outside of Kubernetes,
+	// but we don't have an appropriate PV object for it.
+	// Emit an event here and try to delete the storage asset several
+	// times.
+	strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
+	klog.Error(strerr)
+	b.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)
+
+	var lastDeleteError error
+	err = wait.ExponentialBackoff(*b.backoff, func() (bool, error) {
+		if err = b.ctrl.provisioner.Delete(volume); err == nil {
+			// Delete succeeded
+			klog.Infof("Cleaning volume %q succeeded", volume.Name)
+			return true, nil
+		}
+		// Delete failed, try again after a while.
+		klog.Infof("Failed to clean volume %q: %v", volume.Name, err)
+		lastDeleteError = err
+		return false, nil
+	})
+	if err != nil {
+		// Delete failed several times. There is an orphaned volume and there
+		// is nothing we can do about it.
+		strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
+		klog.Error(strerr)
+		b.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
+	}
+
+	return lastSaveError
+}
+
+func (b *backoffStore) Run(ctx context.Context, threadiness int) {
+	// There is no background processing.
+}
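
Reviewer note, not part of the patch: below is a minimal usage sketch of the new option, showing how a provisioner built on this library could opt into background PV saving. The import path "sigs.k8s.io/sig-storage-lib-external-provisioner/controller", the provisioner name "example.com/my-provisioner" and the rate-limiter values are assumptions for illustration only; the Provisioner implementation itself is left as a placeholder.

// Illustrative sketch: wiring CreateProvisionedPVLimiter into a provisioner
// that uses this library. Adjust the import path to the vendored version.
package main

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/util/workqueue"

	"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	serverVersion, err := client.Discovery().ServerVersion()
	if err != nil {
		panic(err)
	}

	// Exponential backoff from 1s up to 5min between attempts to save each PV.
	// With CreateProvisionedPVLimiter the controller never deletes the backing
	// storage asset because of a failed save; it keeps retrying in the background.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 5*time.Minute)

	// Placeholder: plug in your own Provisioner implementation here
	// (a nil interface compiles but will not work at runtime).
	var provisioner controller.Provisioner

	pc := controller.NewProvisionController(
		client,
		"example.com/my-provisioner",
		provisioner,
		serverVersion.GitVersion,
		controller.CreateProvisionedPVLimiter(limiter),
	)
	pc.Run(wait.NeverStop)
}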