Skip to content

Commit

Permalink
Merge pull request #1179 from bskiba/watch-channel
Browse files Browse the repository at this point in the history
Handle channel closes from pod event watcher
  • Loading branch information
bskiba authored Aug 22, 2018
2 parents 09590de + 026d2c2 commit 6e9f520
Showing 1 changed file with 28 additions and 17 deletions.
45 changes: 28 additions & 17 deletions vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewClusterStateFeeder(config *rest.Config, clusterState *model.ClusterState
kubeClient := kube_client.NewForConfigOrDie(config)
oomObserver := oom.NewObserver()
podLister := newPodClients(kubeClient, &oomObserver)
watchEvictionEvents(kubeClient, &oomObserver)
watchEvictionEventsWithRetries(kubeClient, &oomObserver)
return &clusterStateFeeder{
coreClient: kubeClient.CoreV1(),
specClient: spec.NewSpecClient(podLister),
Expand All @@ -88,29 +88,40 @@ func newMetricsClient(config *rest.Config) metrics.MetricsClient {
return metrics.NewMetricsClient(metricsGetter)
}

func watchEvictionEvents(kubeClient kube_client.Interface, observer *oom.Observer) {
options := metav1.ListOptions{
FieldSelector: "reason=Evicted",
}
watchInterface, err := kubeClient.CoreV1().Events("").Watch(options)
if err != nil {
glog.Errorf("Cannot initialize watching events. Reason %v", err)
return
}
func watchEvictionEventsWithRetries(kubeClient kube_client.Interface, observer *oom.Observer) {
go func() {
options := metav1.ListOptions{
FieldSelector: "reason=Evicted",
}

for {
result := <-watchInterface.ResultChan()
if result.Type == watch.Added {
result, ok := result.Object.(*apiv1.Event)
if !ok {
continue
}
observer.OnEvent(result)
watchInterface, err := kubeClient.CoreV1().Events("").Watch(options)
if err != nil {
glog.Errorf("Cannot initialize watching events. Reason %v", err)
continue
}
watchEvictionEvents(watchInterface.ResultChan(), observer)
}
}()
}

func watchEvictionEvents(evictedEventChan <-chan watch.Event, observer *oom.Observer) {
for {
evictedEvent, ok := <-evictedEventChan
if !ok {
glog.V(3).Infof("Eviction event chan closed")
return
}
if evictedEvent.Type == watch.Added {
evictedEvent, ok := evictedEvent.Object.(*apiv1.Event)
if !ok {
continue
}
observer.OnEvent(evictedEvent)
}
}
}

// Creates clients watching pods: PodLister (listing only not terminated pods).
func newPodClients(kubeClient kube_client.Interface, resourceEventHandler cache.ResourceEventHandler) v1lister.PodLister {
selector := fields.ParseSelectorOrDie("status.phase!=" + string(apiv1.PodPending))
Expand Down

0 comments on commit 6e9f520

Please sign in to comment.