Skip to content
This repository has been archived by the owner on May 6, 2022. It is now read-only.

Commit

Permalink
Create OSB client if does not exists in the broker client manager
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrmiskiewicz committed Mar 8, 2019
1 parent ce2050d commit cbae253
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 75 deletions.
7 changes: 7 additions & 0 deletions cmd/controller-manager/app/controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/client-go/informers"
"k8s.io/klog"
)

Expand Down Expand Up @@ -362,6 +363,9 @@ func StartControllers(s *options.ControllerManagerServer,
}
klog.V(5).Infof("Creating shared informers; resync interval: %v", s.ResyncInterval)

coreInformerFactory := informers.NewSharedInformerFactory(coreClient, s.ResyncInterval)
coreInformers := coreInformerFactory.Core()

// Build the informer factory for service-catalog resources
informerFactory := servicecataloginformers.NewSharedInformerFactory(
serviceCatalogClientBuilder.ClientOrDie("shared-informers"),
Expand All @@ -373,6 +377,7 @@ func StartControllers(s *options.ControllerManagerServer,
klog.V(5).Infof("Creating controller; broker relist interval: %v", s.ServiceBrokerRelistInterval)
serviceCatalogController, err := controller.NewController(
coreClient,
coreInformers.V1().Secrets(),
serviceCatalogClientBuilder.ClientOrDie(controllerManagerAgentName).ServicecatalogV1beta1(),
serviceCatalogSharedInformers.ClusterServiceBrokers(),
serviceCatalogSharedInformers.ServiceBrokers(),
Expand All @@ -397,9 +402,11 @@ func StartControllers(s *options.ControllerManagerServer,

klog.V(1).Info("Starting shared informers")
informerFactory.Start(stop)
coreInformerFactory.Start(stop)

klog.V(5).Info("Waiting for caches to sync")
informerFactory.WaitForCacheSync(stop)
coreInformerFactory.WaitForCacheSync(stop)

klog.V(5).Info("Running controller")
go serviceCatalogController.Run(s.ConcurrentSyncs, stop)
Expand Down
68 changes: 28 additions & 40 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
scfeatures "github.com/kubernetes-incubator/service-catalog/pkg/features"
"github.com/kubernetes-incubator/service-catalog/pkg/filter"
"github.com/kubernetes-incubator/service-catalog/pkg/pretty"
v12 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/listers/core/v1"
)

const (
Expand All @@ -75,6 +77,7 @@ const (
// NewController returns a new Open Service Broker catalog controller.
func NewController(
kubeClient kubernetes.Interface,
secretInformer v12.SecretInformer,
serviceCatalogClient servicecatalogclientset.ServicecatalogV1beta1Interface,
clusterServiceBrokerInformer informers.ClusterServiceBrokerInformer,
serviceBrokerInformer informers.ServiceBrokerInformer,
Expand All @@ -95,6 +98,7 @@ func NewController(
) (Controller, error) {
controller := &controller{
kubeClient: kubeClient,
secretLister: secretInformer.Lister(),
serviceCatalogClient: serviceCatalogClient,
brokerRelistInterval: brokerRelistInterval,
OSBAPIPreferredVersion: osbAPIPreferredVersion,
Expand All @@ -112,8 +116,9 @@ func NewController(
bindingPollingQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(pollingStartInterval, operationPollingMaximumBackoffDuration), "binding-poller"),
clusterIDConfigMapName: clusterIDConfigMapName,
clusterIDConfigMapNamespace: clusterIDConfigMapNamespace,
brokerClientManager: NewBrokerClientManager(brokerClientCreateFunc),
brokerClientCreateFunc: brokerClientCreateFunc,
}
controller.brokerClientManager = NewBrokerClientManager(brokerClientCreateFunc)

controller.clusterServiceBrokerLister = clusterServiceBrokerInformer.Lister()
clusterServiceBrokerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -172,6 +177,7 @@ func NewController(
}
controller.instanceOperationRetryQueue.instances = make(map[string]backoffEntry)
controller.instanceOperationRetryQueue.rateLimiter = workqueue.NewItemExponentialFailureRateLimiter(minBrokerOperationRetryDelay, maxBrokerOperationRetryDelay)

return controller, nil
}

Expand All @@ -196,6 +202,7 @@ type controller struct {
bindingLister listers.ServiceBindingLister
clusterServicePlanLister listers.ClusterServicePlanLister
servicePlanLister listers.ServicePlanLister
secretLister v1.SecretLister
brokerRelistInterval time.Duration
OSBAPIPreferredVersion string
recorder record.EventRecorder
Expand Down Expand Up @@ -228,6 +235,8 @@ type controller struct {
instanceOperationRetryQueue instanceOperationBackoff
// BrokerClientManager holds all OSB clients for brokers.
brokerClientManager *BrokerClientManager

brokerClientCreateFunc osb.CreateFunc
}

// Run runs the controller until the given stop channel can be read from.
Expand Down Expand Up @@ -486,18 +495,11 @@ func (c *controller) getClusterServiceClassAndClusterServiceBroker(instance *v1b
serviceClass.Spec.ClusterServiceBrokerName,
),
}

}

brokerClient, found := c.brokerClientManager.BrokerClient(NewClusterServiceBrokerKey(serviceClass.Spec.ClusterServiceBrokerName))
if !found {
return nil, "", nil, &operationError{
reason: errorNonexistentClusterServiceBrokerReason,
message: fmt.Sprintf(
"The instance references a broker %q which has no OSB client created",
serviceClass.Spec.ClusterServiceBrokerName,
),
}
brokerClient, err := c.clusterServiceBrokerClient(broker)
if err != nil {
return nil, "", nil, err
}

return serviceClass, broker.Name, brokerClient, nil
Expand Down Expand Up @@ -530,15 +532,9 @@ func (c *controller) getServiceClassAndServiceBroker(instance *v1beta1.ServiceIn

}

brokerClient, found := c.brokerClientManager.BrokerClient(NewServiceBrokerKey(instance.Namespace, serviceClass.Spec.ServiceBrokerName))
if !found {
return nil, "", nil, &operationError{
reason: errorNonexistentClusterServiceBrokerReason,
message: fmt.Sprintf(
"The instance references a broker %q which has no OSB client created",
serviceClass.Spec.ServiceBrokerName,
),
}
brokerClient, err := c.serviceBrokerClient(broker)
if err != nil {
return nil, "", nil, err
}
return serviceClass, broker.Name, brokerClient, nil
}
Expand Down Expand Up @@ -644,11 +640,9 @@ func (c *controller) getClusterServiceBrokerForServiceBinding(instance *v1beta1.
}

func (c *controller) getBrokerClientForServiceBinding(instance *v1beta1.ServiceInstance, binding *v1beta1.ServiceBinding) (osb.Client, error) {

var brokerClient osb.Client

if instance.Spec.ClusterServiceClassSpecified() {

serviceClass, err := c.getClusterServiceClassForServiceBinding(instance, binding)
if err != nil {
return nil, err
Expand All @@ -659,15 +653,11 @@ func (c *controller) getBrokerClientForServiceBinding(instance *v1beta1.ServiceI
return nil, err
}

var found bool
brokerClient, found = c.brokerClientManager.BrokerClient(NewClusterServiceBrokerKey(broker.Name))

if !found {
return nil, fmt.Errorf("OSB client not found for the broker %s", broker.Name)
brokerClient, err = c.clusterServiceBrokerClient(broker)
if err != nil {
return nil, err
}

} else if instance.Spec.ServiceClassSpecified() {

serviceClass, err := c.getServiceClassForServiceBinding(instance, binding)
if err != nil {
return nil, err
Expand All @@ -678,11 +668,9 @@ func (c *controller) getBrokerClientForServiceBinding(instance *v1beta1.ServiceI
return nil, err
}

var found bool
brokerClient, found = c.brokerClientManager.BrokerClient(NewServiceBrokerKey(broker.Namespace, broker.Name))

if !found {
return nil, fmt.Errorf("OSB client not found for the broker %s", broker.Name)
brokerClient, err = c.serviceBrokerClient(broker)
if err != nil {
return nil, err
}
}

Expand All @@ -693,15 +681,15 @@ func (c *controller) getBrokerClientForServiceBinding(instance *v1beta1.ServiceI
// getAuthCredentialsFromClusterServiceBroker returns the auth credentials, if any, or
// returns an error. If the AuthInfo field is nil, empty values are
// returned.
func getAuthCredentialsFromClusterServiceBroker(client kubernetes.Interface, broker *v1beta1.ClusterServiceBroker) (*osb.AuthConfig, error) {
func (c *controller) getAuthCredentialsFromClusterServiceBroker(broker *v1beta1.ClusterServiceBroker) (*osb.AuthConfig, error) {
if broker.Spec.AuthInfo == nil {
return nil, nil
}

authInfo := broker.Spec.AuthInfo
if authInfo.Basic != nil {
secretRef := authInfo.Basic.SecretRef
secret, err := client.CoreV1().Secrets(secretRef.Namespace).Get(secretRef.Name, metav1.GetOptions{})
secret, err := c.secretLister.Secrets(secretRef.Namespace).Get(secretRef.Name)
if err != nil {
return nil, err
}
Expand All @@ -714,7 +702,7 @@ func getAuthCredentialsFromClusterServiceBroker(client kubernetes.Interface, bro
}, nil
} else if authInfo.Bearer != nil {
secretRef := authInfo.Bearer.SecretRef
secret, err := client.CoreV1().Secrets(secretRef.Namespace).Get(secretRef.Name, metav1.GetOptions{})
secret, err := c.secretLister.Secrets(secretRef.Namespace).Get(secretRef.Name)
if err != nil {
return nil, err
}
Expand All @@ -731,15 +719,15 @@ func getAuthCredentialsFromClusterServiceBroker(client kubernetes.Interface, bro

// getAuthCredentialsFromServiceBroker returns the auth credentials, if any, or
// returns an error. If the AuthInfo field is nil, empty values are returned.
func getAuthCredentialsFromServiceBroker(client kubernetes.Interface, broker *v1beta1.ServiceBroker) (*osb.AuthConfig, error) {
func (c *controller) getAuthCredentialsFromServiceBroker(broker *v1beta1.ServiceBroker) (*osb.AuthConfig, error) {
if broker.Spec.AuthInfo == nil {
return nil, nil
}

authInfo := broker.Spec.AuthInfo
if authInfo.Basic != nil {
secretRef := authInfo.Basic.SecretRef
secret, err := client.CoreV1().Secrets(broker.Namespace).Get(secretRef.Name, metav1.GetOptions{})
secret, err := c.secretLister.Secrets(broker.Namespace).Get(secretRef.Name)
if err != nil {
return nil, err
}
Expand All @@ -752,7 +740,7 @@ func getAuthCredentialsFromServiceBroker(client kubernetes.Interface, broker *v1
}, nil
} else if authInfo.Bearer != nil {
secretRef := authInfo.Bearer.SecretRef
secret, err := client.CoreV1().Secrets(broker.Namespace).Get(secretRef.Name, metav1.GetOptions{})
secret, err := c.secretLister.Secrets(broker.Namespace).Get(secretRef.Name)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/controller_clusterservicebroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ func (c *controller) reconcileClusterServiceBrokerKey(key string) error {
return c.reconcileClusterServiceBroker(broker)
}

func (c *controller) updateClusterServiceBrokerClient(broker *v1beta1.ClusterServiceBroker) (osb.Client, error) {
func (c *controller) clusterServiceBrokerClient(broker *v1beta1.ClusterServiceBroker) (osb.Client, error) {
pcb := pretty.NewClusterServiceBrokerContextBuilder(broker)
klog.V(4).Info(pcb.Message("Updating broker client"))
authConfig, err := getAuthCredentialsFromClusterServiceBroker(c.kubeClient, broker)
authConfig, err := c.getAuthCredentialsFromClusterServiceBroker(broker)
if err != nil {
s := fmt.Sprintf("Error getting broker auth credentials: %s", err)
klog.Info(pcb.Message(s))
Expand Down Expand Up @@ -169,7 +169,7 @@ func (c *controller) reconcileClusterServiceBroker(broker *v1beta1.ClusterServic
if broker.DeletionTimestamp == nil { // Add or update
klog.V(4).Info(pcb.Message("Processing adding/update event"))

brokerClient, err := c.updateClusterServiceBrokerClient(broker)
brokerClient, err := c.clusterServiceBrokerClient(broker)
if err != nil {
return err
}
Expand Down
16 changes: 1 addition & 15 deletions pkg/controller/controller_clusterservicebroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,9 +854,7 @@ func testReconcileClusterServiceBrokerWithAuth(t *testing.T, authInfo *v1beta1.C

broker := getTestClusterServiceBrokerWithAuth(authInfo)
if secret != nil {
addGetSecretReaction(fakeKubeClient, secret)
} else {
addGetSecretNotFoundReaction(fakeKubeClient)
fakeKubeClient.Core().Secrets(secret.Namespace).Create(secret)
}
testClusterServiceClass := getTestClusterServiceClass()
fakeClusterServiceBrokerClient.CatalogReaction = &fakeosb.CatalogReaction{
Expand Down Expand Up @@ -896,18 +894,6 @@ func testReconcileClusterServiceBrokerWithAuth(t *testing.T, authInfo *v1beta1.C
assertClusterServiceBrokerReadyFalse(t, updatedClusterServiceBroker)
}

// verify one kube action occurred
kubeActions := fakeKubeClient.Actions()
assertNumberOfActions(t, kubeActions, 1)

getAction := kubeActions[0].(clientgotesting.GetAction)
if e, a := "get", getAction.GetVerb(); e != a {
t.Fatalf("Unexpected verb on action; %s", expectedGot(e, a))
}
if e, a := "secrets", getAction.GetResource().Resource; e != a {
t.Fatalf("Unexpected resource on action; %s", expectedGot(e, a))
}

events := getRecordedEvents(testController)
assertNumEvents(t, events, 1)

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,9 +968,9 @@ func (c *controller) pollServiceInstance(instance *v1beta1.ServiceInstance) erro
var brokerClient osb.Client
var err error
if instance.Spec.ClusterServiceClassSpecified() {
_, _, _, brokerClient, err = c.getClusterServiceClassPlanAndClusterServiceBroker(instance)
_, _, brokerClient, err = c.getClusterServiceClassAndClusterServiceBroker(instance)
} else {
_, _, _, brokerClient, err = c.getServiceClassPlanAndServiceBroker(instance)
_, _, brokerClient, err = c.getServiceClassAndServiceBroker(instance)
}
if err != nil {
return c.handleServiceInstanceReconciliationError(instance, err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/controller_servicebroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func (c *controller) reconcileServiceBrokerKey(key string) error {
return c.reconcileServiceBroker(broker)
}

func (c *controller) updateServiceBrokerClient(broker *v1beta1.ServiceBroker) (osb.Client, error) {
func (c *controller) serviceBrokerClient(broker *v1beta1.ServiceBroker) (osb.Client, error) {
pcb := pretty.NewServiceBrokerContextBuilder(broker)
authConfig, err := getAuthCredentialsFromServiceBroker(c.kubeClient, broker)
authConfig, err := c.getAuthCredentialsFromServiceBroker(broker)
if err != nil {
s := fmt.Sprintf("Error getting broker auth credentials: %s", err)
klog.Info(pcb.Message(s))
Expand Down Expand Up @@ -161,7 +161,7 @@ func (c *controller) reconcileServiceBroker(broker *v1beta1.ServiceBroker) error
if broker.DeletionTimestamp == nil { // Add or update
klog.V(4).Info(pcb.Message("Processing adding/update event"))

brokerClient, err := c.updateServiceBrokerClient(broker)
brokerClient, err := c.serviceBrokerClient(broker)
if err != nil {
return err
}
Expand Down
12 changes: 0 additions & 12 deletions pkg/controller/controller_servicebroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,6 @@ func TestShouldReconcileServiceBroker(t *testing.T) {
}
}

func TestReconcileServiceBrokerUpdatesBrokerClient(t *testing.T) {
broker := getTestServiceBroker()
broker.Name = broker.Name + "not-predefined"
_, _, _, testController, _ := newTestController(t, noFakeActions())
testController.reconcileServiceBroker(broker)

_, found := testController.brokerClientManager.BrokerClient(NewServiceBrokerKey(broker.Namespace, broker.Name))
if !found {
t.Error("expected predefined OSB client")
}
}

func getServiceBrokerReactor(broker *v1beta1.ServiceBroker) (string, string, clientgotesting.ReactionFunc) {
return "get", "servicebrokers", func(action clientgotesting.Action) (bool, runtime.Object, error) {
return true, broker, nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientgofake "k8s.io/client-go/kubernetes/fake"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -2327,11 +2328,15 @@ func newTestController(t *testing.T, config fakeosb.FakeClientConfiguration) (
informerFactory := servicecataloginformers.NewSharedInformerFactory(fakeCatalogClient, 0)
serviceCatalogSharedInformers := informerFactory.Servicecatalog().V1beta1()

k8sInformerFactory := informers.NewSharedInformerFactory(fakeKubeClient, 0)
k8sInformers := k8sInformerFactory.Core().V1()

fakeRecorder := record.NewFakeRecorder(5)

// create a test controller
testController, err := NewController(
fakeKubeClient,
k8sInformers.Secrets(),
fakeCatalogClient.ServicecatalogV1beta1(),
serviceCatalogSharedInformers.ClusterServiceBrokers(),
serviceCatalogSharedInformers.ServiceBrokers(),
Expand Down
Loading

0 comments on commit cbae253

Please sign in to comment.