Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SharedIndexInformers in place of Informers #2271

Merged
merged 1 commit into from
Mar 29, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 65 additions & 56 deletions internal/ingress/controller/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ import (

apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/fields"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
cache_client "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"

"k8s.io/ingress-nginx/internal/file"
Expand Down Expand Up @@ -120,6 +120,15 @@ type Event struct {
Obj interface{}
}

// Informer defines the required SharedIndexInformers that interact with the API server.
type Informer struct {
Ingress cache.SharedIndexInformer
Endpoint cache.SharedIndexInformer
Service cache.SharedIndexInformer
Secret cache.SharedIndexInformer
ConfigMap cache.SharedIndexInformer
}

// Lister returns the stores for ingresses, services, endpoints, secrets and configmaps.
type Lister struct {
Ingress IngressLister
Expand All @@ -130,38 +139,34 @@ type Lister struct {
IngressAnnotation IngressAnnotationsLister
}

// Controller defines the required controllers that interact against the api server
type Controller struct {
Ingress cache.Controller
Endpoint cache.Controller
Service cache.Controller
Secret cache.Controller
Configmap cache.Controller
}
// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
go i.Endpoint.Run(stopCh)
go i.Service.Run(stopCh)
go i.Secret.Run(stopCh)
go i.ConfigMap.Run(stopCh)

// Run initiates the synchronization of the controllers against the api server
func (c *Controller) Run(stopCh chan struct{}) {
go c.Endpoint.Run(stopCh)
go c.Service.Run(stopCh)
go c.Secret.Run(stopCh)
go c.Configmap.Run(stopCh)

// Wait for all involved caches to be synced, before processing items from the queue is started
// wait for all involved caches to be synced before processing items
// from the queue
if !cache.WaitForCacheSync(stopCh,
c.Endpoint.HasSynced,
c.Service.HasSynced,
c.Secret.HasSynced,
c.Configmap.HasSynced,
i.Endpoint.HasSynced,
i.Service.HasSynced,
i.Secret.HasSynced,
i.ConfigMap.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}

// We need to wait before start syncing the ingress rules
// because the rules requires content from other listers
// in big clusters, deltas can keep arriving even after HasSynced
// functions have returned 'true'
time.Sleep(1 * time.Second)
go c.Ingress.Run(stopCh)

// we can start syncing ingress objects only after other caches are
// ready, because ingress rules require content from other listers, and
// 'add' events get triggered in the handlers during caches population.
go i.Ingress.Run(stopCh)
if !cache.WaitForCacheSync(stopCh,
c.Ingress.HasSynced,
i.Ingress.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
Expand All @@ -176,10 +181,10 @@ type k8sStore struct {
// operation to execute in each OnUpdate invocation
backendConfig ngx_config.Configuration

// cache contains the cache Controllers
cache *Controller
// informer contains the cache Informers
informers *Informer

// listers contains the cache.Store used in the ingress controller
// listers contains the cache.Store interfaces used in the ingress controller
listers *Lister

// sslStore local store of SSL certificates (certificates used in ingress)
Expand Down Expand Up @@ -214,7 +219,7 @@ func New(checkOCSP bool,

store := &k8sStore{
isOCSPCheckEnabled: checkOCSP,
cache: &Controller{},
informers: &Informer{},
listers: &Lister{},
sslStore: NewSSLCertTracker(),
filesystem: fs,
Expand All @@ -237,6 +242,26 @@ func New(checkOCSP bool,
// k8sStore fulfils resolver.Resolver interface
store.annotations = annotations.NewAnnotationExtractor(store)

store.listers.IngressAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)

// create informers factory, enable and assign required informers
infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})

store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
store.listers.Ingress.Store = store.informers.Ingress.GetStore()

store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
store.listers.Secret.Store = store.informers.Secret.GetStore()

store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()

store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()

ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
Expand Down Expand Up @@ -372,7 +397,7 @@ func New(checkOCSP bool,
},
}

eventHandler := cache.ResourceEventHandlerFuncs{
epEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: CreateEvent,
Expand Down Expand Up @@ -434,27 +459,11 @@ func New(checkOCSP bool,
},
}

store.listers.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)

store.listers.Ingress.Store, store.cache.Ingress = cache.NewInformer(
cache.NewListWatchFromClient(client.ExtensionsV1beta1().RESTClient(), "ingresses", namespace, fields.Everything()),
&extensions.Ingress{}, resyncPeriod, ingEventHandler)

store.listers.Endpoint.Store, store.cache.Endpoint = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", namespace, fields.Everything()),
&apiv1.Endpoints{}, resyncPeriod, eventHandler)

store.listers.Secret.Store, store.cache.Secret = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "secrets", namespace, fields.Everything()),
&apiv1.Secret{}, resyncPeriod, secrEventHandler)

store.listers.ConfigMap.Store, store.cache.Configmap = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything()),
&apiv1.ConfigMap{}, resyncPeriod, mapEventHandler)

store.listers.Service.Store, store.cache.Service = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "services", namespace, fields.Everything()),
&apiv1.Service{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(mapEventHandler)
store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{})

return store
}
Expand Down Expand Up @@ -611,11 +620,11 @@ func (s *k8sStore) setConfig(cmap *apiv1.ConfigMap) {
}
}

// Run initiates the synchronization of the controllers
// and the initial synchronization of the secrets.
// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s k8sStore) Run(stopCh chan struct{}) {
// start controllers
s.cache.Run(stopCh)
// start informers
s.informers.Run(stopCh)

// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
Expand Down