Skip to content

Commit

Permalink
Merge pull request #2062 from mrkm4ntr/namespace-keyed-indexer-and-re…
Browse files Browse the repository at this point in the history
…flector

Create stores and reflectors with a simpler way
  • Loading branch information
k8s-ci-robot authored May 30, 2019
2 parents 87720ab + 1c906a2 commit 3f57084
Showing 1 changed file with 12 additions and 22 deletions.
34 changes: 12 additions & 22 deletions cluster-autoscaler/utils/kubernetes/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,9 @@ func NewUnschedulablePodInNamespaceLister(kubeClient client.Interface, namespace
selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector)
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
podLister := v1lister.NewPodLister(store)
podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour)
go podReflector.Run(stopchannel)
go reflector.Run(stopchannel)
return &UnschedulablePodLister{
podLister: podLister,
}
Expand All @@ -210,10 +209,9 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc
selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
podLister := v1lister.NewPodLister(store)
podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour)
go podReflector.Run(stopchannel)
go reflector.Run(stopchannel)

return &ScheduledPodLister{
podLister: podLister,
Expand Down Expand Up @@ -245,9 +243,8 @@ func NewAllNodeLister(kubeClient client.Interface, stopChannel <-chan struct{})
// NewNodeLister builds a node lister.
func NewNodeLister(kubeClient client.Interface, filter func(*apiv1.Node) bool, stopChannel <-chan struct{}) NodeLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.Node{}, time.Hour)
nodeLister := v1lister.NewNodeLister(store)
reflector := cache.NewReflector(listWatcher, &apiv1.Node{}, store, time.Hour)
go reflector.Run(stopChannel)
return &nodeListerImpl{
nodeLister: nodeLister,
Expand Down Expand Up @@ -301,9 +298,8 @@ func (lister *PodDisruptionBudgetListerImpl) List() ([]*policyv1.PodDisruptionBu
// NewPodDisruptionBudgetLister builds a pod disruption budget lister.
func NewPodDisruptionBudgetLister(kubeClient client.Interface, stopchannel <-chan struct{}) PodDisruptionBudgetLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.PolicyV1beta1().RESTClient(), "poddisruptionbudgets", apiv1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &policyv1.PodDisruptionBudget{}, time.Hour)
pdbLister := v1policylister.NewPodDisruptionBudgetLister(store)
reflector := cache.NewReflector(listWatcher, &policyv1.PodDisruptionBudget{}, store, time.Hour)
go reflector.Run(stopchannel)
return &PodDisruptionBudgetListerImpl{
pdbLister: pdbLister,
Expand All @@ -313,49 +309,44 @@ func NewPodDisruptionBudgetLister(kubeClient client.Interface, stopchannel <-cha
// NewDaemonSetLister builds a daemonset lister.
func NewDaemonSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.DaemonSetLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "daemonsets", apiv1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.DaemonSet{}, time.Hour)
lister := v1appslister.NewDaemonSetLister(store)
reflector := cache.NewReflector(listWatcher, &appsv1.DaemonSet{}, store, time.Hour)
go reflector.Run(stopchannel)
return lister
}

// NewReplicationControllerLister builds a replicationcontroller lister.
func NewReplicationControllerLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1lister.ReplicationControllerLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", apiv1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.ReplicationController{}, time.Hour)
lister := v1lister.NewReplicationControllerLister(store)
reflector := cache.NewReflector(listWatcher, &apiv1.ReplicationController{}, store, time.Hour)
go reflector.Run(stopchannel)
return lister
}

// NewJobLister builds a job lister.
func NewJobLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1batchlister.JobLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.BatchV1().RESTClient(), "jobs", apiv1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &batchv1.Job{}, time.Hour)
lister := v1batchlister.NewJobLister(store)
reflector := cache.NewReflector(listWatcher, &batchv1.Job{}, store, time.Hour)
go reflector.Run(stopchannel)
return lister
}

// NewReplicaSetLister builds a replicaset lister.
func NewReplicaSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.ReplicaSetLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "replicasets", apiv1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.ReplicaSet{}, time.Hour)
lister := v1appslister.NewReplicaSetLister(store)
reflector := cache.NewReflector(listWatcher, &appsv1.ReplicaSet{}, store, time.Hour)
go reflector.Run(stopchannel)
return lister
}

// NewStatefulSetLister builds a statefulset lister.
func NewStatefulSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.StatefulSetLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "statefulsets", apiv1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.StatefulSet{}, time.Hour)
lister := v1appslister.NewStatefulSetLister(store)
reflector := cache.NewReflector(listWatcher, &appsv1.StatefulSet{}, store, time.Hour)
go reflector.Run(stopchannel)
return lister
}
Expand All @@ -364,9 +355,8 @@ func NewStatefulSetLister(kubeClient client.Interface, stopchannel <-chan struct
func NewConfigMapListerForNamespace(kubeClient client.Interface, stopchannel <-chan struct{},
namespace string) v1lister.ConfigMapLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.ConfigMap{}, time.Hour)
lister := v1lister.NewConfigMapLister(store)
reflector := cache.NewReflector(listWatcher, &apiv1.ConfigMap{}, store, time.Hour)
go reflector.Run(stopchannel)
return lister
}

0 comments on commit 3f57084

Please sign in to comment.