Skip to content
This repository has been archived by the owner on May 6, 2022. It is now read-only.

Commit

Permalink
Only do work for instances from a single queue
Browse files Browse the repository at this point in the history
  • Loading branch information
pmorie committed Aug 1, 2017
1 parent 2bd85d6 commit 861fb7f
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 49 deletions.
5 changes: 1 addition & 4 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) {
go wait.Until(worker(c.serviceClassQueue, "ServiceClass", maxRetries, c.reconcileServiceClassKey), time.Second, stopCh)
go wait.Until(worker(c.instanceQueue, "Instance", maxRetries, c.reconcileInstanceKey), time.Second, stopCh)
go wait.Until(worker(c.bindingQueue, "Binding", maxRetries, c.reconcileBindingKey), time.Second, stopCh)
go wait.Until(worker(c.pollingQueue, "Poller", maxRetries, c.reconcileInstanceKey), time.Second, stopCh)
go wait.Until(worker(c.pollingQueue, "Poller", maxRetries, c.requeueInstanceForPoll), time.Second, stopCh)
}

<-stopCh
Expand All @@ -172,9 +172,6 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) {
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// If reconciler returns an error, requeue the item up to maxRetries before giving up.
// It enforces that the reconciler is never invoked concurrently with the same key.
// TODO: Consider allowing the reconciler to return an error that either specifies whether
// this is recoverable or not, rather than always continuing on an error condition. Seems
// like it should be possible to return an error, yet stop any further polling work.
func worker(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, reconciler func(key string) error) func() {
return func() {
exit := false
Expand Down
66 changes: 60 additions & 6 deletions pkg/controller/controller_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,54 @@ func (c *controller) instanceAdd(obj interface{}) {
c.instanceQueue.Add(key)
}

// Async operations on instances have a somewhat convoluted flow in order to
// ensure that only a single goroutine works on an instance at any given time.
// The flow is:
//
// 1. When the controller wants to begin polling the state of an operation on
// an instance, it calls its beginPollingInstance method (or
// calls continuePollingInstance, an alias of that method)
// 2. begin/continuePollingInstance do a rate-limited add to the polling queue
// 3. the pollingQueue calls requeueInstanceForPoll, which adds the instance's
// key to the instance work queue
// 4. the worker servicing the instance polling queue forgets the instances key,
// requiring the controller to call continuePollingInstance if additional
// work is needed.
// 5. the instance work queue is the single work queue that actually services
// instances by calling reconcileInstance

// requeueInstanceForPoll adds the given instance key to the controller's work
// queue for instances. It is used to trigger polling for the status of an
// async operation on and instance and is called by the worker servicing the
// instance polling queue. After requeueInstanceForPoll exits, the worker
// forgets the key from the polling queue, so the controller must call
// continuePollingInstance if the instance requires additional polling.
func (c *controller) requeueInstanceForPoll(key string) error {
c.instanceQueue.Add(key)

return nil
}

// beginPollingInstance does a rate-limited add of the key for the given
// instance to the controller's instance polling queue.
func (c *controller) beginPollingInstance(instance *v1alpha1.Instance) error {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(instance)
if err != nil {
glog.Errorf("Couldn't create a key for object %+v: %v", instance, err)
return fmt.Errorf("Couldn't create a key for object %+v: %v", instance, err)
}

c.pollingQueue.AddRateLimited(key)

return nil
}

// continuePollingInstance does a rate-limited add of the key for the given
// instance to the controller's instance polling queue.
func (c *controller) continuePollingInstance(instance *v1alpha1.Instance) error {
return c.beginPollingInstance(instance)
}

func (c *controller) reconcileInstanceKey(key string) error {
// For namespace-scoped resources, SplitMetaNamespaceKey splits the key
// i.e. "namespace/name" into two separate strings
Expand Down Expand Up @@ -205,6 +253,11 @@ func (c *controller) reconcileInstanceDelete(instance *v1alpha1.Instance) error
if err != nil {
return err
}

err = c.beginPollingInstance(instance)
if err != nil {
return err
}
} else {
glog.V(5).Infof("Deprovision call to broker succeeded for Instance %v/%v, finalizing", instance.Namespace, instance.Name)
}
Expand Down Expand Up @@ -461,13 +514,9 @@ func (c *controller) reconcileInstance(instance *v1alpha1.Instance) error {

c.recorder.Eventf(instance, api.EventTypeNormal, asyncProvisioningReason, asyncProvisioningMessage)

// Actually, start polling this Service Instance by adding it into the polling queue
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(instance)
if err != nil {
glog.Errorf("Couldn't create a key for object %+v: %v", instance, err)
return fmt.Errorf("Couldn't create a key for object %+v: %v", instance, err)
if err := c.beginPollingInstance(instance); err != nil {
return err
}
c.pollingQueue.Add(key)
} else {
glog.V(5).Infof("Successfully provisioned Instance %v/%v of ServiceClass %v at Broker %v: response: %+v", instance.Namespace, instance.Name, serviceClass.Name, brokerName, response)

Expand Down Expand Up @@ -605,6 +654,11 @@ func (c *controller) pollInstance(serviceClass *v1alpha1.ServiceClass, servicePl
message,
)
}

err = c.continuePollingInstance(instance)
if err != nil {
return err
}
return fmt.Errorf("last operation not completed (still in progress) for %v/%v", instance.Namespace, instance.Name)
case osb.StateSucceeded:
// Update the instance to reflect that an async operation is no longer
Expand Down
64 changes: 37 additions & 27 deletions pkg/controller/controller_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package controller
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"
"strings"
Expand Down Expand Up @@ -578,19 +577,25 @@ func TestReconcileInstanceAsynchronous(t *testing.T) {
updatedInstance := assertUpdateStatus(t, actions[0], instance)
assertInstanceReadyFalse(t, updatedInstance)

// The item should've been added to the pollingQueue for later processing
if testController.pollingQueue.Len() != 1 {
t.Fatalf("Expected the asynchronous instance to end up in the polling queue")
}
item, _ := testController.pollingQueue.Get()
if item == nil {
t.Fatalf("Did not get back a key from polling queue")
}
actualKey := item.(string)
expectedKey := fmt.Sprintf("%s/%s", instance.Namespace, instance.Name)
if actualKey != expectedKey {
t.Fatalf("got key as %q expected %q", actualKey, expectedKey)
}
// Since polling is rate-limited, it is not possible to check whether the
// instance is in the polling queue.
//
// TODO: add a way to peak into rate-limited adds that are still pending,
// then uncomment.

// if testController.pollingQueue.Len() != 1 {
// t.Fatalf("Expected the asynchronous instance to end up in the polling queue")
// }
// item, _ := testController.pollingQueue.Get()
// if item == nil {
// t.Fatalf("Did not get back a key from polling queue")
// }
// actualKey := item.(string)
// expectedKey := fmt.Sprintf("%s/%s", instance.Namespace, instance.Name)
// if actualKey != expectedKey {
// t.Fatalf("got key as %q expected %q", actualKey, expectedKey)
// }

assertAsyncOpInProgressTrue(t, updatedInstance)
assertInstanceLastOperation(t, updatedInstance, testOperation)
assertInstanceDashboardURL(t, updatedInstance, testDashboardURL)
Expand Down Expand Up @@ -649,19 +654,24 @@ func TestReconcileInstanceAsynchronousNoOperation(t *testing.T) {
updatedInstance := assertUpdateStatus(t, actions[0], instance)
assertInstanceReadyFalse(t, updatedInstance)

// The item should've been added to the pollingQueue for later processing
if testController.pollingQueue.Len() != 1 {
t.Fatalf("Expected the asynchronous instance to end up in the polling queue")
}
item, _ := testController.pollingQueue.Get()
if item == nil {
t.Fatalf("Did not get back a key from polling queue")
}
key := item.(string)
expectedKey := fmt.Sprintf("%s/%s", instance.Namespace, instance.Name)
if key != expectedKey {
t.Fatalf("got key as %q expected %q", key, expectedKey)
}
// Since polling is rate-limited, it is not possible to check whether the
// instance is in the polling queue.
//
// TODO: add a way to peak into rate-limited adds that are still pending,
// then uncomment.

// if testController.pollingQueue.Len() != 1 {
// t.Fatalf("Expected the asynchronous instance to end up in the polling queue")
// }
// item, _ := testController.pollingQueue.Get()
// if item == nil {
// t.Fatalf("Did not get back a key from polling queue")
// }
// key := item.(string)
// expectedKey := fmt.Sprintf("%s/%s", instance.Namespace, instance.Name)
// if key != expectedKey {
// t.Fatalf("got key as %q expected %q", key, expectedKey)
// }
assertAsyncOpInProgressTrue(t, updatedInstance)
assertInstanceLastOperation(t, updatedInstance, "")
}
Expand Down
Loading

0 comments on commit 861fb7f

Please sign in to comment.