diff --git a/pkg/clusterclientstore/cache/cluster.go b/pkg/clusterclientstore/cache/cluster.go index 4bfa86510..00979a03e 100644 --- a/pkg/clusterclientstore/cache/cluster.go +++ b/pkg/clusterclientstore/cache/cluster.go @@ -3,10 +3,13 @@ package cache import ( "sync" - shippererrors "github.com/bookingcom/shipper/pkg/errors" kubeinformers "k8s.io/client-go/informers" kubernetes "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + + shipperclientset "github.com/bookingcom/shipper/pkg/client/clientset/versioned" + shipperinformers "github.com/bookingcom/shipper/pkg/client/informers/externalversions" + shippererrors "github.com/bookingcom/shipper/pkg/errors" ) const ( @@ -20,38 +23,52 @@ func NewCluster( name, checksum string, config *rest.Config, - informerFactory kubeinformers.SharedInformerFactory, - buildClient func(string, string, *rest.Config) (kubernetes.Interface, error), + kubeInformerFactory kubeinformers.SharedInformerFactory, + shipperInformerFactory shipperinformers.SharedInformerFactory, + buildKubeClient func(string, string, *rest.Config) (kubernetes.Interface, error), + buildShipperClient func(string, string, *rest.Config) (shipperclientset.Interface, error), cacheSyncCb func(), ) *Cluster { return &Cluster{ - state: StateNotReady, - name: name, - checksum: checksum, - config: config, - informerFactory: informerFactory, - buildClient: buildClient, - cacheSyncCb: cacheSyncCb, - stopCh: make(chan struct{}), + name: name, + checksum: checksum, + config: config, + + state: StateNotReady, + + kubeClients: make(map[string]kubernetes.Interface), + shipperClients: make(map[string]shipperclientset.Interface), - clients: make(map[string]kubernetes.Interface), + kubeInformerFactory: kubeInformerFactory, + buildKubeClient: buildKubeClient, + + shipperInformerFactory: shipperInformerFactory, + buildShipperClient: buildShipperClient, + + cacheSyncCb: cacheSyncCb, + + stopCh: make(chan struct{}), } } type Cluster struct { - name string + // These are all read-only after initialization, so no lock needed. + name string + checksum string + config *rest.Config stateMut sync.RWMutex state string - clientsMut sync.Mutex - clients map[string]kubernetes.Interface + clientsMut sync.Mutex + kubeClients map[string]kubernetes.Interface + shipperClients map[string]shipperclientset.Interface - // These are all read-only after initialization, so no lock needed. - checksum string - config *rest.Config - informerFactory kubeinformers.SharedInformerFactory - buildClient func(string, string, *rest.Config) (kubernetes.Interface, error) + kubeInformerFactory kubeinformers.SharedInformerFactory + buildKubeClient func(string, string, *rest.Config) (kubernetes.Interface, error) + + shipperInformerFactory shipperinformers.SharedInformerFactory + buildShipperClient func(string, string, *rest.Config) (shipperclientset.Interface, error) cacheSyncCb func() @@ -64,10 +81,10 @@ func (c *Cluster) IsReady() bool { return c.state == StateReady } -// GetClient returns a client for the user agent specified by ua. If a client -// doesn't exist for that user agent, one will be created by calling the -// buildClient func. -func (c *Cluster) GetClient(ua string) (kubernetes.Interface, error) { +// GetKubeClient returns a kubernetes client for the user agent specified by +// ua. If a client doesn't exist for that user agent, one will be created by +// calling the buildKubeClient func. +func (c *Cluster) GetKubeClient(ua string) (kubernetes.Interface, error) { if !c.IsReady() { return nil, shippererrors.NewClusterNotReadyError(c.name) } @@ -75,16 +92,41 @@ func (c *Cluster) GetClient(ua string) (kubernetes.Interface, error) { c.clientsMut.Lock() defer c.clientsMut.Unlock() - if client, ok := c.clients[ua]; ok { + if client, ok := c.kubeClients[ua]; ok { return client, nil } - client, err := c.buildClient(c.name, ua, c.config) + client, err := c.buildKubeClient(c.name, ua, c.config) if err != nil { return nil, err } - c.clients[ua] = client + c.kubeClients[ua] = client + + return client, nil +} + +// GetShipperClient returns a shipperrnetes client for the user agent specified by +// ua. If a client doesn't exist for that user agent, one will be created by +// calling the buildShipperClient func. +func (c *Cluster) GetShipperClient(ua string) (shipperclientset.Interface, error) { + if !c.IsReady() { + return nil, shippererrors.NewClusterNotReadyError(c.name) + } + + c.clientsMut.Lock() + defer c.clientsMut.Unlock() + + if client, ok := c.shipperClients[ua]; ok { + return client, nil + } + + client, err := c.buildShipperClient(c.name, ua, c.config) + if err != nil { + return nil, err + } + + c.shipperClients[ua] = client return client, nil } @@ -105,12 +147,20 @@ func (c *Cluster) GetChecksum() (string, error) { return c.checksum, nil } -func (c *Cluster) GetInformerFactory() (kubeinformers.SharedInformerFactory, error) { +func (c *Cluster) GetKubeInformerFactory() (kubeinformers.SharedInformerFactory, error) { if !c.IsReady() { return nil, shippererrors.NewClusterNotReadyError(c.name) } - return c.informerFactory, nil + return c.kubeInformerFactory, nil +} + +func (c *Cluster) GetShipperInformerFactory() (shipperinformers.SharedInformerFactory, error) { + if !c.IsReady() { + return nil, shippererrors.NewClusterNotReadyError(c.name) + } + + return c.shipperInformerFactory, nil } // This will block until the cache syncs. If the cache is never going to sync @@ -131,10 +181,17 @@ func (c *Cluster) WaitForInformerCache() { c.state = StateWaitingForSync c.stateMut.Unlock() - c.informerFactory.Start(c.stopCh) + c.kubeInformerFactory.Start(c.stopCh) + c.shipperInformerFactory.Start(c.stopCh) + ok := true - syncedInformers := c.informerFactory.WaitForCacheSync(c.stopCh) - for _, synced := range syncedInformers { + syncedKubeInformers := c.kubeInformerFactory.WaitForCacheSync(c.stopCh) + for _, synced := range syncedKubeInformers { + ok = ok && synced + } + + syncedShipperInformers := c.shipperInformerFactory.WaitForCacheSync(c.stopCh) + for _, synced := range syncedShipperInformers { ok = ok && synced } diff --git a/pkg/clusterclientstore/cache/server_test.go b/pkg/clusterclientstore/cache/server_test.go index 85edb5976..62fe26ca2 100644 --- a/pkg/clusterclientstore/cache/server_test.go +++ b/pkg/clusterclientstore/cache/server_test.go @@ -5,11 +5,15 @@ import ( "testing" "time" - shippererrors "github.com/bookingcom/shipper/pkg/errors" kubeinformers "k8s.io/client-go/informers" kubernetes "k8s.io/client-go/kubernetes" kubefake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" + + shipperclientset "github.com/bookingcom/shipper/pkg/client/clientset/versioned" + shipperfake "github.com/bookingcom/shipper/pkg/client/clientset/versioned/fake" + shipperinformers "github.com/bookingcom/shipper/pkg/client/informers/externalversions" + shippererrors "github.com/bookingcom/shipper/pkg/errors" ) const ( @@ -131,9 +135,14 @@ func TestReplacement(t *testing.T) { t.Errorf("expected GetChecksum on replaced cluster to return ClusterNotReady, got %v", err) } - _, err = existing.GetClient("foo") + _, err = existing.GetKubeClient("foo") + if !shippererrors.IsClusterNotReadyError(err) { + t.Errorf("expected GetKubeClient on replaced cluster to return ClusterNotReady, got %v", err) + } + + _, err = existing.GetShipperClient("foo") if !shippererrors.IsClusterNotReadyError(err) { - t.Errorf("expected GetClient on replaced cluster to return ClusterNotReady, got %v", err) + t.Errorf("expected GetShipperClient on replaced cluster to return ClusterNotReady, got %v", err) } _, err = existing.GetConfig() @@ -141,24 +150,35 @@ func TestReplacement(t *testing.T) { t.Errorf("expected GetConfig on replaced cluster to return ClusterNotReady, got %v", err) } - _, err = existing.GetInformerFactory() + _, err = existing.GetKubeInformerFactory() if !shippererrors.IsClusterNotReadyError(err) { - t.Errorf("expected GetInformerFactory on replaced cluster to return ClusterNotReady, got %v", err) + t.Errorf("expected GetKubeInformerFactory on replaced cluster to return ClusterNotReady, got %v", err) + } + + _, err = existing.GetShipperInformerFactory() + if !shippererrors.IsClusterNotReadyError(err) { + t.Errorf("expected GetShipperInformerFactory on replaced cluster to return ClusterNotReady, got %v", err) } } func newCluster(name string) *Cluster { kubeClient := kubefake.NewSimpleClientset() + shipperClient := shipperfake.NewSimpleClientset() const noResyncPeriod time.Duration = 0 kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, noResyncPeriod) + shipperInformerFactory := shipperinformers.NewSharedInformerFactory(shipperClient, noResyncPeriod) config := &rest.Config{} return NewCluster( - name, testChecksum, config, kubeInformerFactory, + name, testChecksum, config, + kubeInformerFactory, shipperInformerFactory, func(cluster, ua string, config *rest.Config) (kubernetes.Interface, error) { return nil, nil }, + func(cluster, ua string, config *rest.Config) (shipperclientset.Interface, error) { + return nil, nil + }, func() {}, ) } diff --git a/pkg/clusterclientstore/handlers.go b/pkg/clusterclientstore/handlers.go deleted file mode 100644 index 0b65b606c..000000000 --- a/pkg/clusterclientstore/handlers.go +++ /dev/null @@ -1,18 +0,0 @@ -package clusterclientstore - -import ( - kubeinformers "k8s.io/client-go/informers" -) - -// SubscriptionRegisterFunc should call the relevant functions on a shared -// informer factory to set up watches. -// -// Note that there should be no event handlers being assigned to any informers -// in this function. -type SubscriptionRegisterFunc func(kubeinformers.SharedInformerFactory) - -// EventHandlerRegisterFunc is called after the caches for the clusters have -// been built, and provides a hook for a controller to register its event -// handlers. These will be event handlers for changes to the resources that the -// controller has subscribed to in the `SubscriptionRegisterFunc` callback. -type EventHandlerRegisterFunc func(kubeinformers.SharedInformerFactory, string) diff --git a/pkg/clusterclientstore/interface.go b/pkg/clusterclientstore/interface.go index 2c3c996d3..77647be81 100644 --- a/pkg/clusterclientstore/interface.go +++ b/pkg/clusterclientstore/interface.go @@ -25,3 +25,21 @@ type ClientsetInterface interface { GetShipperClient() shipperclientset.Interface GetShipperInformerFactory() shipperinformers.SharedInformerFactory } + +// SubscriptionRegisterFunc should call the relevant functions on a shared +// informer factory to set up watches. +// +// Note that there should be no event handlers being assigned to any informers +// in this function. +type SubscriptionRegisterFunc func(kubeinformers.SharedInformerFactory, shipperinformers.SharedInformerFactory) + +// EventHandlerRegisterFunc is called after the caches for the clusters have +// been built, and provides a hook for a controller to register its event +// handlers. These will be event handlers for changes to the resources that the +// controller has subscribed to in the `SubscriptionRegisterFunc` callback. +type EventHandlerRegisterFunc func(kubeinformers.SharedInformerFactory, shipperinformers.SharedInformerFactory, string) + +// This enables tests to inject an appropriate fake client, which allows us to +// use the real cluster client store in unit tests. +type KubeClientBuilderFunc func(string, string, *rest.Config) (kubernetes.Interface, error) +type ShipperClientBuilderFunc func(string, string, *rest.Config) (shipperclientset.Interface, error) diff --git a/pkg/clusterclientstore/store.go b/pkg/clusterclientstore/store.go index dc38b66b0..d56c31593 100644 --- a/pkg/clusterclientstore/store.go +++ b/pkg/clusterclientstore/store.go @@ -28,12 +28,10 @@ import ( shippererrors "github.com/bookingcom/shipper/pkg/errors" ) -const AgentName = "clusterclientstore" - -// This enables tests to inject an appropriate fake client, which allows us to -// use the real cluster client store in unit tests. -type KubeClientBuilderFunc func(string, string, *rest.Config) (kubernetes.Interface, error) -type ShipperClientBuilderFunc func(string, string, *rest.Config) (shipperclientset.Interface, error) +const ( + AgentName = "clusterclientstore" + noTimeout = 0 * time.Second +) type Store struct { ns string @@ -132,27 +130,6 @@ func (s *Store) AddEventHandlerCallback(eventHandler EventHandlerRegisterFunc) { s.eventHandlerRegisterFuncs = append(s.eventHandlerRegisterFuncs, eventHandler) } -// GetClient returns a client for the specified cluster name and user agent -// pair. -func (s *Store) GetClient(clusterName string, ua string) (kubernetes.Interface, error) { - cluster, ok := s.cache.Fetch(clusterName) - if !ok { - return nil, shippererrors.NewClusterNotInStoreError(clusterName) - } - - return cluster.GetClient(ua) -} - -// GetConfig returns a rest.Config for the specified cluster name. -func (s *Store) GetConfig(clusterName string) (*rest.Config, error) { - cluster, ok := s.cache.Fetch(clusterName) - if !ok { - return nil, shippererrors.NewClusterNotInStoreError(clusterName) - } - - return cluster.GetConfig() -} - func (s *Store) GetApplicationClusterClientset(clusterName, userAgent string) (ClientsetInterface, error) { cluster, ok := s.cache.Fetch(clusterName) if !ok { @@ -164,17 +141,22 @@ func (s *Store) GetApplicationClusterClientset(clusterName, userAgent string) (C return nil, err } - kubeClient, err := cluster.GetClient(userAgent) + kubeClient, err := cluster.GetKubeClient(userAgent) if err != nil { return nil, err } - kubeInformerFactory, err := cluster.GetInformerFactory() + kubeInformerFactory, err := cluster.GetKubeInformerFactory() if err != nil { return nil, err } - shipperClient, err := s.buildShipperClient(clusterName, userAgent, config) + shipperClient, err := cluster.GetShipperClient(userAgent) + if err != nil { + return nil, err + } + + shipperInformerFactory, err := cluster.GetShipperInformerFactory() if err != nil { return nil, err } @@ -184,21 +166,10 @@ func (s *Store) GetApplicationClusterClientset(clusterName, userAgent string) (C kubeClient, kubeInformerFactory, shipperClient, - s.shipperInformerFactory, + shipperInformerFactory, ), nil } -// GetInformerFactory returns an informer factory for the specified -// cluster name. -func (s *Store) GetInformerFactory(clusterName string) (kubeinformers.SharedInformerFactory, error) { - cluster, ok := s.cache.Fetch(clusterName) - if !ok { - return nil, shippererrors.NewClusterNotInStoreError(clusterName) - } - - return cluster.GetInformerFactory() -} - func (s *Store) syncCluster(name string) error { // No splitting here because clusters are not namespaced. clusterObj, err := s.clusterInformer.Lister().Get(name) @@ -315,32 +286,37 @@ func (s *Store) create(cluster *shipper.Cluster, secret *corev1.Secret) error { return shippererrors.NewClusterClientBuild(cluster.Name, err) } - informerClient, err := s.buildKubeClient(cluster.Name, AgentName, informerConfig) + kubeInformerClient, err := s.buildKubeClient(cluster.Name, AgentName, informerConfig) + if err != nil { + return shippererrors.NewClusterClientBuild(cluster.Name, err) + } + + shipperInformerClient, err := s.buildShipperClient(cluster.Name, AgentName, informerConfig) if err != nil { return shippererrors.NewClusterClientBuild(cluster.Name, err) } - informerFactory := kubeinformers.NewSharedInformerFactory(informerClient, 0*time.Second) + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeInformerClient, noTimeout) + shipperInformerFactory := shipperinformers.NewSharedInformerFactory(shipperInformerClient, noTimeout) + // Register all the resources that the controllers are interested in, e.g. // informerFactory.Core().V1().Pods().Informer(). for _, cb := range s.subscriptionRegisterFuncs { - cb(informerFactory) + cb(kubeInformerFactory, shipperInformerFactory) } clusterName := cluster.Name checksum := computeSecretChecksum(secret) newCachedCluster := cache.NewCluster( - clusterName, - checksum, - config, - informerFactory, - s.buildKubeClient, + clusterName, checksum, config, + kubeInformerFactory, shipperInformerFactory, + s.buildKubeClient, s.buildShipperClient, func() { // If/when the informer cache finishes syncing, bind all of the event handler // callbacks from the controllers if it does not finish (because the cluster // was Shutdown) this will not be called. for _, cb := range s.eventHandlerRegisterFuncs { - cb(informerFactory, clusterName) + cb(kubeInformerFactory, shipperInformerFactory, clusterName) } }) diff --git a/pkg/clusterclientstore/store_test.go b/pkg/clusterclientstore/store_test.go index 172fd7815..a37556e46 100644 --- a/pkg/clusterclientstore/store_test.go +++ b/pkg/clusterclientstore/store_test.go @@ -37,7 +37,7 @@ type clusters []string type secrets []string func TestClientCreation(t *testing.T) { - clientStoreTestCase(t, "creates config", + clientStoreTestCase(t, "creates application cluster client set", clusters{testClusterName}, secrets{testClusterName}, func(s *Store) (bool, error) { @@ -45,57 +45,11 @@ func TestClientCreation(t *testing.T) { return ok && cluster.IsReady(), nil }, func(s *Store) { - config, err := s.GetConfig(testClusterName) + _, err := s.GetApplicationClusterClientset(testClusterName, AgentName) if err != nil { - t.Errorf("unexpected error getting config %v", err) - } - if config.Host != testClusterHost { - t.Errorf("expected config with host %q but got %q", testClusterHost, config.Host) - } - - if s.cache.Count() != 1 { - t.Errorf("expected exactly one cluster, found %q", s.cache.Count()) - } - }) - - clientStoreTestCase(t, "creates client", - clusters{testClusterName}, - secrets{testClusterName}, - func(s *Store) (bool, error) { - cluster, ok := s.cache.Fetch(testClusterName) - return ok && cluster.IsReady(), nil - }, - func(s *Store) { - ua := "foo" - expected, err := s.GetClient(testClusterName, ua) - if err != nil { - t.Errorf("unexpected error getting client %v", err) - } - if s.cache.Count() != 1 { - t.Errorf("expected exactly one cluster, found %q", s.cache.Count()) + t.Errorf("unexpected error getting clientset: %s", err) } - found, err := s.GetClient(testClusterName, ua) - if err != nil { - t.Errorf("unexpected error getting client %v", err) - } - if found != expected { - t.Errorf("expected client %v to be reused, but instead got a new client %v", expected, found) - } - }) - - clientStoreTestCase(t, "creates informerFactory", - clusters{testClusterName}, - secrets{testClusterName}, - func(s *Store) (bool, error) { - cluster, ok := s.cache.Fetch(testClusterName) - return ok && cluster.IsReady(), nil - }, - func(s *Store) { - _, err := s.GetInformerFactory(testClusterName) - if err != nil { - t.Errorf("unexpected error getting informerFactory %v", err) - } if s.cache.Count() != 1 { t.Errorf("expected exactly one cluster, found %q", s.cache.Count()) } @@ -115,22 +69,9 @@ func TestClientCreation(t *testing.T) { }, func(s *Store) { for _, name := range clusterList { - _, err := s.GetClient(name, "foo") - if err != nil { - t.Errorf("unexpected error getting client %q %v", name, err) - } - - config, err := s.GetConfig(name) + _, err := s.GetApplicationClusterClientset(name, "foo") if err != nil { - t.Errorf("unexpected error getting config for %q %v", name, err) - } - if config.Host != testClusterHost { - t.Errorf("expected config with host %q but got %q", testClusterHost, config.Host) - } - - _, err = s.GetInformerFactory(name) - if err != nil { - t.Errorf("unexpected error getting informerFactory %q %v", name, err) + t.Errorf("unexpected error getting clientset %q: %s", name, err) } } @@ -150,11 +91,11 @@ func TestNoClientGeneration(t *testing.T) { return true, nil }, func(s *Store) { - _, err := s.GetClient("foo", "baz") + _, err := s.GetApplicationClusterClientset("foo", "baz") if !shippererrors.IsClusterNotInStoreError(err) { t.Errorf("expected 'no such cluster' error, but got something else: %v", err) } - _, err = s.GetClient("bar", "baz") + _, err = s.GetApplicationClusterClientset("bar", "baz") if !shippererrors.IsClusterNotInStoreError(err) { t.Errorf("expected 'no such cluster' error, but got something else: %v", err) } @@ -266,36 +207,6 @@ func TestReCacheClusterOnSecretUpdate(t *testing.T) { } } -func TestConfigTimeout(t *testing.T) { - f := newFixture(t) - - sevenSeconds := 7 * time.Second - f.restTimeout = &sevenSeconds - - f.addCluster(testClusterName) - f.addSecret(newValidSecret(testClusterName)) - - store := f.run() - - wait.PollUntil( - 10*time.Millisecond, - func() (bool, error) { - cluster, ok := store.cache.Fetch(testClusterName) - return ok && cluster.IsReady(), nil - }, - stopAfter(3*time.Second), - ) - - restCfg, err := store.GetConfig(testClusterName) - if err != nil { - t.Fatalf("expected a REST config, but got error: %s", err) - } - - if restCfg.Timeout != sevenSeconds { - t.Errorf("expected REST config to have timeout of %s, but got %s", sevenSeconds, restCfg.Timeout) - } -} - type fixture struct { t *testing.T s *Store diff --git a/pkg/controller/capacity/capacity_controller.go b/pkg/controller/capacity/capacity_controller.go index 95f65c0a7..9248b3343 100644 --- a/pkg/controller/capacity/capacity_controller.go +++ b/pkg/controller/capacity/capacity_controller.go @@ -428,7 +428,7 @@ func (c *Controller) enqueueCapacityTarget(obj interface{}) { c.workqueue.Add(key) } -func (c *Controller) registerDeploymentEventHandlers(informerFactory kubeinformers.SharedInformerFactory, clusterName string) { +func (c *Controller) registerDeploymentEventHandlers(kubeInformerFactory kubeinformers.SharedInformerFactory, shipperInformerFactory informers.SharedInformerFactory, clusterName string) { handler := cache.FilteringResourceEventHandler{ FilterFunc: filters.BelongsToRelease, Handler: cache.ResourceEventHandlerFuncs{ @@ -439,12 +439,12 @@ func (c *Controller) registerDeploymentEventHandlers(informerFactory kubeinforme }, }, } - informerFactory.Apps().V1().Deployments().Informer().AddEventHandler(handler) + kubeInformerFactory.Apps().V1().Deployments().Informer().AddEventHandler(handler) } -func (c *Controller) subscribeToDeployments(informerFactory kubeinformers.SharedInformerFactory) { - informerFactory.Apps().V1().Deployments().Informer() - informerFactory.Core().V1().Pods().Informer() +func (c *Controller) subscribeToDeployments(kubeInformerFactory kubeinformers.SharedInformerFactory, shipperInformerFactory informers.SharedInformerFactory) { + kubeInformerFactory.Apps().V1().Deployments().Informer() + kubeInformerFactory.Core().V1().Pods().Informer() } func (c Controller) getClusterObjects(clusterName, ns, appName, release string) (*appsv1.Deployment, []*corev1.Pod, error) { diff --git a/pkg/controller/installation/installation_controller.go b/pkg/controller/installation/installation_controller.go index d8614cce2..e40cb7053 100644 --- a/pkg/controller/installation/installation_controller.go +++ b/pkg/controller/installation/installation_controller.go @@ -125,7 +125,7 @@ func NewController( return controller } -func (c *Controller) registerAppClusterEventHandlers(informerFactory kubeinformers.SharedInformerFactory, clusterName string) { +func (c *Controller) registerAppClusterEventHandlers(kubeInformerFactory kubeinformers.SharedInformerFactory, shipperInformerFactory shipperinformers.SharedInformerFactory, clusterName string) { handler := cache.FilteringResourceEventHandler{ FilterFunc: filters.BelongsToRelease, Handler: cache.ResourceEventHandlerFuncs{ @@ -136,13 +136,13 @@ func (c *Controller) registerAppClusterEventHandlers(informerFactory kubeinforme }, }, } - informerFactory.Apps().V1().Deployments().Informer().AddEventHandler(handler) - informerFactory.Core().V1().Services().Informer().AddEventHandler(handler) + kubeInformerFactory.Apps().V1().Deployments().Informer().AddEventHandler(handler) + kubeInformerFactory.Core().V1().Services().Informer().AddEventHandler(handler) } -func (c *Controller) subscribeToAppClusterEvents(informerFactory kubeinformers.SharedInformerFactory) { - informerFactory.Apps().V1().Deployments().Informer() - informerFactory.Core().V1().Services().Informer() +func (c *Controller) subscribeToAppClusterEvents(kubeInformerFactory kubeinformers.SharedInformerFactory, shipperInformerFactory shipperinformers.SharedInformerFactory) { + kubeInformerFactory.Apps().V1().Deployments().Informer() + kubeInformerFactory.Core().V1().Services().Informer() } func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) { diff --git a/pkg/controller/janitor/janitor_controller.go b/pkg/controller/janitor/janitor_controller.go index f1f98ce37..90aa888d2 100644 --- a/pkg/controller/janitor/janitor_controller.go +++ b/pkg/controller/janitor/janitor_controller.go @@ -93,7 +93,7 @@ func NewController( return controller } -func (c *Controller) registerAppClusterEventHandlers(informerFactory kubeinformers.SharedInformerFactory, clusterName string) { +func (c *Controller) registerAppClusterEventHandlers(kubeInformerFactory kubeinformers.SharedInformerFactory, shipperInformerFactory shipperinformers.SharedInformerFactory, clusterName string) { handler := cache.FilteringResourceEventHandler{ FilterFunc: filters.BelongsToInstallationTarget, Handler: cache.ResourceEventHandlerFuncs{ @@ -103,11 +103,11 @@ func (c *Controller) registerAppClusterEventHandlers(informerFactory kubeinforme }, } - informerFactory.Core().V1().ConfigMaps().Informer().AddEventHandler(handler) + kubeInformerFactory.Core().V1().ConfigMaps().Informer().AddEventHandler(handler) } -func (c *Controller) subscribeToAppClusterEvents(informerFactory kubeinformers.SharedInformerFactory) { - informerFactory.Core().V1().ConfigMaps().Informer() +func (c *Controller) subscribeToAppClusterEvents(kubeInformerFactory kubeinformers.SharedInformerFactory, shipperInformerFactory shipperinformers.SharedInformerFactory) { + kubeInformerFactory.Core().V1().ConfigMaps().Informer() } type WorkItem interface { diff --git a/pkg/controller/traffic/traffic_controller.go b/pkg/controller/traffic/traffic_controller.go index dea6c899e..c9421c955 100644 --- a/pkg/controller/traffic/traffic_controller.go +++ b/pkg/controller/traffic/traffic_controller.go @@ -101,8 +101,8 @@ func NewController( // Endpoints object anyway. In case a new or deleted pod does change traffic // shifting in any way, the update to the traffic target itself will trigger a // new evaluation of all traffic targets for an app. -func (c *Controller) registerAppClusterEventHandlers(informerFactory kubeinformers.SharedInformerFactory, clusterName string) { - informerFactory.Core().V1().Endpoints().Informer().AddEventHandler(cache.FilteringResourceEventHandler{ +func (c *Controller) registerAppClusterEventHandlers(kubeInformerFactory kubeinformers.SharedInformerFactory, shipperInformerFactory informers.SharedInformerFactory, clusterName string) { + kubeInformerFactory.Core().V1().Endpoints().Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: filters.BelongsToApp, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueAllTrafficTargets, @@ -113,7 +113,7 @@ func (c *Controller) registerAppClusterEventHandlers(informerFactory kubeinforme }, }) - informerFactory.Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + kubeInformerFactory.Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: filters.BelongsToRelease, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueTrafficTargetFromPod, @@ -122,10 +122,10 @@ func (c *Controller) registerAppClusterEventHandlers(informerFactory kubeinforme }) } -func (c *Controller) subscribeToAppClusterEvents(informerFactory kubeinformers.SharedInformerFactory) { - informerFactory.Core().V1().Pods().Informer() - informerFactory.Core().V1().Services().Informer() - informerFactory.Core().V1().Endpoints().Informer() +func (c *Controller) subscribeToAppClusterEvents(kubeInformerFactory kubeinformers.SharedInformerFactory, shipperInformerFactory informers.SharedInformerFactory) { + kubeInformerFactory.Core().V1().Pods().Informer() + kubeInformerFactory.Core().V1().Services().Informer() + kubeInformerFactory.Core().V1().Endpoints().Informer() } // Run will set up the event handlers for types we are interested in, as well as diff --git a/pkg/testing/fakeclusterclientstore.go b/pkg/testing/fakeclusterclientstore.go index 8373749df..5603bc0dd 100644 --- a/pkg/testing/fakeclusterclientstore.go +++ b/pkg/testing/fakeclusterclientstore.go @@ -40,19 +40,20 @@ func (s *FakeClusterClientStore) AddEventHandlerCallback(c clusterclientstore.Ev } func (s *FakeClusterClientStore) Run(stopCh <-chan struct{}) { - for name, cluster := range s.clusters { - informerFactory := cluster.KubeInformerFactory - + for _, cluster := range s.clusters { for _, subscriptionCallback := range s.subscriptionCallbacks { - subscriptionCallback(informerFactory) + subscriptionCallback(cluster.KubeInformerFactory, cluster.ShipperInformerFactory) } for _, eventHandlerCallback := range s.eventHandlerCallbacks { - eventHandlerCallback(informerFactory, name) + eventHandlerCallback(cluster.KubeInformerFactory, cluster.ShipperInformerFactory, cluster.Name) } - informerFactory.Start(stopCh) - informerFactory.WaitForCacheSync(stopCh) + cluster.KubeInformerFactory.Start(stopCh) + cluster.KubeInformerFactory.WaitForCacheSync(stopCh) + + cluster.ShipperInformerFactory.Start(stopCh) + cluster.ShipperInformerFactory.WaitForCacheSync(stopCh) } }