From 026d2c20fd9d66819eaca24bbd671803078dd489 Mon Sep 17 00:00:00 2001 From: Beata Skiba Date: Wed, 22 Aug 2018 14:33:39 +0200 Subject: [PATCH] Handle channel closes from pod event watcher --- .../pkg/recommender/input/cluster_feeder.go | 45 ++++++++++++------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 57f9601f545e..7e3262293035 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -71,7 +71,7 @@ func NewClusterStateFeeder(config *rest.Config, clusterState *model.ClusterState kubeClient := kube_client.NewForConfigOrDie(config) oomObserver := oom.NewObserver() podLister := newPodClients(kubeClient, &oomObserver) - watchEvictionEvents(kubeClient, &oomObserver) + watchEvictionEventsWithRetries(kubeClient, &oomObserver) return &clusterStateFeeder{ coreClient: kubeClient.CoreV1(), specClient: spec.NewSpecClient(podLister), @@ -88,29 +88,40 @@ func newMetricsClient(config *rest.Config) metrics.MetricsClient { return metrics.NewMetricsClient(metricsGetter) } -func watchEvictionEvents(kubeClient kube_client.Interface, observer *oom.Observer) { - options := metav1.ListOptions{ - FieldSelector: "reason=Evicted", - } - watchInterface, err := kubeClient.CoreV1().Events("").Watch(options) - if err != nil { - glog.Errorf("Cannot initialize watching events. Reason %v", err) - return - } +func watchEvictionEventsWithRetries(kubeClient kube_client.Interface, observer *oom.Observer) { go func() { + options := metav1.ListOptions{ + FieldSelector: "reason=Evicted", + } + for { - result := <-watchInterface.ResultChan() - if result.Type == watch.Added { - result, ok := result.Object.(*apiv1.Event) - if !ok { - continue - } - observer.OnEvent(result) + watchInterface, err := kubeClient.CoreV1().Events("").Watch(options) + if err != nil { + glog.Errorf("Cannot initialize watching events. Reason %v", err) + continue } + watchEvictionEvents(watchInterface.ResultChan(), observer) } }() } +func watchEvictionEvents(evictedEventChan <-chan watch.Event, observer *oom.Observer) { + for { + evictedEvent, ok := <-evictedEventChan + if !ok { + glog.V(3).Infof("Eviction event chan closed") + return + } + if evictedEvent.Type == watch.Added { + evictedEvent, ok := evictedEvent.Object.(*apiv1.Event) + if !ok { + continue + } + observer.OnEvent(evictedEvent) + } + } +} + // Creates clients watching pods: PodLister (listing only not terminated pods). func newPodClients(kubeClient kube_client.Interface, resourceEventHandler cache.ResourceEventHandler) v1lister.PodLister { selector := fields.ParseSelectorOrDie("status.phase!=" + string(apiv1.PodPending))