Skip to content

Commit

Permalink
Add UID index to claim queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jsafrane committed Feb 18, 2019
1 parent 7c9e764 commit 7a84686
Showing 1 changed file with 59 additions and 21 deletions.
80 changes: 59 additions & 21 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ const annSelectedNode = "volume.kubernetes.io/selected-node"
// Finalizer for PVs so we know to clean them up
const finalizerPV = "external-provisioner.volume.kubernetes.io/finalizer"

const uidIndex = "uid"

// ProvisionController is a controller that provisions PersistentVolumes for
// PersistentVolumeClaims.
type ProvisionController struct {
Expand All @@ -104,8 +106,8 @@ type ProvisionController struct {
// * 1.6: storage classes enter GA
kubeVersion *utilversion.Version

claimInformer cache.SharedInformer
claims cache.Store
claimInformer cache.SharedIndexInformer
claimsIndexer cache.Indexer
volumeInformer cache.SharedInformer
volumes cache.Store
classInformer cache.SharedInformer
Expand Down Expand Up @@ -376,7 +378,7 @@ func RetryPeriod(retryPeriod time.Duration) func(*ProvisionController) error {

// ClaimsInformer sets the informer to use for accessing PersistentVolumeClaims.
// Defaults to using a internal informer.
func ClaimsInformer(informer cache.SharedInformer) func(*ProvisionController) error {
func ClaimsInformer(informer cache.SharedIndexInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
Expand Down Expand Up @@ -550,9 +552,9 @@ func NewProvisionController(
// PersistentVolumeClaims

claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.claimQueue, obj) },
AddFunc: func(obj interface{}) { controller.enqueueClaim(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
DeleteFunc: func(obj interface{}) { controller.enqueueClaim(obj) },
}

if controller.claimInformer != nil {
Expand All @@ -561,15 +563,22 @@ func NewProvisionController(
controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
controller.claimInformer.AddEventHandler(claimHandler)
}
controller.claims = controller.claimInformer.GetStore()
controller.claimInformer.AddIndexers(cache.Indexers{uidIndex: func(obj interface{}) ([]string, error) {
uid, err := getObjectUID(obj)
if err != nil {
return nil, err
}
return []string{uid}, nil
}})
controller.claimsIndexer = controller.claimInformer.GetIndexer()

// -----------------
// PersistentVolumes

volumeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) },
AddFunc: func(obj interface{}) { controller.enqueueVolume(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },
}

if controller.volumeInformer != nil {
Expand All @@ -596,9 +605,37 @@ func NewProvisionController(
return controller
}

// enqueueWork takes an obj and converts it into a namespace/name string which
func getObjectUID(obj interface{}) (string, error) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return "", fmt.Errorf("error decoding object, invalid type")
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
return "", fmt.Errorf("error decoding object tombstone, invalid type")
}
}
return string(object.GetUID()), nil
}

// enqueueClaim takes an obj and converts it into UID that is then put onto claim work queue.
func (ctrl *ProvisionController) enqueueClaim(obj interface{}) {
uid, err := getObjectUID(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
if ctrl.claimQueue.NumRequeues(uid) == 0 {
ctrl.claimQueue.Add(uid)
}
}

// enqueueVolume takes an obj and converts it into a namespace/name string which
// is then put onto the given work queue.
func (ctrl *ProvisionController) enqueueWork(queue workqueue.RateLimitingInterface, obj interface{}) {
func (ctrl *ProvisionController) enqueueVolume(obj interface{}) {
var key string
var err error
if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
Expand All @@ -607,22 +644,22 @@ func (ctrl *ProvisionController) enqueueWork(queue workqueue.RateLimitingInterfa
}
// Re-Adding is harmless but try to add it to the queue only if it is not
// already there, because if it is already there we *must* be retrying it
if queue.NumRequeues(key) == 0 {
queue.Add(key)
if ctrl.volumeQueue.NumRequeues(key) == 0 {
ctrl.volumeQueue.Add(key)
}
}

// forgetWork Forgets an obj from the given work queue, telling the queue to
// forgetVolume Forgets an obj from the given work queue, telling the queue to
// stop tracking its retries because e.g. the obj was deleted
func (ctrl *ProvisionController) forgetWork(queue workqueue.RateLimitingInterface, obj interface{}) {
func (ctrl *ProvisionController) forgetVolume(obj interface{}) {
var key string
var err error
if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
queue.Forget(key)
queue.Done(key)
ctrl.volumeQueue.Forget(key)
ctrl.volumeQueue.Done(key)
}

// Run starts all of this controller's control loops
Expand Down Expand Up @@ -819,15 +856,16 @@ func (ctrl *ProvisionController) processNextVolumeWorkItem() bool {

// syncClaimHandler gets the claim from informer's cache then calls syncClaim
func (ctrl *ProvisionController) syncClaimHandler(key string) error {
claimObj, exists, err := ctrl.claims.GetByKey(key)
objs, err := ctrl.claimsIndexer.ByIndex(uidIndex, key)
if err != nil {
return err
}
if !exists {
if len(objs) == 0 {
// TODO: handle deleted PVCs
utilruntime.HandleError(fmt.Errorf("claim %q in work queue no longer exists", key))
return nil
}

claimObj := objs[0]
return ctrl.syncClaim(claimObj)
}

Expand Down

0 comments on commit 7a84686

Please sign in to comment.