From 1c906a25edee862e8e7554e47c404b50566c16a0 Mon Sep 17 00:00:00 2001 From: Shintaro Murakami Date: Mon, 27 May 2019 18:33:38 +0900 Subject: [PATCH] Create stores and reflectors with a simpler way --- .../utils/kubernetes/listers.go | 34 +++++++------------ 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index b9c67cf754ae..5cce3e55e563 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -185,10 +185,9 @@ func NewUnschedulablePodInNamespaceLister(kubeClient client.Interface, namespace selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" + string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed)) podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector) - store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour) podLister := v1lister.NewPodLister(store) - podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour) - go podReflector.Run(stopchannel) + go reflector.Run(stopchannel) return &UnschedulablePodLister{ podLister: podLister, } @@ -210,10 +209,9 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed)) podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector) - store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour) podLister := v1lister.NewPodLister(store) - podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour) - go podReflector.Run(stopchannel) + go reflector.Run(stopchannel) return &ScheduledPodLister{ podLister: podLister, @@ -245,9 +243,8 @@ func NewAllNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}) // NewNodeLister builds a node lister. func NewNodeLister(kubeClient client.Interface, filter func(*apiv1.Node) bool, stopChannel <-chan struct{}) NodeLister { listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything()) - store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.Node{}, time.Hour) nodeLister := v1lister.NewNodeLister(store) - reflector := cache.NewReflector(listWatcher, &apiv1.Node{}, store, time.Hour) go reflector.Run(stopChannel) return &nodeListerImpl{ nodeLister: nodeLister, @@ -301,9 +298,8 @@ func (lister *PodDisruptionBudgetListerImpl) List() ([]*policyv1.PodDisruptionBu // NewPodDisruptionBudgetLister builds a pod disruption budget lister. func NewPodDisruptionBudgetLister(kubeClient client.Interface, stopchannel <-chan struct{}) PodDisruptionBudgetLister { listWatcher := cache.NewListWatchFromClient(kubeClient.PolicyV1beta1().RESTClient(), "poddisruptionbudgets", apiv1.NamespaceAll, fields.Everything()) - store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &policyv1.PodDisruptionBudget{}, time.Hour) pdbLister := v1policylister.NewPodDisruptionBudgetLister(store) - reflector := cache.NewReflector(listWatcher, &policyv1.PodDisruptionBudget{}, store, time.Hour) go reflector.Run(stopchannel) return &PodDisruptionBudgetListerImpl{ pdbLister: pdbLister, @@ -313,9 +309,8 @@ func NewPodDisruptionBudgetLister(kubeClient client.Interface, stopchannel <-cha // NewDaemonSetLister builds a daemonset lister. func NewDaemonSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.DaemonSetLister { listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "daemonsets", apiv1.NamespaceAll, fields.Everything()) - store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.DaemonSet{}, time.Hour) lister := v1appslister.NewDaemonSetLister(store) - reflector := cache.NewReflector(listWatcher, &appsv1.DaemonSet{}, store, time.Hour) go reflector.Run(stopchannel) return lister } @@ -323,9 +318,8 @@ func NewDaemonSetLister(kubeClient client.Interface, stopchannel <-chan struct{} // NewReplicationControllerLister builds a replicationcontroller lister. func NewReplicationControllerLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1lister.ReplicationControllerLister { listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", apiv1.NamespaceAll, fields.Everything()) - store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.ReplicationController{}, time.Hour) lister := v1lister.NewReplicationControllerLister(store) - reflector := cache.NewReflector(listWatcher, &apiv1.ReplicationController{}, store, time.Hour) go reflector.Run(stopchannel) return lister } @@ -333,9 +327,8 @@ func NewReplicationControllerLister(kubeClient client.Interface, stopchannel <-c // NewJobLister builds a job lister. func NewJobLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1batchlister.JobLister { listWatcher := cache.NewListWatchFromClient(kubeClient.BatchV1().RESTClient(), "jobs", apiv1.NamespaceAll, fields.Everything()) - store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &batchv1.Job{}, time.Hour) lister := v1batchlister.NewJobLister(store) - reflector := cache.NewReflector(listWatcher, &batchv1.Job{}, store, time.Hour) go reflector.Run(stopchannel) return lister } @@ -343,9 +336,8 @@ func NewJobLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1ba // NewReplicaSetLister builds a replicaset lister. func NewReplicaSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.ReplicaSetLister { listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "replicasets", apiv1.NamespaceAll, fields.Everything()) - store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.ReplicaSet{}, time.Hour) lister := v1appslister.NewReplicaSetLister(store) - reflector := cache.NewReflector(listWatcher, &appsv1.ReplicaSet{}, store, time.Hour) go reflector.Run(stopchannel) return lister } @@ -353,9 +345,8 @@ func NewReplicaSetLister(kubeClient client.Interface, stopchannel <-chan struct{ // NewStatefulSetLister builds a statefulset lister. func NewStatefulSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.StatefulSetLister { listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "statefulsets", apiv1.NamespaceAll, fields.Everything()) - store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.StatefulSet{}, time.Hour) lister := v1appslister.NewStatefulSetLister(store) - reflector := cache.NewReflector(listWatcher, &appsv1.StatefulSet{}, store, time.Hour) go reflector.Run(stopchannel) return lister } @@ -364,9 +355,8 @@ func NewStatefulSetLister(kubeClient client.Interface, stopchannel <-chan struct func NewConfigMapListerForNamespace(kubeClient client.Interface, stopchannel <-chan struct{}, namespace string) v1lister.ConfigMapLister { listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything()) - store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.ConfigMap{}, time.Hour) lister := v1lister.NewConfigMapLister(store) - reflector := cache.NewReflector(listWatcher, &apiv1.ConfigMap{}, store, time.Hour) go reflector.Run(stopchannel) return lister }