Skip to content
This repository has been archived by the owner on May 6, 2022. It is now read-only.

Improve caching OSB clients #2577

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/controller-manager/app/controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/client-go/informers"
"k8s.io/klog"
)

Expand Down Expand Up @@ -362,6 +363,9 @@ func StartControllers(s *options.ControllerManagerServer,
}
klog.V(5).Infof("Creating shared informers; resync interval: %v", s.ResyncInterval)

coreInformerFactory := informers.NewSharedInformerFactory(coreClient, s.ResyncInterval)
coreInformers := coreInformerFactory.Core()

// Build the informer factory for service-catalog resources
informerFactory := servicecataloginformers.NewSharedInformerFactory(
serviceCatalogClientBuilder.ClientOrDie("shared-informers"),
Expand All @@ -373,6 +377,7 @@ func StartControllers(s *options.ControllerManagerServer,
klog.V(5).Infof("Creating controller; broker relist interval: %v", s.ServiceBrokerRelistInterval)
serviceCatalogController, err := controller.NewController(
coreClient,
coreInformers.V1().Secrets(),
serviceCatalogClientBuilder.ClientOrDie(controllerManagerAgentName).ServicecatalogV1beta1(),
serviceCatalogSharedInformers.ClusterServiceBrokers(),
serviceCatalogSharedInformers.ServiceBrokers(),
Expand All @@ -397,9 +402,11 @@ func StartControllers(s *options.ControllerManagerServer,

klog.V(1).Info("Starting shared informers")
informerFactory.Start(stop)
coreInformerFactory.Start(stop)

klog.V(5).Info("Waiting for caches to sync")
informerFactory.WaitForCacheSync(stop)
coreInformerFactory.WaitForCacheSync(stop)

klog.V(5).Info("Running controller")
go serviceCatalogController.Run(s.ConcurrentSyncs, stop)
Expand Down
68 changes: 28 additions & 40 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
scfeatures "github.com/kubernetes-incubator/service-catalog/pkg/features"
"github.com/kubernetes-incubator/service-catalog/pkg/filter"
"github.com/kubernetes-incubator/service-catalog/pkg/pretty"
v12 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/listers/core/v1"
)

const (
Expand All @@ -75,6 +77,7 @@ const (
// NewController returns a new Open Service Broker catalog controller.
func NewController(
kubeClient kubernetes.Interface,
secretInformer v12.SecretInformer,
serviceCatalogClient servicecatalogclientset.ServicecatalogV1beta1Interface,
clusterServiceBrokerInformer informers.ClusterServiceBrokerInformer,
serviceBrokerInformer informers.ServiceBrokerInformer,
Expand All @@ -95,6 +98,7 @@ func NewController(
) (Controller, error) {
controller := &controller{
kubeClient: kubeClient,
secretLister: secretInformer.Lister(),
serviceCatalogClient: serviceCatalogClient,
brokerRelistInterval: brokerRelistInterval,
OSBAPIPreferredVersion: osbAPIPreferredVersion,
Expand All @@ -112,8 +116,9 @@ func NewController(
bindingPollingQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(pollingStartInterval, operationPollingMaximumBackoffDuration), "binding-poller"),
clusterIDConfigMapName: clusterIDConfigMapName,
clusterIDConfigMapNamespace: clusterIDConfigMapNamespace,
brokerClientManager: NewBrokerClientManager(brokerClientCreateFunc),
brokerClientCreateFunc: brokerClientCreateFunc,
}
controller.brokerClientManager = NewBrokerClientManager(brokerClientCreateFunc)

controller.clusterServiceBrokerLister = clusterServiceBrokerInformer.Lister()
clusterServiceBrokerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -172,6 +177,7 @@ func NewController(
}
controller.instanceOperationRetryQueue.instances = make(map[string]backoffEntry)
controller.instanceOperationRetryQueue.rateLimiter = workqueue.NewItemExponentialFailureRateLimiter(minBrokerOperationRetryDelay, maxBrokerOperationRetryDelay)

return controller, nil
}

Expand All @@ -196,6 +202,7 @@ type controller struct {
bindingLister listers.ServiceBindingLister
clusterServicePlanLister listers.ClusterServicePlanLister
servicePlanLister listers.ServicePlanLister
secretLister v1.SecretLister
brokerRelistInterval time.Duration
OSBAPIPreferredVersion string
recorder record.EventRecorder
Expand Down Expand Up @@ -228,6 +235,8 @@ type controller struct {
instanceOperationRetryQueue instanceOperationBackoff
// BrokerClientManager holds all OSB clients for brokers.
brokerClientManager *BrokerClientManager

brokerClientCreateFunc osb.CreateFunc
}

// Run runs the controller until the given stop channel can be read from.
Expand Down Expand Up @@ -486,18 +495,11 @@ func (c *controller) getClusterServiceClassAndClusterServiceBroker(instance *v1b
serviceClass.Spec.ClusterServiceBrokerName,
),
}

}

brokerClient, found := c.brokerClientManager.BrokerClient(NewClusterServiceBrokerKey(serviceClass.Spec.ClusterServiceBrokerName))
if !found {
return nil, "", nil, &operationError{
reason: errorNonexistentClusterServiceBrokerReason,
message: fmt.Sprintf(
"The instance references a broker %q which has no OSB client created",
serviceClass.Spec.ClusterServiceBrokerName,
),
}
brokerClient, err := c.clusterServiceBrokerClient(broker)
if err != nil {
return nil, "", nil, err
}

return serviceClass, broker.Name, brokerClient, nil
Expand Down Expand Up @@ -530,15 +532,9 @@ func (c *controller) getServiceClassAndServiceBroker(instance *v1beta1.ServiceIn

}

brokerClient, found := c.brokerClientManager.BrokerClient(NewServiceBrokerKey(instance.Namespace, serviceClass.Spec.ServiceBrokerName))
if !found {
return nil, "", nil, &operationError{
reason: errorNonexistentClusterServiceBrokerReason,
message: fmt.Sprintf(
"The instance references a broker %q which has no OSB client created",
serviceClass.Spec.ServiceBrokerName,
),
}
brokerClient, err := c.serviceBrokerClient(broker)
if err != nil {
return nil, "", nil, err
}
return serviceClass, broker.Name, brokerClient, nil
}
Expand Down Expand Up @@ -644,11 +640,9 @@ func (c *controller) getClusterServiceBrokerForServiceBinding(instance *v1beta1.
}

func (c *controller) getBrokerClientForServiceBinding(instance *v1beta1.ServiceInstance, binding *v1beta1.ServiceBinding) (osb.Client, error) {

var brokerClient osb.Client

if instance.Spec.ClusterServiceClassSpecified() {

serviceClass, err := c.getClusterServiceClassForServiceBinding(instance, binding)
if err != nil {
return nil, err
Expand All @@ -659,15 +653,11 @@ func (c *controller) getBrokerClientForServiceBinding(instance *v1beta1.ServiceI
return nil, err
}

var found bool
brokerClient, found = c.brokerClientManager.BrokerClient(NewClusterServiceBrokerKey(broker.Name))

if !found {
return nil, fmt.Errorf("OSB client not found for the broker %s", broker.Name)
brokerClient, err = c.clusterServiceBrokerClient(broker)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check for non-nil err after this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right, the check is missing, added

if err != nil {
return nil, err
}

} else if instance.Spec.ServiceClassSpecified() {

serviceClass, err := c.getServiceClassForServiceBinding(instance, binding)
if err != nil {
return nil, err
Expand All @@ -678,11 +668,9 @@ func (c *controller) getBrokerClientForServiceBinding(instance *v1beta1.ServiceI
return nil, err
}

var found bool
brokerClient, found = c.brokerClientManager.BrokerClient(NewServiceBrokerKey(broker.Namespace, broker.Name))

if !found {
return nil, fmt.Errorf("OSB client not found for the broker %s", broker.Name)
brokerClient, err = c.serviceBrokerClient(broker)
if err != nil {
return nil, err
}
}

Expand All @@ -693,15 +681,15 @@ func (c *controller) getBrokerClientForServiceBinding(instance *v1beta1.ServiceI
// getAuthCredentialsFromClusterServiceBroker returns the auth credentials, if any, or
// returns an error. If the AuthInfo field is nil, empty values are
// returned.
func getAuthCredentialsFromClusterServiceBroker(client kubernetes.Interface, broker *v1beta1.ClusterServiceBroker) (*osb.AuthConfig, error) {
func (c *controller) getAuthCredentialsFromClusterServiceBroker(broker *v1beta1.ClusterServiceBroker) (*osb.AuthConfig, error) {
if broker.Spec.AuthInfo == nil {
return nil, nil
}

authInfo := broker.Spec.AuthInfo
if authInfo.Basic != nil {
secretRef := authInfo.Basic.SecretRef
secret, err := client.CoreV1().Secrets(secretRef.Namespace).Get(secretRef.Name, metav1.GetOptions{})
secret, err := c.secretLister.Secrets(secretRef.Namespace).Get(secretRef.Name)
if err != nil {
return nil, err
}
Expand All @@ -714,7 +702,7 @@ func getAuthCredentialsFromClusterServiceBroker(client kubernetes.Interface, bro
}, nil
} else if authInfo.Bearer != nil {
secretRef := authInfo.Bearer.SecretRef
secret, err := client.CoreV1().Secrets(secretRef.Namespace).Get(secretRef.Name, metav1.GetOptions{})
secret, err := c.secretLister.Secrets(secretRef.Namespace).Get(secretRef.Name)
if err != nil {
return nil, err
}
Expand All @@ -731,15 +719,15 @@ func getAuthCredentialsFromClusterServiceBroker(client kubernetes.Interface, bro

// getAuthCredentialsFromServiceBroker returns the auth credentials, if any, or
// returns an error. If the AuthInfo field is nil, empty values are returned.
func getAuthCredentialsFromServiceBroker(client kubernetes.Interface, broker *v1beta1.ServiceBroker) (*osb.AuthConfig, error) {
func (c *controller) getAuthCredentialsFromServiceBroker(broker *v1beta1.ServiceBroker) (*osb.AuthConfig, error) {
if broker.Spec.AuthInfo == nil {
return nil, nil
}

authInfo := broker.Spec.AuthInfo
if authInfo.Basic != nil {
secretRef := authInfo.Basic.SecretRef
secret, err := client.CoreV1().Secrets(broker.Namespace).Get(secretRef.Name, metav1.GetOptions{})
secret, err := c.secretLister.Secrets(broker.Namespace).Get(secretRef.Name)
if err != nil {
return nil, err
}
Expand All @@ -752,7 +740,7 @@ func getAuthCredentialsFromServiceBroker(client kubernetes.Interface, broker *v1
}, nil
} else if authInfo.Bearer != nil {
secretRef := authInfo.Bearer.SecretRef
secret, err := client.CoreV1().Secrets(broker.Namespace).Get(secretRef.Name, metav1.GetOptions{})
secret, err := c.secretLister.Secrets(broker.Namespace).Get(secretRef.Name)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/controller_clusterservicebroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ func (c *controller) reconcileClusterServiceBrokerKey(key string) error {
return c.reconcileClusterServiceBroker(broker)
}

func (c *controller) updateClusterServiceBrokerClient(broker *v1beta1.ClusterServiceBroker) (osb.Client, error) {
func (c *controller) clusterServiceBrokerClient(broker *v1beta1.ClusterServiceBroker) (osb.Client, error) {
pcb := pretty.NewClusterServiceBrokerContextBuilder(broker)
klog.V(4).Info(pcb.Message("Updating broker client"))
authConfig, err := getAuthCredentialsFromClusterServiceBroker(c.kubeClient, broker)
authConfig, err := c.getAuthCredentialsFromClusterServiceBroker(broker)
if err != nil {
s := fmt.Sprintf("Error getting broker auth credentials: %s", err)
klog.Info(pcb.Message(s))
Expand Down Expand Up @@ -169,7 +169,7 @@ func (c *controller) reconcileClusterServiceBroker(broker *v1beta1.ClusterServic
if broker.DeletionTimestamp == nil { // Add or update
klog.V(4).Info(pcb.Message("Processing adding/update event"))

brokerClient, err := c.updateClusterServiceBrokerClient(broker)
brokerClient, err := c.clusterServiceBrokerClient(broker)
if err != nil {
return err
}
Expand Down
16 changes: 1 addition & 15 deletions pkg/controller/controller_clusterservicebroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,9 +854,7 @@ func testReconcileClusterServiceBrokerWithAuth(t *testing.T, authInfo *v1beta1.C

broker := getTestClusterServiceBrokerWithAuth(authInfo)
if secret != nil {
addGetSecretReaction(fakeKubeClient, secret)
} else {
addGetSecretNotFoundReaction(fakeKubeClient)
fakeKubeClient.Core().Secrets(secret.Namespace).Create(secret)
}
testClusterServiceClass := getTestClusterServiceClass()
fakeClusterServiceBrokerClient.CatalogReaction = &fakeosb.CatalogReaction{
Expand Down Expand Up @@ -896,18 +894,6 @@ func testReconcileClusterServiceBrokerWithAuth(t *testing.T, authInfo *v1beta1.C
assertClusterServiceBrokerReadyFalse(t, updatedClusterServiceBroker)
}

// verify one kube action occurred
kubeActions := fakeKubeClient.Actions()
assertNumberOfActions(t, kubeActions, 1)

getAction := kubeActions[0].(clientgotesting.GetAction)
if e, a := "get", getAction.GetVerb(); e != a {
t.Fatalf("Unexpected verb on action; %s", expectedGot(e, a))
}
if e, a := "secrets", getAction.GetResource().Resource; e != a {
t.Fatalf("Unexpected resource on action; %s", expectedGot(e, a))
}

events := getRecordedEvents(testController)
assertNumEvents(t, events, 1)

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,9 +968,9 @@ func (c *controller) pollServiceInstance(instance *v1beta1.ServiceInstance) erro
var brokerClient osb.Client
var err error
if instance.Spec.ClusterServiceClassSpecified() {
_, _, _, brokerClient, err = c.getClusterServiceClassPlanAndClusterServiceBroker(instance)
_, _, brokerClient, err = c.getClusterServiceClassAndClusterServiceBroker(instance)
} else {
_, _, _, brokerClient, err = c.getServiceClassPlanAndServiceBroker(instance)
_, _, brokerClient, err = c.getServiceClassAndServiceBroker(instance)
}
if err != nil {
return c.handleServiceInstanceReconciliationError(instance, err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/controller_servicebroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func (c *controller) reconcileServiceBrokerKey(key string) error {
return c.reconcileServiceBroker(broker)
}

func (c *controller) updateServiceBrokerClient(broker *v1beta1.ServiceBroker) (osb.Client, error) {
func (c *controller) serviceBrokerClient(broker *v1beta1.ServiceBroker) (osb.Client, error) {
pcb := pretty.NewServiceBrokerContextBuilder(broker)
authConfig, err := getAuthCredentialsFromServiceBroker(c.kubeClient, broker)
authConfig, err := c.getAuthCredentialsFromServiceBroker(broker)
if err != nil {
s := fmt.Sprintf("Error getting broker auth credentials: %s", err)
klog.Info(pcb.Message(s))
Expand Down Expand Up @@ -161,7 +161,7 @@ func (c *controller) reconcileServiceBroker(broker *v1beta1.ServiceBroker) error
if broker.DeletionTimestamp == nil { // Add or update
klog.V(4).Info(pcb.Message("Processing adding/update event"))

brokerClient, err := c.updateServiceBrokerClient(broker)
brokerClient, err := c.serviceBrokerClient(broker)
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientgofake "k8s.io/client-go/kubernetes/fake"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -2327,11 +2328,15 @@ func newTestController(t *testing.T, config fakeosb.FakeClientConfiguration) (
informerFactory := servicecataloginformers.NewSharedInformerFactory(fakeCatalogClient, 0)
serviceCatalogSharedInformers := informerFactory.Servicecatalog().V1beta1()

k8sInformerFactory := informers.NewSharedInformerFactory(fakeKubeClient, 0)
k8sInformers := k8sInformerFactory.Core().V1()

fakeRecorder := record.NewFakeRecorder(5)

// create a test controller
testController, err := NewController(
fakeKubeClient,
k8sInformers.Secrets(),
fakeCatalogClient.ServicecatalogV1beta1(),
serviceCatalogSharedInformers.ClusterServiceBrokers(),
serviceCatalogSharedInformers.ServiceBrokers(),
Expand Down
Loading