Skip to content

Commit

Permalink
Add separate workqueue for unsaved PVs
Browse files Browse the repository at this point in the history
  • Loading branch information
jsafrane committed Feb 27, 2019
1 parent 1d7bbe7 commit dc9d3c9
Show file tree
Hide file tree
Showing 3 changed files with 313 additions and 66 deletions.
136 changes: 70 additions & 66 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type ProvisionController struct {
createProvisionedPVBackoff *wait.Backoff
createProvisionedPVRetryCount int
createProvisionedPVInterval time.Duration
createProvisionerPVLimiter workqueue.RateLimiter

failedProvisionThreshold, failedDeleteThreshold int

Expand All @@ -158,6 +159,8 @@ type ProvisionController struct {

// Map UID -> *PVC with all claims that may be provisioned in the background.
claimsInProgress sync.Map

volumeStore VolumeStore
}

const (
Expand Down Expand Up @@ -245,6 +248,7 @@ func ExponentialBackOffOnError(exponentialBackOffOnError bool) func(*ProvisionCo

// CreateProvisionedPVRetryCount is the number of retries when we create a PV
// object for a provisioned volume. Defaults to 5.
// If PV is not saved after given number of retries, corresponding storage asset (volume) is deleted!
func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
Expand All @@ -253,13 +257,17 @@ func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*Prov
if c.createProvisionedPVBackoff != nil {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
}
if c.createProvisionerPVLimiter != nil {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVLimiter")
}
c.createProvisionedPVRetryCount = createProvisionedPVRetryCount
return nil
}
}

// CreateProvisionedPVInterval is the interval between retries when we create a
// PV object for a provisioned volume. Defaults to 10 seconds.
// If PV is not saved after given number of retries, corresponding storage asset (volume) is deleted!
func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
Expand All @@ -268,13 +276,17 @@ func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func
if c.createProvisionedPVBackoff != nil {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
}
if c.createProvisionerPVLimiter != nil {
return fmt.Errorf("CreateProvisionedPVInterval cannot be used together with CreateProvisionedPVLimiter")
}
c.createProvisionedPVInterval = createProvisionedPVInterval
return nil
}
}

// CreateProvisionedPVBackoff is the configuration of exponential backoff between retries when we create a
// PV object for a provisioned volume. Defaults to linear backoff, 10 seconds 5 times.
// If PV is not saved after given number of retries, corresponding storage asset (volume) is deleted!
// Only one of CreateProvisionedPVInterval+CreateProvisionedPVRetryCount or CreateProvisionedPVBackoff
// can be used.
func CreateProvisionedPVBackoff(backoff wait.Backoff) func(*ProvisionController) error {
Expand All @@ -288,11 +300,40 @@ func CreateProvisionedPVBackoff(backoff wait.Backoff) func(*ProvisionController)
if c.createProvisionedPVInterval != 0 {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
}
if c.createProvisionerPVLimiter != nil {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVLimiter")
}
c.createProvisionedPVBackoff = &backoff
return nil
}
}

// CreateProvisionedPVLimiter is the configuration of rate limiter for queue of unsaved PersistentVolumes.
// If set, PVs that fail to be saved to Kubernetes API server will be re-enqueued to a separate workqueue
// with this limiter and re-tried until they are saved to API server. There is no limit of retries.
// The main difference to other CreateProvisionedPV* option is that the storage asset is never deleted
// and the controller continues saving PV to API server indefinitely.
// This option cannot be used with CreateProvisionedPVBackoff or CreateProvisionedPVInterval
// or CreateProvisionedPVRetryCount.
func CreateProvisionedPVLimiter(limiter workqueue.RateLimiter) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
if c.createProvisionedPVRetryCount != 0 {
return fmt.Errorf("CreateProvisionedPVLimiter cannot be used together with CreateProvisionedPVRetryCount")
}
if c.createProvisionedPVInterval != 0 {
return fmt.Errorf("CreateProvisionedPVLimiter cannot be used together with CreateProvisionedPVInterval")
}
if c.createProvisionedPVBackoff != nil {
return fmt.Errorf("CreateProvisionedPVLimiter cannot be used together with CreateProvisionedPVBackoff")
}
c.createProvisionerPVLimiter = limiter
return nil
}
}

// FailedProvisionThreshold is the threshold for max number of retries on
// failures of Provision. Set to 0 to retry indefinitely. Defaults to 15.
func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionController) error {
Expand Down Expand Up @@ -533,20 +574,27 @@ func NewProvisionController(
controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")

if controller.createProvisionedPVBackoff == nil {
// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
if controller.createProvisionedPVInterval == 0 {
controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
}
if controller.createProvisionedPVRetryCount == 0 {
controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
}
controller.createProvisionedPVBackoff = &wait.Backoff{
Duration: controller.createProvisionedPVInterval,
Factor: 1, // linear backoff
Steps: controller.createProvisionedPVRetryCount,
Cap: controller.createProvisionedPVInterval,
if controller.createProvisionerPVLimiter != nil {
glog.V(2).Infof("Using saving PVs to API server in background")
controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter)
} else {
if controller.createProvisionedPVBackoff == nil {
// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
if controller.createProvisionedPVInterval == 0 {
controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
}
if controller.createProvisionedPVRetryCount == 0 {
controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
}
controller.createProvisionedPVBackoff = &wait.Backoff{
Duration: controller.createProvisionedPVInterval,
Factor: 1, // linear backoff
Steps: controller.createProvisionedPVRetryCount,
//Cap: controller.createProvisionedPVInterval,
}
}
glog.V(2).Infof("Using blocking saving PVs to API server")
controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller)
}

informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)
Expand Down Expand Up @@ -607,7 +655,6 @@ func NewProvisionController(
}
}
controller.classes = controller.classInformer.GetStore()

return controller
}

Expand Down Expand Up @@ -730,6 +777,8 @@ func (ctrl *ProvisionController) Run(_ <-chan struct{}) {
select {}
}

go ctrl.volumeStore.Run(context.TODO(), DefaultThreadiness)

if ctrl.leaderElection {
rl, err := resourcelock.New("endpoints",
ctrl.leaderElectionNamespace,
Expand Down Expand Up @@ -925,7 +974,10 @@ func (ctrl *ProvisionController) syncClaim(obj interface{}) (ProvisioningState,

if ctrl.shouldProvision(claim) {
startTime := time.Now()
status, err := ctrl.provisionClaimOperation(claim)

var status ProvisioningState
var err error
status, err = ctrl.provisionClaimOperation(claim)
ctrl.updateProvisionStats(claim, err, startTime)
return status, err
}
Expand Down Expand Up @@ -1202,59 +1254,11 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annClass, claimClass)
}

// Try to create the PV object several times
var lastSaveError error
err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
glog.Info(logOperation(operation, "trying to save persistentvolume %q", volume.Name))
if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
glog.Info(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
} else {
glog.Info(logOperation(operation, "persistentvolume %q saved", volume.Name))
}
return true, nil
}
// Save failed, try again after a while.
glog.Info(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
lastSaveError = err
return false, nil
})
glog.Info(logOperation(operation, "succeeded"))

if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)

var lastDeleteError error
err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
if err = ctrl.provisioner.Delete(volume); err == nil {
// Delete succeeded
glog.Info(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
return true, nil
}
// Delete failed, try again after a while.
glog.Info(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
lastDeleteError = err
return false, nil
})
if err != nil {
// Delete failed several times. There is an orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
}
} else {
msg := fmt.Sprintf("Successfully provisioned volume %s", volume.Name)
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "ProvisioningSucceeded", msg)
if err := ctrl.volumeStore.StoreVolume(claim, volume); err != nil {
return ProvisioningFinished, err
}

glog.Info(logOperation(operation, "succeeded"))
return ProvisioningFinished, nil
}

Expand Down
1 change: 1 addition & 0 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func TestController(t *testing.T) {
},
}
for _, test := range tests {
klog.Infof("starting %q", test.name)
client := fake.NewSimpleClientset(test.objs...)
if len(test.verbs) != 0 {
for _, v := range test.verbs {
Expand Down
Loading

0 comments on commit dc9d3c9

Please sign in to comment.