From cdc15577046a265d9ba6404bc026bade24cd8da3 Mon Sep 17 00:00:00 2001
From: Ewout Prangsma
Date: Thu, 5 Apr 2018 09:32:09 +0200
Subject: [PATCH 1/2] Adding panic-safe ResourceWatcher

---
 pkg/util/k8sutil/informer.go | 90 ++++++++++++++++++++++++++++++++++++
 1 file changed, 90 insertions(+)
 create mode 100644 pkg/util/k8sutil/informer.go

diff --git a/pkg/util/k8sutil/informer.go b/pkg/util/k8sutil/informer.go
new file mode 100644
index 000000000..6e59b1ef8
--- /dev/null
+++ b/pkg/util/k8sutil/informer.go
@@ -0,0 +1,90 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package k8sutil
+
+import (
+	"github.com/rs/zerolog"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/tools/cache"
+)
+
+// ResourceWatcher is a helper to watch for events on a specific type
+// of resource. The handler functions are protected from panics.
+type ResourceWatcher struct {
+	informer cache.Controller
+}
+
+// NewResourceWatcher creates a helper that watches for changes in a resource of a specific type.
+// It wraps the given handler functions such that panics are caught and logged.
+func NewResourceWatcher(log zerolog.Logger, getter cache.Getter, resource, namespace string,
+	objType runtime.Object, h cache.ResourceEventHandlerFuncs) *ResourceWatcher {
+	source := cache.NewListWatchFromClient(
+		getter,
+		resource,
+		namespace,
+		fields.Everything())
+
+	_, informer := cache.NewIndexerInformer(source, objType, 0, cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			defer func() {
+				if err := recover(); err != nil {
+					log.Error().Interface("error", err).Msg("Recovered from panic")
+				}
+			}()
+			if h.AddFunc != nil {
+				h.AddFunc(obj)
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			defer func() {
+				if err := recover(); err != nil {
+					log.Error().Interface("error", err).Msg("Recovered from panic")
+				}
+			}()
+			if h.UpdateFunc != nil {
+				h.UpdateFunc(oldObj, newObj)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			defer func() {
+				if err := recover(); err != nil {
+					log.Error().Interface("error", err).Msg("Recovered from panic")
+				}
+			}()
+			if h.DeleteFunc != nil {
+				h.DeleteFunc(obj)
+			}
+		},
+	}, cache.Indexers{})
+
+	return &ResourceWatcher{
+		informer: informer,
+	}
+}
+
+// Run continues to watch for events on the selected type of resource
+// until the given channel is closed.
+func (rw *ResourceWatcher) Run(stopCh <-chan struct{}) {
+	rw.informer.Run(stopCh)
+}

From c08dc0b342e7ff95cc2bdc8ec2b1ad7195ffac83 Mon Sep 17 00:00:00 2001
From: Ewout Prangsma
Date: Thu, 5 Apr 2018 09:53:54 +0200
Subject: [PATCH 2/2] Using panic-safe resource watcher

---
 pkg/deployment/informers.go            | 205 +++++++++++++------------
 pkg/operator/operator_deployment.go    |  20 +--
 pkg/operator/operator_local_storage.go |  20 +--
 pkg/storage/pv_informer.go             |  49 +++---
 pkg/storage/pvc_informer.go            |  59 +++----
 5 files changed, 178 insertions(+), 175 deletions(-)

diff --git a/pkg/deployment/informers.go b/pkg/deployment/informers.go
index 5cf7da2a2..da4ddcc6d 100644
--- a/pkg/deployment/informers.go
+++ b/pkg/deployment/informers.go
@@ -24,18 +24,13 @@ package deployment
 
 import (
 	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/tools/cache"
+
+	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
 )
 
 // listenForPodEvents keep listening for changes in pod until the given channel is closed.
 func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
-	source := cache.NewListWatchFromClient(
-		d.deps.KubeCli.CoreV1().RESTClient(),
-		"pods",
-		d.apiObject.GetNamespace(),
-		fields.Everything())
-
 	getPod := func(obj interface{}) (*v1.Pod, bool) {
 		pod, ok := obj.(*v1.Pod)
 		if !ok {
@@ -49,35 +44,35 @@ func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
 		return pod, true
 	}
 
-	_, informer := cache.NewIndexerInformer(source, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
-				d.triggerInspection()
-			}
-		},
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
-				d.triggerInspection()
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
-				d.triggerInspection()
-			}
-		},
-	}, cache.Indexers{})
-
-	informer.Run(stopCh)
+	rw := k8sutil.NewResourceWatcher(
+		d.deps.Log,
+		d.deps.KubeCli.CoreV1().RESTClient(),
+		"pods",
+		d.apiObject.GetNamespace(),
+		&v1.Pod{},
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
+					d.triggerInspection()
+				}
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
+					d.triggerInspection()
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
+					d.triggerInspection()
+				}
+			},
+		})
+
+	rw.Run(stopCh)
 }
 
 // listenForPVCEvents keep listening for changes in PVC's until the given channel is closed.
 func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
-	source := cache.NewListWatchFromClient(
-		d.deps.KubeCli.CoreV1().RESTClient(),
-		"persistentvolumeclaims",
-		d.apiObject.GetNamespace(),
-		fields.Everything())
-
 	getPVC := func(obj interface{}) (*v1.PersistentVolumeClaim, bool) {
 		pvc, ok := obj.(*v1.PersistentVolumeClaim)
 		if !ok {
@@ -91,35 +86,35 @@ func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
 		return pvc, true
 	}
 
-	_, informer := cache.NewIndexerInformer(source, &v1.PersistentVolumeClaim{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
-				d.triggerInspection()
-			}
-		},
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
-				d.triggerInspection()
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
-				d.triggerInspection()
-			}
-		},
-	}, cache.Indexers{})
-
-	informer.Run(stopCh)
+	rw := k8sutil.NewResourceWatcher(
+		d.deps.Log,
+		d.deps.KubeCli.CoreV1().RESTClient(),
+		"persistentvolumeclaims",
+		d.apiObject.GetNamespace(),
+		&v1.PersistentVolumeClaim{},
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
+					d.triggerInspection()
+				}
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
+					d.triggerInspection()
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
+					d.triggerInspection()
+				}
+			},
+		})
+
+	rw.Run(stopCh)
 }
 
 // listenForSecretEvents keep listening for changes in Secrets's until the given channel is closed.
 func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
-	source := cache.NewListWatchFromClient(
-		d.deps.KubeCli.CoreV1().RESTClient(),
-		"secrets",
-		d.apiObject.GetNamespace(),
-		fields.Everything())
-
 	getSecret := func(obj interface{}) (*v1.Secret, bool) {
 		secret, ok := obj.(*v1.Secret)
 		if !ok {
@@ -133,36 +128,36 @@ func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
 		return secret, true
 	}
 
-	_, informer := cache.NewIndexerInformer(source, &v1.Secret{}, 0, cache.ResourceEventHandlerFuncs{
-		// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
-		AddFunc: func(obj interface{}) {
-			if _, ok := getSecret(obj); ok {
-				d.triggerInspection()
-			}
-		},
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			if _, ok := getSecret(newObj); ok {
-				d.triggerInspection()
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			if _, ok := getSecret(obj); ok {
-				d.triggerInspection()
-			}
-		},
-	}, cache.Indexers{})
-
-	informer.Run(stopCh)
+	rw := k8sutil.NewResourceWatcher(
+		d.deps.Log,
+		d.deps.KubeCli.CoreV1().RESTClient(),
+		"secrets",
+		d.apiObject.GetNamespace(),
+		&v1.Secret{},
+		cache.ResourceEventHandlerFuncs{
+			// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
+			AddFunc: func(obj interface{}) {
+				if _, ok := getSecret(obj); ok {
+					d.triggerInspection()
+				}
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				if _, ok := getSecret(newObj); ok {
+					d.triggerInspection()
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				if _, ok := getSecret(obj); ok {
+					d.triggerInspection()
+				}
+			},
+		})
+
+	rw.Run(stopCh)
 }
 
 // listenForServiceEvents keep listening for changes in Service's until the given channel is closed.
 func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
-	source := cache.NewListWatchFromClient(
-		d.deps.KubeCli.CoreV1().RESTClient(),
-		"services",
-		d.apiObject.GetNamespace(),
-		fields.Everything())
-
 	getService := func(obj interface{}) (*v1.Service, bool) {
 		service, ok := obj.(*v1.Service)
 		if !ok {
@@ -176,23 +171,29 @@ func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
 		return service, true
 	}
 
-	_, informer := cache.NewIndexerInformer(source, &v1.Service{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			if s, ok := getService(obj); ok && d.isOwnerOf(s) {
-				d.triggerInspection()
-			}
-		},
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
-				d.triggerInspection()
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			if s, ok := getService(obj); ok && d.isOwnerOf(s) {
-				d.triggerInspection()
-			}
-		},
-	}, cache.Indexers{})
-
-	informer.Run(stopCh)
+	rw := k8sutil.NewResourceWatcher(
+		d.deps.Log,
+		d.deps.KubeCli.CoreV1().RESTClient(),
+		"services",
+		d.apiObject.GetNamespace(),
+		&v1.Service{},
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				if s, ok := getService(obj); ok && d.isOwnerOf(s) {
+					d.triggerInspection()
+				}
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
+					d.triggerInspection()
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				if s, ok := getService(obj); ok && d.isOwnerOf(s) {
+					d.triggerInspection()
+				}
+			},
+		})
+
+	rw.Run(stopCh)
 }
diff --git a/pkg/operator/operator_deployment.go b/pkg/operator/operator_deployment.go
index 940baf3a5..2c9105d81 100644
--- a/pkg/operator/operator_deployment.go
+++ b/pkg/operator/operator_deployment.go
@@ -26,13 +26,13 @@ import (
 	"fmt"
 
 	"github.com/pkg/errors"
-	"k8s.io/apimachinery/pkg/fields"
 	kwatch "k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/tools/cache"
 
 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/arangodb/kube-arangodb/pkg/deployment"
 	"github.com/arangodb/kube-arangodb/pkg/metrics"
+	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
 )
 
 var (
@@ -46,20 +46,20 @@ var (
 
 // run the deployments part of the operator.
 // This registers a listener and waits until the process stops.
 func (o *Operator) runDeployments(stop <-chan struct{}) {
-	source := cache.NewListWatchFromClient(
+	rw := k8sutil.NewResourceWatcher(
+		o.log,
 		o.Dependencies.CRCli.DatabaseV1alpha().RESTClient(),
 		api.ArangoDeploymentResourcePlural,
 		o.Config.Namespace,
-		fields.Everything())
-
-	_, informer := cache.NewIndexerInformer(source, &api.ArangoDeployment{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc:    o.onAddArangoDeployment,
-		UpdateFunc: o.onUpdateArangoDeployment,
-		DeleteFunc: o.onDeleteArangoDeployment,
-	}, cache.Indexers{})
+		&api.ArangoDeployment{},
+		cache.ResourceEventHandlerFuncs{
+			AddFunc:    o.onAddArangoDeployment,
+			UpdateFunc: o.onUpdateArangoDeployment,
+			DeleteFunc: o.onDeleteArangoDeployment,
+		})
 
 	o.Dependencies.DeploymentProbe.SetReady()
 
-	informer.Run(stop)
+	rw.Run(stop)
 }
 
 // onAddArangoDeployment deployment addition callback
diff --git a/pkg/operator/operator_local_storage.go b/pkg/operator/operator_local_storage.go
index db6e823e7..e71a10369 100644
--- a/pkg/operator/operator_local_storage.go
+++ b/pkg/operator/operator_local_storage.go
@@ -26,13 +26,13 @@ import (
 	"fmt"
 
 	"github.com/pkg/errors"
-	"k8s.io/apimachinery/pkg/fields"
 	kwatch "k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/tools/cache"
 
 	api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
 	"github.com/arangodb/kube-arangodb/pkg/metrics"
 	"github.com/arangodb/kube-arangodb/pkg/storage"
+	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
 )
 
 var (
@@ -46,20 +46,20 @@ var (
 
 // run the local storages part of the operator.
 // This registers a listener and waits until the process stops.
 func (o *Operator) runLocalStorages(stop <-chan struct{}) {
-	source := cache.NewListWatchFromClient(
+	rw := k8sutil.NewResourceWatcher(
+		o.log,
 		o.Dependencies.CRCli.StorageV1alpha().RESTClient(),
 		api.ArangoLocalStorageResourcePlural,
 		"", //o.Config.Namespace,
-		fields.Everything())
-
-	_, informer := cache.NewIndexerInformer(source, &api.ArangoLocalStorage{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc:    o.onAddArangoLocalStorage,
-		UpdateFunc: o.onUpdateArangoLocalStorage,
-		DeleteFunc: o.onDeleteArangoLocalStorage,
-	}, cache.Indexers{})
+		&api.ArangoLocalStorage{},
+		cache.ResourceEventHandlerFuncs{
+			AddFunc:    o.onAddArangoLocalStorage,
+			UpdateFunc: o.onUpdateArangoLocalStorage,
+			DeleteFunc: o.onDeleteArangoLocalStorage,
+		})
 
 	o.Dependencies.StorageProbe.SetReady()
 
-	informer.Run(stop)
+	rw.Run(stop)
 }
 
 // onAddArangoLocalStorage local storage addition callback
diff --git a/pkg/storage/pv_informer.go b/pkg/storage/pv_informer.go
index f169e7d43..1a18bd6f9 100644
--- a/pkg/storage/pv_informer.go
+++ b/pkg/storage/pv_informer.go
@@ -24,18 +24,13 @@ package storage
 
 import (
 	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/tools/cache"
+
+	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
 )
 
 // listenForPvEvents keep listening for changes in PV's until the given channel is closed.
 func (ls *LocalStorage) listenForPvEvents() {
-	source := cache.NewListWatchFromClient(
-		ls.deps.KubeCli.CoreV1().RESTClient(),
-		"persistentvolumes",
-		"", //ls.apiObject.GetNamespace(),
-		fields.Everything())
-
 	getPv := func(obj interface{}) (*v1.PersistentVolume, bool) {
 		pv, ok := obj.(*v1.PersistentVolume)
 		if !ok {
@@ -49,22 +44,28 @@ func (ls *LocalStorage) listenForPvEvents() {
 		return pv, true
 	}
 
-	_, informer := cache.NewIndexerInformer(source, &v1.PersistentVolume{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			// Ignore
-		},
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			if pv, ok := getPv(newObj); ok {
-				ls.send(&localStorageEvent{
-					Type:             eventPVUpdated,
-					PersistentVolume: pv,
-				})
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			// Ignore
-		},
-	}, cache.Indexers{})
+	rw := k8sutil.NewResourceWatcher(
+		ls.deps.Log,
+		ls.deps.KubeCli.CoreV1().RESTClient(),
+		"persistentvolumes",
+		"", //ls.apiObject.GetNamespace(),
+		&v1.PersistentVolume{},
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				// Ignore
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				if pv, ok := getPv(newObj); ok {
+					ls.send(&localStorageEvent{
+						Type:             eventPVUpdated,
+						PersistentVolume: pv,
+					})
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				// Ignore
+			},
+		})
 
-	informer.Run(ls.stopCh)
+	rw.Run(ls.stopCh)
 }
diff --git a/pkg/storage/pvc_informer.go b/pkg/storage/pvc_informer.go
index de2db650d..2cadb0f0a 100644
--- a/pkg/storage/pvc_informer.go
+++ b/pkg/storage/pvc_informer.go
@@ -24,18 +24,13 @@ package storage
 
 import (
 	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/tools/cache"
+
+	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
 )
 
 // listenForPvcEvents keep listening for changes in PVC's until the given channel is closed.
 func (ls *LocalStorage) listenForPvcEvents() {
-	source := cache.NewListWatchFromClient(
-		ls.deps.KubeCli.CoreV1().RESTClient(),
-		"persistentvolumeclaims",
-		"", //ls.apiObject.GetNamespace(),
-		fields.Everything())
-
 	getPvc := func(obj interface{}) (*v1.PersistentVolumeClaim, bool) {
 		pvc, ok := obj.(*v1.PersistentVolumeClaim)
 		if !ok {
@@ -49,27 +44,33 @@ func (ls *LocalStorage) listenForPvcEvents() {
 		return pvc, true
 	}
 
-	_, informer := cache.NewIndexerInformer(source, &v1.PersistentVolumeClaim{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			if pvc, ok := getPvc(obj); ok {
-				ls.send(&localStorageEvent{
-					Type: eventPVCAdded,
-					PersistentVolumeClaim: pvc,
-				})
-			}
-		},
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			if pvc, ok := getPvc(newObj); ok {
-				ls.send(&localStorageEvent{
-					Type: eventPVCUpdated,
-					PersistentVolumeClaim: pvc,
-				})
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			// Ignore
-		},
-	}, cache.Indexers{})
+	rw := k8sutil.NewResourceWatcher(
+		ls.deps.Log,
+		ls.deps.KubeCli.CoreV1().RESTClient(),
+		"persistentvolumeclaims",
+		"", //ls.apiObject.GetNamespace(),
+		&v1.PersistentVolumeClaim{},
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				if pvc, ok := getPvc(obj); ok {
+					ls.send(&localStorageEvent{
+						Type: eventPVCAdded,
+						PersistentVolumeClaim: pvc,
+					})
+				}
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				if pvc, ok := getPvc(newObj); ok {
+					ls.send(&localStorageEvent{
+						Type: eventPVCUpdated,
+						PersistentVolumeClaim: pvc,
+					})
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				// Ignore
+			},
+		})
 
-	informer.Run(ls.stopCh)
+	rw.Run(ls.stopCh)
 }
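
-- 
A minimal usage sketch of the ResourceWatcher added in patch 1. Everything
below is illustrative: the function name watchConfigMaps, the clientset
parameter, and the logger setup are assumptions for the example, not part
of the patches themselves.

	package main

	import (
		"os"

		"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
		"github.com/rs/zerolog"
		"k8s.io/api/core/v1"
		"k8s.io/client-go/kubernetes"
		"k8s.io/client-go/tools/cache"
	)

	// watchConfigMaps watches ConfigMaps in the given namespace until stopCh
	// is closed. A panic inside a handler is recovered and logged by the
	// watcher instead of crashing the whole operator process.
	func watchConfigMaps(kubeCli kubernetes.Interface, namespace string, stopCh <-chan struct{}) {
		log := zerolog.New(os.Stderr)
		rw := k8sutil.NewResourceWatcher(
			log,
			kubeCli.CoreV1().RESTClient(), // cache.Getter for core/v1 resources
			"configmaps",                  // plural resource name
			namespace,
			&v1.ConfigMap{},
			cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
					// A failed type assertion here would panic on an
					// unexpected object; the wrapper recovers and logs it.
					cm := obj.(*v1.ConfigMap)
					log.Info().Str("name", cm.GetName()).Msg("ConfigMap added")
				},
			})
		rw.Run(stopCh) // blocks until stopCh is closed
	}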