Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

Commit

Permalink
clusterclientstore: add shipper's informer factory to callbacks
Browse files Browse the repository at this point in the history
As we're gonna start talking to Shipper objects in the application
clusters, we also need to install event listeners for them. This changes
the interfaces to also take a shipperinformers.SharedInformerFactory
together with the kubeinformers.SharedInformerFactory.
  • Loading branch information
juliogreff authored and Oleg Sidorov committed Apr 2, 2020
1 parent c1a8d91 commit 757b0ef
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 232 deletions.
121 changes: 89 additions & 32 deletions pkg/clusterclientstore/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package cache
import (
"sync"

shippererrors "github.com/bookingcom/shipper/pkg/errors"
kubeinformers "k8s.io/client-go/informers"
kubernetes "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

shipperclientset "github.com/bookingcom/shipper/pkg/client/clientset/versioned"
shipperinformers "github.com/bookingcom/shipper/pkg/client/informers/externalversions"
shippererrors "github.com/bookingcom/shipper/pkg/errors"
)

const (
Expand All @@ -20,38 +23,52 @@ func NewCluster(
name,
checksum string,
config *rest.Config,
informerFactory kubeinformers.SharedInformerFactory,
buildClient func(string, string, *rest.Config) (kubernetes.Interface, error),
kubeInformerFactory kubeinformers.SharedInformerFactory,
shipperInformerFactory shipperinformers.SharedInformerFactory,
buildKubeClient func(string, string, *rest.Config) (kubernetes.Interface, error),
buildShipperClient func(string, string, *rest.Config) (shipperclientset.Interface, error),
cacheSyncCb func(),
) *Cluster {
return &Cluster{
state: StateNotReady,
name: name,
checksum: checksum,
config: config,
informerFactory: informerFactory,
buildClient: buildClient,
cacheSyncCb: cacheSyncCb,
stopCh: make(chan struct{}),
name: name,
checksum: checksum,
config: config,

state: StateNotReady,

kubeClients: make(map[string]kubernetes.Interface),
shipperClients: make(map[string]shipperclientset.Interface),

clients: make(map[string]kubernetes.Interface),
kubeInformerFactory: kubeInformerFactory,
buildKubeClient: buildKubeClient,

shipperInformerFactory: shipperInformerFactory,
buildShipperClient: buildShipperClient,

cacheSyncCb: cacheSyncCb,

stopCh: make(chan struct{}),
}
}

type Cluster struct {
name string
// These are all read-only after initialization, so no lock needed.
name string
checksum string
config *rest.Config

stateMut sync.RWMutex
state string

clientsMut sync.Mutex
clients map[string]kubernetes.Interface
clientsMut sync.Mutex
kubeClients map[string]kubernetes.Interface
shipperClients map[string]shipperclientset.Interface

// These are all read-only after initialization, so no lock needed.
checksum string
config *rest.Config
informerFactory kubeinformers.SharedInformerFactory
buildClient func(string, string, *rest.Config) (kubernetes.Interface, error)
kubeInformerFactory kubeinformers.SharedInformerFactory
buildKubeClient func(string, string, *rest.Config) (kubernetes.Interface, error)

shipperInformerFactory shipperinformers.SharedInformerFactory
buildShipperClient func(string, string, *rest.Config) (shipperclientset.Interface, error)

cacheSyncCb func()

Expand All @@ -64,27 +81,52 @@ func (c *Cluster) IsReady() bool {
return c.state == StateReady
}

// GetClient returns a client for the user agent specified by ua. If a client
// doesn't exist for that user agent, one will be created by calling the
// buildClient func.
func (c *Cluster) GetClient(ua string) (kubernetes.Interface, error) {
// GetKubeClient returns a kubernetes client for the user agent specified by
// ua. If a client doesn't exist for that user agent, one will be created by
// calling the buildKubeClient func.
func (c *Cluster) GetKubeClient(ua string) (kubernetes.Interface, error) {
if !c.IsReady() {
return nil, shippererrors.NewClusterNotReadyError(c.name)
}

c.clientsMut.Lock()
defer c.clientsMut.Unlock()

if client, ok := c.clients[ua]; ok {
if client, ok := c.kubeClients[ua]; ok {
return client, nil
}

client, err := c.buildClient(c.name, ua, c.config)
client, err := c.buildKubeClient(c.name, ua, c.config)
if err != nil {
return nil, err
}

c.clients[ua] = client
c.kubeClients[ua] = client

return client, nil
}

// GetShipperClient returns a shipperrnetes client for the user agent specified by
// ua. If a client doesn't exist for that user agent, one will be created by
// calling the buildShipperClient func.
func (c *Cluster) GetShipperClient(ua string) (shipperclientset.Interface, error) {
if !c.IsReady() {
return nil, shippererrors.NewClusterNotReadyError(c.name)
}

c.clientsMut.Lock()
defer c.clientsMut.Unlock()

if client, ok := c.shipperClients[ua]; ok {
return client, nil
}

client, err := c.buildShipperClient(c.name, ua, c.config)
if err != nil {
return nil, err
}

c.shipperClients[ua] = client

return client, nil
}
Expand All @@ -105,12 +147,20 @@ func (c *Cluster) GetChecksum() (string, error) {
return c.checksum, nil
}

func (c *Cluster) GetInformerFactory() (kubeinformers.SharedInformerFactory, error) {
func (c *Cluster) GetKubeInformerFactory() (kubeinformers.SharedInformerFactory, error) {
if !c.IsReady() {
return nil, shippererrors.NewClusterNotReadyError(c.name)
}

return c.informerFactory, nil
return c.kubeInformerFactory, nil
}

func (c *Cluster) GetShipperInformerFactory() (shipperinformers.SharedInformerFactory, error) {
if !c.IsReady() {
return nil, shippererrors.NewClusterNotReadyError(c.name)
}

return c.shipperInformerFactory, nil
}

// This will block until the cache syncs. If the cache is never going to sync
Expand All @@ -131,10 +181,17 @@ func (c *Cluster) WaitForInformerCache() {
c.state = StateWaitingForSync
c.stateMut.Unlock()

c.informerFactory.Start(c.stopCh)
c.kubeInformerFactory.Start(c.stopCh)
c.shipperInformerFactory.Start(c.stopCh)

ok := true
syncedInformers := c.informerFactory.WaitForCacheSync(c.stopCh)
for _, synced := range syncedInformers {
syncedKubeInformers := c.kubeInformerFactory.WaitForCacheSync(c.stopCh)
for _, synced := range syncedKubeInformers {
ok = ok && synced
}

syncedShipperInformers := c.shipperInformerFactory.WaitForCacheSync(c.stopCh)
for _, synced := range syncedShipperInformers {
ok = ok && synced
}

Expand Down
32 changes: 26 additions & 6 deletions pkg/clusterclientstore/cache/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ import (
"testing"
"time"

shippererrors "github.com/bookingcom/shipper/pkg/errors"
kubeinformers "k8s.io/client-go/informers"
kubernetes "k8s.io/client-go/kubernetes"
kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"

shipperclientset "github.com/bookingcom/shipper/pkg/client/clientset/versioned"
shipperfake "github.com/bookingcom/shipper/pkg/client/clientset/versioned/fake"
shipperinformers "github.com/bookingcom/shipper/pkg/client/informers/externalversions"
shippererrors "github.com/bookingcom/shipper/pkg/errors"
)

const (
Expand Down Expand Up @@ -131,34 +135,50 @@ func TestReplacement(t *testing.T) {
t.Errorf("expected GetChecksum on replaced cluster to return ClusterNotReady, got %v", err)
}

_, err = existing.GetClient("foo")
_, err = existing.GetKubeClient("foo")
if !shippererrors.IsClusterNotReadyError(err) {
t.Errorf("expected GetKubeClient on replaced cluster to return ClusterNotReady, got %v", err)
}

_, err = existing.GetShipperClient("foo")
if !shippererrors.IsClusterNotReadyError(err) {
t.Errorf("expected GetClient on replaced cluster to return ClusterNotReady, got %v", err)
t.Errorf("expected GetShipperClient on replaced cluster to return ClusterNotReady, got %v", err)
}

_, err = existing.GetConfig()
if !shippererrors.IsClusterNotReadyError(err) {
t.Errorf("expected GetConfig on replaced cluster to return ClusterNotReady, got %v", err)
}

_, err = existing.GetInformerFactory()
_, err = existing.GetKubeInformerFactory()
if !shippererrors.IsClusterNotReadyError(err) {
t.Errorf("expected GetInformerFactory on replaced cluster to return ClusterNotReady, got %v", err)
t.Errorf("expected GetKubeInformerFactory on replaced cluster to return ClusterNotReady, got %v", err)
}

_, err = existing.GetShipperInformerFactory()
if !shippererrors.IsClusterNotReadyError(err) {
t.Errorf("expected GetShipperInformerFactory on replaced cluster to return ClusterNotReady, got %v", err)
}
}

func newCluster(name string) *Cluster {
kubeClient := kubefake.NewSimpleClientset()
shipperClient := shipperfake.NewSimpleClientset()

const noResyncPeriod time.Duration = 0
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, noResyncPeriod)
shipperInformerFactory := shipperinformers.NewSharedInformerFactory(shipperClient, noResyncPeriod)
config := &rest.Config{}

return NewCluster(
name, testChecksum, config, kubeInformerFactory,
name, testChecksum, config,
kubeInformerFactory, shipperInformerFactory,
func(cluster, ua string, config *rest.Config) (kubernetes.Interface, error) {
return nil, nil
},
func(cluster, ua string, config *rest.Config) (shipperclientset.Interface, error) {
return nil, nil
},
func() {},
)
}
18 changes: 0 additions & 18 deletions pkg/clusterclientstore/handlers.go

This file was deleted.

18 changes: 18 additions & 0 deletions pkg/clusterclientstore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,21 @@ type ClientsetInterface interface {
GetShipperClient() shipperclientset.Interface
GetShipperInformerFactory() shipperinformers.SharedInformerFactory
}

// SubscriptionRegisterFunc should call the relevant functions on a shared
// informer factory to set up watches.
//
// Note that there should be no event handlers being assigned to any informers
// in this function.
type SubscriptionRegisterFunc func(kubeinformers.SharedInformerFactory, shipperinformers.SharedInformerFactory)

// EventHandlerRegisterFunc is called after the caches for the clusters have
// been built, and provides a hook for a controller to register its event
// handlers. These will be event handlers for changes to the resources that the
// controller has subscribed to in the `SubscriptionRegisterFunc` callback.
type EventHandlerRegisterFunc func(kubeinformers.SharedInformerFactory, shipperinformers.SharedInformerFactory, string)

// This enables tests to inject an appropriate fake client, which allows us to
// use the real cluster client store in unit tests.
type KubeClientBuilderFunc func(string, string, *rest.Config) (kubernetes.Interface, error)
type ShipperClientBuilderFunc func(string, string, *rest.Config) (shipperclientset.Interface, error)
Loading

0 comments on commit 757b0ef

Please sign in to comment.